|
|
|
@ -149,25 +149,25 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
:derive_transaction_forks |
|
|
|
|
) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:acquire_contract_address_tokens, fn repo, _ -> |
|
|
|
|
|> Multi.run(:acquire_contract_address_tokens, fn repo, %{lose_consensus: non_consensus_blocks} -> |
|
|
|
|
Instrumenter.block_import_stage_runner( |
|
|
|
|
fn -> acquire_contract_address_tokens(repo, consensus_block_numbers) end, |
|
|
|
|
fn -> acquire_contract_address_tokens(repo, non_consensus_blocks) end, |
|
|
|
|
:address_referencing, |
|
|
|
|
:blocks, |
|
|
|
|
:acquire_contract_address_tokens |
|
|
|
|
) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:delete_address_token_balances, fn repo, _ -> |
|
|
|
|
|> Multi.run(:delete_address_token_balances, fn repo, %{lose_consensus: non_consensus_blocks} -> |
|
|
|
|
Instrumenter.block_import_stage_runner( |
|
|
|
|
fn -> delete_address_token_balances(repo, consensus_block_numbers, insert_options) end, |
|
|
|
|
fn -> delete_address_token_balances(repo, non_consensus_blocks, insert_options) end, |
|
|
|
|
:address_referencing, |
|
|
|
|
:blocks, |
|
|
|
|
:delete_address_token_balances |
|
|
|
|
) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:delete_address_current_token_balances, fn repo, _ -> |
|
|
|
|
|> Multi.run(:delete_address_current_token_balances, fn repo, %{lose_consensus: non_consensus_blocks} -> |
|
|
|
|
Instrumenter.block_import_stage_runner( |
|
|
|
|
fn -> delete_address_current_token_balances(repo, consensus_block_numbers, insert_options) end, |
|
|
|
|
fn -> delete_address_current_token_balances(repo, non_consensus_blocks, insert_options) end, |
|
|
|
|
:address_referencing, |
|
|
|
|
:blocks, |
|
|
|
|
:delete_address_current_token_balances |
|
|
|
@ -205,10 +205,12 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
@impl Runner |
|
|
|
|
def timeout, do: @timeout |
|
|
|
|
|
|
|
|
|
defp acquire_contract_address_tokens(repo, consensus_block_numbers) do |
|
|
|
|
defp acquire_contract_address_tokens(repo, non_consensus_blocks) do |
|
|
|
|
non_consensus_block_numbers = Enum.map(non_consensus_blocks, fn {number, _hash} -> number end) |
|
|
|
|
|
|
|
|
|
query = |
|
|
|
|
from(ctb in Address.CurrentTokenBalance, |
|
|
|
|
where: ctb.block_number in ^consensus_block_numbers, |
|
|
|
|
where: ctb.block_number in ^non_consensus_block_numbers, |
|
|
|
|
select: {ctb.token_contract_address_hash, ctb.token_id}, |
|
|
|
|
distinct: [ctb.token_contract_address_hash, ctb.token_id] |
|
|
|
|
) |
|
|
|
@ -447,10 +449,12 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
|
|
|
|
|
defp delete_address_token_balances(_, [], _), do: {:ok, []} |
|
|
|
|
|
|
|
|
|
defp delete_address_token_balances(repo, consensus_block_numbers, %{timeout: timeout}) do |
|
|
|
|
defp delete_address_token_balances(repo, non_consensus_blocks, %{timeout: timeout}) do |
|
|
|
|
non_consensus_block_numbers = Enum.map(non_consensus_blocks, fn {number, _hash} -> number end) |
|
|
|
|
|
|
|
|
|
ordered_query = |
|
|
|
|
from(tb in Address.TokenBalance, |
|
|
|
|
where: tb.block_number in ^consensus_block_numbers, |
|
|
|
|
where: tb.block_number in ^non_consensus_block_numbers, |
|
|
|
|
select: map(tb, [:address_hash, :token_contract_address_hash, :token_id, :block_number]), |
|
|
|
|
# Enforce TokenBalance ShareLocks order (see docs: sharelocks.md) |
|
|
|
|
order_by: [ |
|
|
|
@ -482,16 +486,18 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
{:ok, deleted_address_token_balances} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, block_numbers: consensus_block_numbers}} |
|
|
|
|
{:error, %{exception: postgrex_error, block_numbers: non_consensus_block_numbers}} |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp delete_address_current_token_balances(_, [], _), do: {:ok, []} |
|
|
|
|
|
|
|
|
|
defp delete_address_current_token_balances(repo, consensus_block_numbers, %{timeout: timeout}) do |
|
|
|
|
defp delete_address_current_token_balances(repo, non_consensus_blocks, %{timeout: timeout}) do |
|
|
|
|
non_consensus_block_numbers = Enum.map(non_consensus_blocks, fn {number, _hash} -> number end) |
|
|
|
|
|
|
|
|
|
ordered_query = |
|
|
|
|
from(ctb in Address.CurrentTokenBalance, |
|
|
|
|
where: ctb.block_number in ^consensus_block_numbers, |
|
|
|
|
where: ctb.block_number in ^non_consensus_block_numbers, |
|
|
|
|
select: map(ctb, [:address_hash, :token_contract_address_hash, :token_id]), |
|
|
|
|
# Enforce CurrentTokenBalance ShareLocks order (see docs: sharelocks.md) |
|
|
|
|
order_by: [ |
|
|
|
@ -529,7 +535,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
{:ok, deleted_address_current_token_balances} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, block_numbers: consensus_block_numbers}} |
|
|
|
|
{:error, %{exception: postgrex_error, block_numbers: non_consensus_block_numbers}} |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|