diff --git a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex index 209d5d2fa6..3c949270cf 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex @@ -9,7 +9,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do alias Ecto.Adapters.SQL alias Ecto.{Changeset, Multi, Repo} - alias Explorer.Chain.{Address, Block, Hash, Import, InternalTransaction, Transaction} + alias Explorer.Chain.{Address, Block, Hash, Import, InternalTransaction, Transaction, TokenTransfer} alias Explorer.Chain.Block.Reward alias Explorer.Chain.Import.Runner alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances @@ -73,6 +73,21 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |> Multi.run(:lose_invalid_neighbour_consensus, fn repo, _ -> lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, insert_options) end) + |> Multi.run(:remove_nonconsensus_data, fn repo, + %{ + lose_consensus: lost_consensus_blocks, + lose_invalid_neighbour_consensus: lost_consensus_neighbours + } -> + nonconsensus_block_numbers = + lost_consensus_blocks + |> Kernel.++(lost_consensus_neighbours) + |> Enum.map(fn %{number: number} -> + number + end) + |> Enum.sort() + + remove_nonconsensus_data(repo, nonconsensus_block_numbers, insert_options) + end) |> Multi.run(:delete_address_token_balances, fn repo, _ -> delete_address_token_balances(repo, ordered_consensus_block_numbers, insert_options) end) @@ -342,6 +357,44 @@ defmodule Explorer.Chain.Import.Runner.Blocks do end end + defp remove_nonconsensus_data(repo, nonconsensus_block_numbers, insert_options) do + with {:ok, deleted_token_transfers} <- + remove_nonconsensus_token_transfers(repo, nonconsensus_block_numbers, insert_options) do + {:ok, %{token_transfers: deleted_token_transfers}} + end + end + + defp remove_nonconsensus_token_transfers(repo, nonconsensus_block_numbers, %{timeout: timeout}) do + ordered_token_transfers = + from(token_transfer in TokenTransfer, + where: token_transfer.block_number in ^nonconsensus_block_numbers, + select: map(token_transfer, [:transaction_hash, :log_index]), + order_by: [ + token_transfer.transaction_hash, + token_transfer.log_index + ], + lock: "FOR UPDATE" + ) + + query = + from(token_transfer in TokenTransfer, + select: map(token_transfer, [:transaction_hash, :log_index]), + inner_join: ordered_token_transfer in subquery(ordered_token_transfers), + on: + ordered_token_transfer.transaction_hash == + token_transfer.transaction_hash and + ordered_token_transfer.log_index == token_transfer.log_index + ) + + try do + {_count, deleted_token_transfers} = repo.delete_all(query, timeout: timeout) + + {:ok, deleted_token_transfers} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}} + end + end defp delete_address_token_balances(_, [], _), do: {:ok, []} defp delete_address_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do diff --git a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs index 63110cba40..f1ebaa4c77 100644 --- a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs @@ -7,13 +7,14 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do alias Ecto.Multi alias Explorer.Chain.Import.Runner.{Blocks, Transactions} - alias Explorer.Chain.{Address, Block, Transaction} + alias Explorer.Chain.{Address, Block, Transaction, TokenTransfer} alias Explorer.Chain alias Explorer.Repo describe "run/1" do setup do - block = insert(:block, consensus: true) + miner = insert(:address) + block = params_for(:block, consensus: true, miner_hash: miner.hash) timestamp = DateTime.utc_now() options = %{timestamps: %{inserted_at: timestamp, updated_at: timestamp}} @@ -22,9 +23,11 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do end test "derive_transaction_forks replaces hash on conflicting (uncle_hash, index)", %{ - consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number} = consensus_block, + consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options } do + consensus_block = insert(:block, %{hash: block_hash, number: block_number}) + transaction = :transaction |> insert() @@ -81,7 +84,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do end test "delete_address_current_token_balances deletes rows with matching block number when consensus is true", - %{consensus_block: %Block{number: block_number} = block, options: options} do + %{consensus_block: %{number: block_number} = block, options: options} do %Address.CurrentTokenBalance{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} = insert(:address_current_token_balance, block_number: block_number) @@ -98,7 +101,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do end test "delete_address_current_token_balances does not delete rows with matching block number when consensus is false", - %{consensus_block: %Block{number: block_number} = block, options: options} do + %{consensus_block: %{number: block_number} = block, options: options} do %Address.CurrentTokenBalance{} = insert(:address_current_token_balance, block_number: block_number) count = 1 @@ -113,8 +116,47 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do assert count(Address.CurrentTokenBalance) == count end + test "remove_nonconsensus_data deletes rows with matching block number when new consensus block is inserted", + %{consensus_block: %{number: block_number} = block, options: options} do + insert(:block, number: block_number, consensus: true) + + %TokenTransfer{transaction_hash: transaction_hash, log_index: log_index} = + insert(:token_transfer, block_number: block_number, transaction: insert(:transaction)) + + assert count(TokenTransfer) == 1 + + assert {:ok, + %{ + remove_nonconsensus_data: %{ + token_transfers: [ + %{transaction_hash: ^transaction_hash, log_index: ^log_index} + ] + } + }} = run_block_consensus_change(block, true, options) + + assert count(TokenTransfer) == 0 + end + + test "delete_token_transfers does not delete rows with matching block number when consensus is false", + %{consensus_block: %{number: block_number} = block, options: options} do + insert(:token_transfer, block_number: block_number, transaction: insert(:transaction)) + + count = 1 + + assert count(TokenTransfer) == count + + assert {:ok, + %{ + remove_nonconsensus_data: %{ + token_transfers: [] + } + }} = run_block_consensus_change(block, false, options) + + assert count(TokenTransfer) == count + end + test "derive_address_current_token_balances inserts rows if there is an address_token_balance left for the rows deleted by delete_address_current_token_balances", - %{consensus_block: %Block{number: block_number} = block, options: options} do + %{consensus_block: %{number: block_number} = block, options: options} do token = insert(:token) token_contract_address_hash = token.contract_address_hash @@ -172,7 +214,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do end test "a non-holder reverting to a holder increases the holder_count", - %{consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do + %{consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do token = insert(:token) token_contract_address_hash = token.contract_address_hash @@ -204,7 +246,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do end test "a holder reverting to a non-holder decreases the holder_count", - %{consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do + %{consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do token = insert(:token) token_contract_address_hash = token.contract_address_hash @@ -236,7 +278,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do end test "a non-holder becoming and a holder becoming while a holder becomes a non-holder cancels out and holder_count does not change", - %{consensus_block: %Block{number: block_number} = block, options: options} do + %{consensus_block: %{number: block_number} = block, options: options} do token = insert(:token) token_contract_address_hash = token.contract_address_hash @@ -262,7 +304,8 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do # Regression test for https://github.com/poanetwork/blockscout/issues/1644 test "discards neighbouring blocks if they aren't related to the current one because of reorg and/or import timeout", - %{consensus_block: %Block{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do + %{consensus_block: %{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do + insert(:block, %{number: block_number, hash: block_hash}) old_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1) new_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1) @@ -286,7 +329,8 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do # Regression test for https://github.com/poanetwork/blockscout/issues/1911 test "forces block refetch if transaction is re-collated in a different block", - %{consensus_block: %Block{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do + %{consensus_block: %{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do + insert(:block, %{number: block_number, hash: block_hash}) new_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1) new_block2 = params_for(:block, miner_hash: miner_hash, parent_hash: new_block1.hash, number: block_number + 2) @@ -365,7 +409,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do end defp run_block_consensus_change( - %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, + %{hash: block_hash, miner_hash: miner_hash, number: block_number}, consensus, options ) do