fix: Workaround for repeating logIndex (#10880)
* fix: Workaround for repeating logIndex
* Fix tests
* Done migration, but need to rewrite tuples usage
* Migration rewritten
* Fix tests
* Fix test
* Process review comments
* Update cspell ignore
* Refactoring

parent 1bb721865b
commit 213a3247ae

@@ -0,0 +1,271 @@
defmodule Explorer.Migrator.SanitizeDuplicatedLogIndexLogs do
  @moduledoc """
  This module is responsible for sanitizing duplicate log index entries in the database.
  The migration process includes identifying duplicate log indexes and updating the related token transfers and token instances accordingly.
  """

  use Explorer.Migrator.FillingMigration

  import Ecto.Query

  alias Explorer.Chain.Cache.BackgroundMigrations
  alias Explorer.Chain.{Log, TokenTransfer}
  alias Explorer.Chain.Token.Instance
  alias Explorer.Migrator.FillingMigration
  alias Explorer.Repo

  require Logger

  @migration_name "sanitize_duplicated_log_index_logs"

  @impl FillingMigration
  def migration_name, do: @migration_name
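
  # Identifiers here are {block_hash, logs} tuples rather than plain ids: each
  # call scans the next window of batch_size() * concurrency() block numbers
  # (tracked in the migrator state under :block_number_to_process) and groups
  # the fetched logs by block hash so a block is always handled as a whole.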
  @impl FillingMigration
  def last_unprocessed_identifiers(state) do
    block_number = state[:block_number_to_process] || 0

    limit = batch_size() * concurrency()

    ids =
      block_number
      |> unprocessed_data_query(block_number + limit)
      |> Repo.all(timeout: :infinity)
      |> Enum.group_by(& &1.block_hash)
      |> Map.to_list()

    {ids, Map.put(state, :block_number_to_process, block_number + limit)}
  end

  @doc """
  Stub implementation to satisfy the FillingMigration behaviour.
  """
  @impl FillingMigration
  @spec unprocessed_data_query() :: nil
  def unprocessed_data_query do
    nil
  end

  def unprocessed_data_query(block_number_start, block_number_end) do
    Log
    |> where([l], l.block_number >= ^block_number_start and l.block_number < ^block_number_end)
  end

  @impl FillingMigration
  @doc """
  Updates a batch of logs grouped by block.

  ## Parameters

  - `logs_by_block`: an enumerable of `{block_hash, logs}` tuples pairing block hashes with
    the lists of logs associated with those blocks (see `last_unprocessed_identifiers/1`).

  ## Returns

  `:ok`
  """
  def update_batch(logs_by_block) do
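    # Re-number logs only inside blocks that actually contain duplicated
    # indexes; blocks whose log indexes are already unique come back as
    # :ignore from process_block/1 and are dropped here.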
    logs_to_update =
      logs_by_block
      |> Enum.map(&process_block/1)
      |> Enum.reject(&(&1 == :ignore))
      |> List.flatten()

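    # For every log in an affected block, collect its old composite id
    # {transaction_hash, block_hash, index}, its re-insertion params with the
    # new index, and an old-id -> new-index map used below to rewrite token
    # transfers and token instances.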
    {ids, logs, ids_to_new_index} =
      logs_to_update
      |> Enum.reduce({[], [], %{}}, fn {log, new_index}, {ids, logs, ids_to_new_index} ->
        id = {log.transaction_hash, log.block_hash, log.index}

        {[id | ids],
         [
           %Log{log | index: new_index} |> Map.from_struct() |> Map.drop([:block, :address, :transaction, :__meta__])
           | logs
         ], Map.put(ids_to_new_index, id, new_index)}
      end)

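    # Hashes are converted to raw bytes so the tuples can be cast to the
    # log_id composite type (created in before_start/0) and matched as a
    # single array parameter in the SQL fragments below.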
    prepared_ids =
      Enum.map(ids, fn {transaction_hash, block_hash, log_index} ->
        {transaction_hash.bytes, block_hash.bytes, log_index}
      end)

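    # All affected rows are rewritten atomically: the old logs and token
    # transfers are deleted and then re-inserted with the remapped indexes.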
    Repo.transaction(fn ->
      Log
      |> where(
        [log],
        fragment(
          "(?, ?, ?) = ANY(?::log_id[])",
          log.transaction_hash,
          log.block_hash,
          log.index,
          ^prepared_ids
        )
      )
      |> Repo.delete_all(timeout: :infinity)

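      # Token transfers reference their log by {transaction_hash, block_hash,
      # log_index}; delete the affected ones and capture the deleted rows via
      # select/3 so they can be re-inserted with the new log indexes.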
      {_, token_transfers} =
        TokenTransfer
        |> where(
          [token_transfer],
          fragment(
            "(?, ?, ?) = ANY(?::log_id[])",
            token_transfer.transaction_hash,
            token_transfer.block_hash,
            token_transfer.log_index,
            ^prepared_ids
          )
        )
        |> select([token_transfer], token_transfer)
        |> Repo.delete_all(timeout: :infinity)

      Repo.insert_all(Log, logs, timeout: :infinity)

      token_transfers
      |> Enum.map(fn token_transfer ->
        id = token_transfer_to_index(token_transfer)

        %TokenTransfer{token_transfer | log_index: ids_to_new_index[id]}
        |> Map.from_struct()
        |> Map.drop([
          :token_id,
          :index_in_batch,
          :reverse_index_in_batch,
          :token_decimals,
          :from_address,
          :to_address,
          :token_contract_address,
          :block,
          :instances,
          :token,
          :transaction,
          :__meta__
        ])
      end)
      |> (&Repo.insert_all(TokenTransfer, &1, timeout: :infinity)).()

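      # For ERC-721 transfers, owner_updated_at_log_index on the corresponding
      # token instances may point at a re-numbered log, so remap it as well
      # (only transfers from consensus blocks contribute new index values).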
      nft_instances_params =
        token_transfers
        |> Enum.filter(&(&1.token_type == "ERC-721"))
        |> Enum.map(fn token_transfer -> {token_transfer.block_number, token_transfer.log_index} end)

      nft_updates_map =
        token_transfers
        |> Enum.filter(&(&1.token_type == "ERC-721" && &1.block_consensus))
        |> Enum.reduce(%{}, fn token_transfer, acc ->
          id = token_transfer_to_index(token_transfer)
          Map.put(acc, {token_transfer.block_number, token_transfer.log_index}, ids_to_new_index[id])
        end)

      Instance
      |> where(
        [nft],
        fragment(
          "(?, ?) = ANY(?::nft_id[])",
          nft.owner_updated_at_block,
          nft.owner_updated_at_log_index,
          ^nft_instances_params
        )
      )
      |> Repo.all(timeout: :infinity)
      |> Enum.map(fn nft ->
        %Instance{
          nft
          | owner_updated_at_log_index: nft_updates_map[{nft.owner_updated_at_block, nft.owner_updated_at_log_index}]
        }
        |> Map.from_struct()
        |> Map.drop([
          :current_token_balance,
          :is_unique,
          :owner,
          :token,
          :__meta__
        ])
      end)
      |> (&Repo.insert_all(Instance, &1,
            conflict_target: [:token_contract_address_hash, :token_id],
            on_conflict: {:replace, [:owner_updated_at_log_index]},
            timeout: :infinity
          )).()
    end)

    :ok
  end
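
  # Returns :ignore when all log indexes in the block are already unique;
  # otherwise re-numbers the block's logs 0..n-1, ordered by transaction index,
  # old log index, and transaction hash, and returns {log, new_index} pairs.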
  defp process_block({block_hash, logs}) do
    if logs |> Enum.frequencies_by(& &1.index) |> Map.values() |> Enum.max() == 1 do
      :ignore
    else
      Logger.error("Found logs with the same index within one block: #{block_hash} in DB")

      logs = Repo.preload(logs, :transaction)

      logs
      |> Enum.sort_by(&{&1.transaction.index, &1.index, &1.transaction_hash})
      # credo:disable-for-next-line Credo.Check.Refactor.Nesting
      |> Enum.map_reduce(0, fn log, index ->
        {{log, index}, index + 1}
      end)
      |> elem(0)
    end
  end

  @impl FillingMigration
  def update_cache do
    BackgroundMigrations.set_sanitize_duplicated_log_index_logs_finished(true)
  end

  defp token_transfer_to_index(token_transfer) do
    {token_transfer.transaction_hash, token_transfer.block_hash, token_transfer.log_index}
  end
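
  # before_start/0 creates the log_id and nft_id composite Postgres types so
  # that batches of id tuples can be passed as a single array bind parameter
  # and matched with fragments like "(?, ?, ?) = ANY(?::log_id[])" above;
  # on_finish/0 drops them again once the migration completes.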
  @doc """
  Callback function that is executed before the migration process starts.
  """
  @impl FillingMigration
  def before_start do
    """
    DO $$
    BEGIN
      IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'log_id') THEN
        CREATE TYPE log_id AS (
          transaction_hash bytea,
          block_hash bytea,
          log_index integer
        );
      END IF;
    END$$;
    """
    |> Repo.query!()

    """
    DO $$
    BEGIN
      IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'nft_id') THEN
        CREATE TYPE nft_id AS (
          block_number bigint,
          log_index integer
        );
      END IF;
    END$$;
    """
    |> Repo.query!()

    :ok
  end

  @doc """
  Callback function that is executed when the migration process finishes.
  """
  @impl FillingMigration
  def on_finish do
    """
    DROP TYPE log_id;
    """
    |> Repo.query!([], timeout: :infinity)

    """
    DROP TYPE nft_id;
    """
    |> Repo.query!([], timeout: :infinity)

    :ok
  end
end

@@ -0,0 +1,144 @@
defmodule Explorer.Migrator.SanitizeDuplicatedLogIndexLogsTest do
  use Explorer.DataCase, async: false

  alias Explorer.Chain.Cache.BackgroundMigrations
  alias Explorer.Chain.Log
  alias Explorer.Chain.TokenTransfer
  alias Explorer.Chain.Token.Instance
  alias Explorer.Migrator.{SanitizeDuplicatedLogIndexLogs, MigrationStatus}

  describe "Sanitize duplicated log index logs" do
test "correctly identifies and updates duplicated log index logs" do |
||||
block = insert(:block) |
||||
|
||||
tx1 = :transaction |> insert() |> with_block(block, index: 0) |
||||
tx2 = :transaction |> insert() |> with_block(block, index: 1) |
||||
|
||||
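      # log1 and log3 share index 3 within the same block (the duplication
      # being sanitized); log4 lives in a different block and must stay untouched.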
      _log1 = insert(:log, transaction: tx1, index: 3, data: "0x01", block: block, block_number: block.number)
      _log2 = insert(:log, transaction: tx1, index: 0, data: "0x02", block: block, block_number: block.number)
      _log3 = insert(:log, transaction: tx2, index: 3, data: "0x03", block: block, block_number: block.number)

      log4 = insert(:log)

      assert MigrationStatus.get_status("sanitize_duplicated_log_index_logs") == nil

      SanitizeDuplicatedLogIndexLogs.start_link([])
      Process.sleep(300)

      assert MigrationStatus.get_status("sanitize_duplicated_log_index_logs") == "completed"
      assert BackgroundMigrations.get_sanitize_duplicated_log_index_logs_finished() == true

      updated_logs =
        Repo.all(Log |> where([log], log.block_number == ^block.number) |> order_by([log], asc: log.index))

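      # Indexes are reassigned by {transaction index, old log index}:
      # log2 (tx1, old 0) -> 0, log1 (tx1, old 3) -> 1, log3 (tx2, old 3) -> 2,
      # hence the data bytes come back in the order <<2>>, <<1>>, <<3>>.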
      assert match?(
               [
                 %{index: 0, data: %Explorer.Chain.Data{bytes: <<2>>}},
                 %{index: 1, data: %Explorer.Chain.Data{bytes: <<1>>}},
                 %{index: 2, data: %Explorer.Chain.Data{bytes: <<3>>}}
               ],
               updated_logs
             )

      assert %Log{log4 | address: nil, block: nil, transaction: nil} == %Log{
               Repo.one(Log |> where([log], log.block_number != ^block.number))
               | address: nil,
                 block: nil,
                 transaction: nil
             }
    end

    test "correctly identifies and updates duplicated log index logs & updates corresponding token transfers and token instances" do
      block = insert(:block)
      token_address = insert(:contract_address)
      insert(:token, contract_address: token_address, type: "ERC-721")

      instance = insert(:token_instance, token_contract_address_hash: token_address.hash)

      tx1 = :transaction |> insert() |> with_block(block, index: 0)
      tx2 = :transaction |> insert() |> with_block(block, index: 1)

      log1 = insert(:log, transaction: tx1, index: 3, data: "0x01", block: block, block_number: block.number)
      log2 = insert(:log, transaction: tx1, index: 0, data: "0x02", block: block, block_number: block.number)
      log3 = insert(:log, transaction: tx2, index: 3, data: "0x03", block: block, block_number: block.number)

      log4 = insert(:log)

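      # tt1 is an ERC-721 transfer attached to log1 (old index 3), and the NFT
      # instance's owner_updated_at_log_index (set below) points at the same
      # log, so both are expected to be remapped to the new index 1.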
      _tt1 =
        insert(:token_transfer,
          token_type: "ERC-721",
          block: block,
          block_number: block.number,
          log_index: log1.index,
          token_ids: [instance.token_id],
          token_contract_address: token_address,
          token_contract_address_hash: token_address.hash,
          transaction: tx1,
          transaction_hash: tx1.hash,
          block_hash: block.hash
        )

      _tt2 =
        insert(:token_transfer,
          block: block,
          block_number: block.number,
          log_index: log2.index,
          transaction: tx1,
          transaction_hash: tx1.hash
        )

      _tt3 =
        insert(:token_transfer,
          block: block,
          block_number: block.number,
          log_index: log3.index,
          transaction: tx2,
          transaction_hash: tx2.hash
        )

      Instance.changeset(instance, %{owner_updated_at_block: block.number, owner_updated_at_log_index: log1.index})
      |> Repo.update!()

      assert MigrationStatus.get_status("sanitize_duplicated_log_index_logs") == nil

      SanitizeDuplicatedLogIndexLogs.start_link([])
      Process.sleep(300)

      assert MigrationStatus.get_status("sanitize_duplicated_log_index_logs") == "completed"
      assert BackgroundMigrations.get_sanitize_duplicated_log_index_logs_finished() == true

      updated_logs =
        Repo.all(Log |> where([log], log.block_number == ^block.number) |> order_by([log], asc: log.index))

      assert match?(
               [
                 %{index: 0, data: %Explorer.Chain.Data{bytes: <<2>>}},
                 %{index: 1, data: %Explorer.Chain.Data{bytes: <<1>>}},
                 %{index: 2, data: %Explorer.Chain.Data{bytes: <<3>>}}
               ],
               updated_logs
             )

      block_number = block.number
      assert [%{owner_updated_at_block: ^block_number, owner_updated_at_log_index: 1}] = Repo.all(Instance)

      assert [%{log_index: 1, block_number: ^block_number}] =
               Repo.all(TokenTransfer |> where([tt], tt.token_type == "ERC-721"))

      assert %Log{log4 | address: nil, block: nil, transaction: nil} == %Log{
               Repo.one(Log |> where([log], log.block_number != ^block.number))
               | address: nil,
                 block: nil,
                 transaction: nil
             }
    end

    test "correctly handles cases where there are no duplicated log index logs" do
      assert MigrationStatus.get_status("sanitize_duplicated_log_index_logs") == nil

      SanitizeDuplicatedLogIndexLogs.start_link([])
      Process.sleep(100)

      assert MigrationStatus.get_status("sanitize_duplicated_log_index_logs") == "completed"
      assert BackgroundMigrations.get_sanitize_duplicated_log_index_logs_finished() == true
    end
  end
end