diff --git a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex index 9371571532..6036354e8c 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex @@ -5,11 +5,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do require Ecto.Query - import Ecto.Query, only: [from: 2, select: 2, update: 2] + import Ecto.Query, only: [from: 2, select: 2, subquery: 1, update: 2] alias Ecto.Adapters.SQL alias Ecto.{Changeset, Multi, Repo} - alias Explorer.Chain.{Block, Import, InternalTransaction, Transaction} + alias Explorer.Chain.{Address, Block, Hash, Import, InternalTransaction, Transaction} alias Explorer.Chain.Block.Reward alias Explorer.Chain.Import.Runner @@ -43,6 +43,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |> Map.put_new(:timeout, @timeout) |> Map.put(:timestamps, timestamps) + ordered_consensus_block_numbers = ordered_consensus_block_numbers(changes_list) where_forked = where_forked(changes_list) multi @@ -64,9 +65,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do }) end) |> Multi.run(:lose_consenus, fn repo, _ -> - lose_consensus(repo, changes_list, insert_options) + lose_consensus(repo, ordered_consensus_block_numbers, insert_options) end) - |> Multi.run(:remove_rewards, fn repo, _ -> + |> Multi.run(:delete_address_token_balances, fn repo, _ -> + delete_address_token_balances(repo, ordered_consensus_block_numbers, insert_options) + end) + |> Multi.run(:delete_address_current_token_balances, fn repo, _ -> + delete_address_current_token_balances(repo, ordered_consensus_block_numbers, insert_options) + end) + |> Multi.run(:derive_address_current_token_balances, fn repo, + %{ + delete_address_current_token_balances: + deleted_address_current_token_balances + } -> + derive_address_current_token_balances(repo, deleted_address_current_token_balances, insert_options) + end) + |> Multi.run(:delete_rewards, fn repo, _ -> delete_rewards(repo, changes_list, insert_options) end) |> Multi.run(:blocks, fn repo, _ -> @@ -247,19 +261,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do ) end - defp lose_consensus(repo, blocks_changes, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) - when is_list(blocks_changes) do - ordered_consensus_block_number = - blocks_changes - |> Enum.reduce(MapSet.new(), fn - %{consensus: true, number: number}, acc -> - MapSet.put(acc, number) + defp ordered_consensus_block_numbers(blocks_changes) when is_list(blocks_changes) do + blocks_changes + |> Enum.reduce(MapSet.new(), fn + %{consensus: true, number: number}, acc -> + MapSet.put(acc, number) - %{consensus: false}, acc -> - acc - end) - |> Enum.sort() + %{consensus: false}, acc -> + acc + end) + |> Enum.sort() + end + defp lose_consensus(_, [], _), do: {:ok, []} + + defp lose_consensus(repo, ordered_consensus_block_number, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) + when is_list(ordered_consensus_block_number) do query = from( block in Block, @@ -283,6 +300,162 @@ defmodule Explorer.Chain.Import.Runner.Blocks do end end + defp delete_address_token_balances(_, [], _), do: {:ok, []} + + defp delete_address_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do + ordered_query = + from(address_token_balance in Address.TokenBalance, + where: address_token_balance.block_number in ^ordered_consensus_block_numbers, + select: map(address_token_balance, [:address_hash, :token_contract_address_hash, :block_number]), + # MUST match order in `Explorer.Chain.Import.Runner.Address.TokenBalances.insert` to prevent ShareLock ordering deadlocks. + order_by: [ + address_token_balance.address_hash, + address_token_balance.token_contract_address_hash, + address_token_balance.block_number + ], + # ensures rows remains locked while outer query is joining to it + lock: "FOR UPDATE" + ) + + query = + from(address_token_balance in Address.TokenBalance, + select: map(address_token_balance, [:address_hash, :token_contract_address_hash, :block_number]), + inner_join: ordered_address_token_balance in subquery(ordered_query), + on: + ordered_address_token_balance.address_hash == address_token_balance.address_hash and + ordered_address_token_balance.token_contract_address_hash == + address_token_balance.token_contract_address_hash and + ordered_address_token_balance.block_number == address_token_balance.block_number + ) + + try do + {_count, deleted_address_token_balances} = repo.delete_all(query, timeout: timeout) + + {:ok, deleted_address_token_balances} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, block_numbers: ordered_consensus_block_numbers}} + end + end + + defp delete_address_current_token_balances(_, [], _), do: {:ok, []} + + defp delete_address_current_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do + ordered_query = + from(address_current_token_balance in Address.CurrentTokenBalance, + where: address_current_token_balance.block_number in ^ordered_consensus_block_numbers, + select: map(address_current_token_balance, [:address_hash, :token_contract_address_hash]), + # MUST match order in `Explorer.Chain.Import.Runner.Address.CurrentTokenBalances.insert` to prevent ShareLock ordering deadlocks. + order_by: [ + address_current_token_balance.address_hash, + address_current_token_balance.token_contract_address_hash + ], + # ensures row remains locked while outer query is joining to it + lock: "FOR UPDATE" + ) + + query = + from(address_current_token_balance in Address.CurrentTokenBalance, + select: map(address_current_token_balance, [:address_hash, :token_contract_address_hash]), + inner_join: ordered_address_current_token_balance in subquery(ordered_query), + on: + ordered_address_current_token_balance.address_hash == address_current_token_balance.address_hash and + ordered_address_current_token_balance.token_contract_address_hash == + address_current_token_balance.token_contract_address_hash + ) + + try do + {_count, deleted_address_current_token_balances} = repo.delete_all(query, timeout: timeout) + + {:ok, deleted_address_current_token_balances} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, block_numbers: ordered_consensus_block_numbers}} + end + end + + defp derive_address_current_token_balances(_, [], _), do: {:ok, []} + + # sobelow_skip ["SQL.Query"] + defp derive_address_current_token_balances(repo, deleted_address_current_token_balances, %{timeout: timeout}) + when is_list(deleted_address_current_token_balances) do + initial_query = + from(address_token_balance in Address.TokenBalance, + select: %{ + address_hash: address_token_balance.address_hash, + token_contract_address_hash: address_token_balance.token_contract_address_hash, + block_number: max(address_token_balance.block_number) + }, + group_by: [address_token_balance.address_hash, address_token_balance.token_contract_address_hash] + ) + + final_query = + Enum.reduce(deleted_address_current_token_balances, initial_query, fn %{ + address_hash: address_hash, + token_contract_address_hash: + token_contract_address_hash + }, + acc_query -> + from(address_token_balance in acc_query, + or_where: + address_token_balance.address_hash == ^address_hash and + address_token_balance.token_contract_address_hash == ^token_contract_address_hash + ) + end) + + new_current_token_balance_query = + from(new_current_token_balance in subquery(final_query), + inner_join: address_token_balance in Address.TokenBalance, + on: + address_token_balance.address_hash == new_current_token_balance.address_hash and + address_token_balance.token_contract_address_hash == new_current_token_balance.token_contract_address_hash and + address_token_balance.block_number == new_current_token_balance.block_number, + select: { + new_current_token_balance.address_hash, + new_current_token_balance.token_contract_address_hash, + new_current_token_balance.block_number, + address_token_balance.value, + over(min(address_token_balance.inserted_at), :w), + over(max(address_token_balance.updated_at), :w) + }, + # Prevent ShareLock deadlock by matching order of `Explorer.Chain.Import.Runner.Address.CurrentTokenBalances.insert` + order_by: [new_current_token_balance.address_hash, new_current_token_balance.token_contract_address_hash], + windows: [ + w: [partition_by: [address_token_balance.address_hash, address_token_balance.token_contract_address_hash]] + ] + ) + + {select_sql, parameters} = SQL.to_sql(:all, repo, new_current_token_balance_query) + + # No `ON CONFLICT` because `delete_address_current_token_balances` should have removed any conflicts. + insert_sql = """ + INSERT INTO address_current_token_balances (address_hash, token_contract_address_hash, block_number, value, inserted_at, updated_at) + #{select_sql} + RETURNING address_hash, token_contract_address_hash, block_number + """ + + with {:ok, + %Postgrex.Result{ + columns: ["address_hash", "token_contract_address_hash", "block_number"], + command: :insert, + rows: rows + }} <- SQL.query(repo, insert_sql, parameters, timeout: timeout) do + derived_address_current_token_balances = + Enum.map(rows, fn [address_hash_bytes, token_contract_address_hash_bytes, block_number] -> + {:ok, address_hash} = Hash.Address.load(address_hash_bytes) + {:ok, token_contract_address_hash} = Hash.Address.load(token_contract_address_hash_bytes) + + %{ + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash, + block_number: block_number + } + end) + + {:ok, derived_address_current_token_balances} + end + end + # `block_rewards` are linked to `blocks.hash`, but fetched by `blocks.number`, so when a block with the same number is # inserted, the old block rewards need to be deleted, so that the old and new rewards aren't combined. defp delete_rewards(repo, blocks_changes, %{timeout: timeout}) do