diff --git a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex index 9371571532..6036354e8c 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex @@ -5,11 +5,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do require Ecto.Query - import Ecto.Query, only: [from: 2, select: 2, update: 2] + import Ecto.Query, only: [from: 2, select: 2, subquery: 1, update: 2] alias Ecto.Adapters.SQL alias Ecto.{Changeset, Multi, Repo} - alias Explorer.Chain.{Block, Import, InternalTransaction, Transaction} + alias Explorer.Chain.{Address, Block, Hash, Import, InternalTransaction, Transaction} alias Explorer.Chain.Block.Reward alias Explorer.Chain.Import.Runner @@ -43,6 +43,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |> Map.put_new(:timeout, @timeout) |> Map.put(:timestamps, timestamps) + ordered_consensus_block_numbers = ordered_consensus_block_numbers(changes_list) where_forked = where_forked(changes_list) multi @@ -64,9 +65,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do }) end) |> Multi.run(:lose_consenus, fn repo, _ -> - lose_consensus(repo, changes_list, insert_options) + lose_consensus(repo, ordered_consensus_block_numbers, insert_options) end) - |> Multi.run(:remove_rewards, fn repo, _ -> + |> Multi.run(:delete_address_token_balances, fn repo, _ -> + delete_address_token_balances(repo, ordered_consensus_block_numbers, insert_options) + end) + |> Multi.run(:delete_address_current_token_balances, fn repo, _ -> + delete_address_current_token_balances(repo, ordered_consensus_block_numbers, insert_options) + end) + |> Multi.run(:derive_address_current_token_balances, fn repo, + %{ + delete_address_current_token_balances: + deleted_address_current_token_balances + } -> + derive_address_current_token_balances(repo, deleted_address_current_token_balances, insert_options) + end) + |> Multi.run(:delete_rewards, fn repo, _ -> delete_rewards(repo, changes_list, insert_options) end) |> Multi.run(:blocks, fn repo, _ -> @@ -247,19 +261,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do ) end - defp lose_consensus(repo, blocks_changes, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) - when is_list(blocks_changes) do - ordered_consensus_block_number = - blocks_changes - |> Enum.reduce(MapSet.new(), fn - %{consensus: true, number: number}, acc -> - MapSet.put(acc, number) + defp ordered_consensus_block_numbers(blocks_changes) when is_list(blocks_changes) do + blocks_changes + |> Enum.reduce(MapSet.new(), fn + %{consensus: true, number: number}, acc -> + MapSet.put(acc, number) - %{consensus: false}, acc -> - acc - end) - |> Enum.sort() + %{consensus: false}, acc -> + acc + end) + |> Enum.sort() + end + defp lose_consensus(_, [], _), do: {:ok, []} + + defp lose_consensus(repo, ordered_consensus_block_number, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) + when is_list(ordered_consensus_block_number) do query = from( block in Block, @@ -283,6 +300,162 @@ defmodule Explorer.Chain.Import.Runner.Blocks do end end + defp delete_address_token_balances(_, [], _), do: {:ok, []} + + defp delete_address_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do + ordered_query = + from(address_token_balance in Address.TokenBalance, + where: address_token_balance.block_number in ^ordered_consensus_block_numbers, + select: map(address_token_balance, [:address_hash, :token_contract_address_hash, :block_number]), + # MUST match order in `Explorer.Chain.Import.Runner.Address.TokenBalances.insert` to prevent ShareLock ordering deadlocks. + order_by: [ + address_token_balance.address_hash, + address_token_balance.token_contract_address_hash, + address_token_balance.block_number + ], + # ensures rows remains locked while outer query is joining to it + lock: "FOR UPDATE" + ) + + query = + from(address_token_balance in Address.TokenBalance, + select: map(address_token_balance, [:address_hash, :token_contract_address_hash, :block_number]), + inner_join: ordered_address_token_balance in subquery(ordered_query), + on: + ordered_address_token_balance.address_hash == address_token_balance.address_hash and + ordered_address_token_balance.token_contract_address_hash == + address_token_balance.token_contract_address_hash and + ordered_address_token_balance.block_number == address_token_balance.block_number + ) + + try do + {_count, deleted_address_token_balances} = repo.delete_all(query, timeout: timeout) + + {:ok, deleted_address_token_balances} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, block_numbers: ordered_consensus_block_numbers}} + end + end + + defp delete_address_current_token_balances(_, [], _), do: {:ok, []} + + defp delete_address_current_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do + ordered_query = + from(address_current_token_balance in Address.CurrentTokenBalance, + where: address_current_token_balance.block_number in ^ordered_consensus_block_numbers, + select: map(address_current_token_balance, [:address_hash, :token_contract_address_hash]), + # MUST match order in `Explorer.Chain.Import.Runner.Address.CurrentTokenBalances.insert` to prevent ShareLock ordering deadlocks. + order_by: [ + address_current_token_balance.address_hash, + address_current_token_balance.token_contract_address_hash + ], + # ensures row remains locked while outer query is joining to it + lock: "FOR UPDATE" + ) + + query = + from(address_current_token_balance in Address.CurrentTokenBalance, + select: map(address_current_token_balance, [:address_hash, :token_contract_address_hash]), + inner_join: ordered_address_current_token_balance in subquery(ordered_query), + on: + ordered_address_current_token_balance.address_hash == address_current_token_balance.address_hash and + ordered_address_current_token_balance.token_contract_address_hash == + address_current_token_balance.token_contract_address_hash + ) + + try do + {_count, deleted_address_current_token_balances} = repo.delete_all(query, timeout: timeout) + + {:ok, deleted_address_current_token_balances} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, block_numbers: ordered_consensus_block_numbers}} + end + end + + defp derive_address_current_token_balances(_, [], _), do: {:ok, []} + + # sobelow_skip ["SQL.Query"] + defp derive_address_current_token_balances(repo, deleted_address_current_token_balances, %{timeout: timeout}) + when is_list(deleted_address_current_token_balances) do + initial_query = + from(address_token_balance in Address.TokenBalance, + select: %{ + address_hash: address_token_balance.address_hash, + token_contract_address_hash: address_token_balance.token_contract_address_hash, + block_number: max(address_token_balance.block_number) + }, + group_by: [address_token_balance.address_hash, address_token_balance.token_contract_address_hash] + ) + + final_query = + Enum.reduce(deleted_address_current_token_balances, initial_query, fn %{ + address_hash: address_hash, + token_contract_address_hash: + token_contract_address_hash + }, + acc_query -> + from(address_token_balance in acc_query, + or_where: + address_token_balance.address_hash == ^address_hash and + address_token_balance.token_contract_address_hash == ^token_contract_address_hash + ) + end) + + new_current_token_balance_query = + from(new_current_token_balance in subquery(final_query), + inner_join: address_token_balance in Address.TokenBalance, + on: + address_token_balance.address_hash == new_current_token_balance.address_hash and + address_token_balance.token_contract_address_hash == new_current_token_balance.token_contract_address_hash and + address_token_balance.block_number == new_current_token_balance.block_number, + select: { + new_current_token_balance.address_hash, + new_current_token_balance.token_contract_address_hash, + new_current_token_balance.block_number, + address_token_balance.value, + over(min(address_token_balance.inserted_at), :w), + over(max(address_token_balance.updated_at), :w) + }, + # Prevent ShareLock deadlock by matching order of `Explorer.Chain.Import.Runner.Address.CurrentTokenBalances.insert` + order_by: [new_current_token_balance.address_hash, new_current_token_balance.token_contract_address_hash], + windows: [ + w: [partition_by: [address_token_balance.address_hash, address_token_balance.token_contract_address_hash]] + ] + ) + + {select_sql, parameters} = SQL.to_sql(:all, repo, new_current_token_balance_query) + + # No `ON CONFLICT` because `delete_address_current_token_balances` should have removed any conflicts. + insert_sql = """ + INSERT INTO address_current_token_balances (address_hash, token_contract_address_hash, block_number, value, inserted_at, updated_at) + #{select_sql} + RETURNING address_hash, token_contract_address_hash, block_number + """ + + with {:ok, + %Postgrex.Result{ + columns: ["address_hash", "token_contract_address_hash", "block_number"], + command: :insert, + rows: rows + }} <- SQL.query(repo, insert_sql, parameters, timeout: timeout) do + derived_address_current_token_balances = + Enum.map(rows, fn [address_hash_bytes, token_contract_address_hash_bytes, block_number] -> + {:ok, address_hash} = Hash.Address.load(address_hash_bytes) + {:ok, token_contract_address_hash} = Hash.Address.load(token_contract_address_hash_bytes) + + %{ + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number + } + end) + + {:ok, derived_address_current_token_balances} + end + end + # `block_rewards` are linked to `blocks.hash`, but fetched by `blocks.number`, so when a block with the same number is # inserted, the old block rewards need to be deleted, so that the old and new rewards aren't combined. defp delete_rewards(repo, blocks_changes, %{timeout: timeout}) do diff --git a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs index cba86e999a..b04d59895d 100644 --- a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs @@ -5,34 +5,34 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do alias Ecto.Multi alias Explorer.Chain.Import.Runner.{Blocks, Transaction} - alias Explorer.Chain.{Block, Transaction} + alias Explorer.Chain.{Address, Block, Transaction} alias Explorer.Repo describe "run/1" do setup do block = insert(:block, consensus: true) - transaction = - :transaction - |> insert() - |> with_block(block) + timestamp = DateTime.utc_now() + options = %{timestamps: %{inserted_at: timestamp, updated_at: timestamp}} - %{consensus_block: block, transaction: transaction} + %{consensus_block: block, options: options} end test "derive_transaction_forks replaces hash on conflicting (uncle_hash, index)", %{ - consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, - transaction: transaction + consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number} = consensus_block, + options: options } do + transaction = + :transaction + |> insert() + |> with_block(consensus_block) + block_params = params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: false) %Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params) changes_list = [block_changes] - timestamp = DateTime.utc_now() - options = %{timestamps: %{inserted_at: timestamp, updated_at: timestamp}} - assert Repo.aggregate(from(transaction in Transaction, where: is_nil(transaction.block_number)), :count, :hash) == 0 @@ -76,6 +76,139 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do assert Repo.one!(from(transaction_fork in Transaction.Fork, select: "ctid")) == ctid, "Tuple was written even though it is not distinct" end + + test "delete_address_current_token_balances deletes rows with matching block number when consensus is true", + %{consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do + %Address.CurrentTokenBalance{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} = + insert(:address_current_token_balance, block_number: block_number) + + block_params = params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: true) + + %Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params) + changes_list = [block_changes] + + assert count(Address.CurrentTokenBalance) == 1 + + assert {:ok, + %{ + delete_address_current_token_balances: [ + %{address_hash: ^address_hash, token_contract_address_hash: ^token_contract_address_hash} + ] + }} = + Multi.new() + |> Blocks.run(changes_list, options) + |> Repo.transaction() + + assert count(Address.CurrentTokenBalance) == 0 + end + + test "delete_address_current_token_balances does not delete rows with matching block number when consensus is false", + %{consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do + %Address.CurrentTokenBalance{} = insert(:address_current_token_balance, block_number: block_number) + + block_params = + params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: false) + + %Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params) + changes_list = [block_changes] + + count = 1 + + assert count(Address.CurrentTokenBalance) == count + + assert {:ok, + %{ + delete_address_current_token_balances: [] + }} = + Multi.new() + |> Blocks.run(changes_list, options) + |> Repo.transaction() + + assert count(Address.CurrentTokenBalance) == count + end + + test "derive_address_current_token_balances inserts rows if there is an address_token_balance left for the rows deleted by delete_address_current_token_balances", + %{consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do + %Address.TokenBalance{ + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash, + value: previous_value, + block_number: previous_block_number + } = insert(:token_balance, block_number: block_number - 1) + + address = Repo.get(Address, address_hash) + + %Address.TokenBalance{ + address_hash: ^address_hash, + token_contract_address_hash: ^token_contract_address_hash, + value: current_value, + block_number: ^block_number + } = + insert(:token_balance, + address: address, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number + ) + + refute current_value == previous_value + + %Address.CurrentTokenBalance{ + address_hash: ^address_hash, + token_contract_address_hash: ^token_contract_address_hash, + block_number: ^block_number, + value: ^current_value + } = + insert(:address_current_token_balance, + address: address, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number, + value: current_value + ) + + block_params = params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: true) + + %Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params) + changes_list = [block_changes] + + assert count(Address.TokenBalance) == 2 + assert count(Address.CurrentTokenBalance) == 1 + + assert {:ok, + %{ + delete_address_current_token_balances: [ + %{ + address_hash: ^address_hash, + token_contract_address_hash: ^token_contract_address_hash + } + ], + delete_address_token_balances: [ + %{ + address_hash: ^address_hash, + token_contract_address_hash: ^token_contract_address_hash, + block_number: ^block_number + } + ], + derive_address_current_token_balances: [ + %{ + address_hash: ^address_hash, + token_contract_address_hash: ^token_contract_address_hash, + block_number: ^previous_block_number + } + ] + }} = + Multi.new() + |> Blocks.run(changes_list, options) + |> Repo.transaction() + + assert count(Address.TokenBalance) == 1 + assert count(Address.CurrentTokenBalance) == 1 + + assert %Address.CurrentTokenBalance{block_number: ^previous_block_number, value: ^previous_value} = + Repo.get_by(Address.CurrentTokenBalance, + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash + ) + end end defp count(schema) do diff --git a/apps/explorer/test/explorer/chain/import_test.exs b/apps/explorer/test/explorer/chain/import_test.exs index eebee1a76a..93a981648a 100644 --- a/apps/explorer/test/explorer/chain/import_test.exs +++ b/apps/explorer/test/explorer/chain/import_test.exs @@ -1828,5 +1828,256 @@ defmodule Explorer.Chain.ImportTest do assert transaction_after.error == nil assert transaction_after.status == nil end + + test "address_token_balances and address_current_token_balances are deleted during reorgs" do + %Block{number: block_number} = insert(:block, consensus: true) + value_before = Decimal.new(1) + + %Address{hash: address_hash} = address = insert(:address) + + %Address.TokenBalance{ + address_hash: ^address_hash, + token_contract_address_hash: token_contract_address_hash, + block_number: ^block_number + } = insert(:token_balance, address: address, block_number: block_number, value: value_before) + + %Address.CurrentTokenBalance{ + address_hash: ^address_hash, + token_contract_address_hash: ^token_contract_address_hash, + block_number: ^block_number + } = + insert(:address_current_token_balance, + address: address, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number, + value: value_before + ) + + miner_hash_after = address_hash() + from_address_hash_after = address_hash() + block_hash_after = block_hash() + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: miner_hash_after}, + %{hash: from_address_hash_after} + ] + }, + blocks: %{ + params: [ + %{ + consensus: true, + difficulty: 1, + gas_limit: 1, + gas_used: 1, + hash: block_hash_after, + miner_hash: miner_hash_after, + nonce: 1, + number: block_number, + parent_hash: block_hash(), + size: 1, + timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"), + total_difficulty: 1 + } + ] + } + }) + + assert is_nil( + Repo.get_by(Address.CurrentTokenBalance, + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash + ) + ) + + assert is_nil( + Repo.get_by(Address.TokenBalance, + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number + ) + ) + end + + test "address_current_token_balances is derived during reorgs" do + %Block{number: block_number} = insert(:block, consensus: true) + previous_block_number = block_number - 1 + + %Address.TokenBalance{ + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash, + value: previous_value, + block_number: previous_block_number + } = insert(:token_balance, block_number: previous_block_number) + + address = Repo.get(Address, address_hash) + + %Address.TokenBalance{ + address_hash: ^address_hash, + token_contract_address_hash: token_contract_address_hash, + value: current_value, + block_number: ^block_number + } = + insert(:token_balance, + address: address, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number + ) + + refute current_value == previous_value + + %Address.CurrentTokenBalance{ + address_hash: ^address_hash, + token_contract_address_hash: ^token_contract_address_hash, + block_number: ^block_number + } = + insert(:address_current_token_balance, + address: address, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number, + value: current_value + ) + + miner_hash_after = address_hash() + from_address_hash_after = address_hash() + block_hash_after = block_hash() + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: miner_hash_after}, + %{hash: from_address_hash_after} + ] + }, + blocks: %{ + params: [ + %{ + consensus: true, + difficulty: 1, + gas_limit: 1, + gas_used: 1, + hash: block_hash_after, + miner_hash: miner_hash_after, + nonce: 1, + number: block_number, + parent_hash: block_hash(), + size: 1, + timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"), + total_difficulty: 1 + } + ] + } + }) + + assert %Address.CurrentTokenBalance{block_number: ^previous_block_number, value: ^previous_value} = + Repo.get_by(Address.CurrentTokenBalance, + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash + ) + + assert is_nil( + Repo.get_by(Address.TokenBalance, + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number + ) + ) + end + + test "address_token_balances and address_current_token_balances can be replaced during reorgs" do + %Block{number: block_number} = insert(:block, consensus: true) + value_before = Decimal.new(1) + + %Address{hash: address_hash} = address = insert(:address) + + %Address.TokenBalance{ + address_hash: ^address_hash, + token_contract_address_hash: token_contract_address_hash, + block_number: ^block_number + } = insert(:token_balance, address: address, block_number: block_number, value: value_before) + + %Address.CurrentTokenBalance{ + address_hash: ^address_hash, + token_contract_address_hash: ^token_contract_address_hash, + block_number: ^block_number + } = + insert(:address_current_token_balance, + address: address, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number, + value: value_before + ) + + miner_hash_after = address_hash() + from_address_hash_after = address_hash() + block_hash_after = block_hash() + value_after = Decimal.add(value_before, 1) + + assert {:ok, _} = + Import.all(%{ + addresses: %{ + params: [ + %{hash: address_hash}, + %{hash: token_contract_address_hash}, + %{hash: miner_hash_after}, + %{hash: from_address_hash_after} + ] + }, + address_token_balances: %{ + params: [ + %{ + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number, + value: value_after + } + ] + }, + address_current_token_balances: %{ + params: [ + %{ + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number, + value: value_after + } + ] + }, + blocks: %{ + params: [ + %{ + consensus: true, + difficulty: 1, + gas_limit: 1, + gas_used: 1, + hash: block_hash_after, + miner_hash: miner_hash_after, + nonce: 1, + number: block_number, + parent_hash: block_hash(), + size: 1, + timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"), + total_difficulty: 1 + } + ] + } + }) + + assert %Address.CurrentTokenBalance{value: ^value_after} = + Repo.get_by(Address.CurrentTokenBalance, + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash + ) + + assert %Address.TokenBalance{value: ^value_after} = + Repo.get_by(Address.TokenBalance, + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number + ) + end end end