|
|
|
@ -46,32 +46,14 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
|
|
|
|
|
hashes = Enum.map(changes_list, & &1.hash) |
|
|
|
|
consensus_block_numbers = consensus_block_numbers(changes_list) |
|
|
|
|
where_invalid_neighbour = where_invalid_neighbour(changes_list) |
|
|
|
|
|
|
|
|
|
# Enforce ShareLocks tables order (see docs: sharelocks.md) |
|
|
|
|
multi |
|
|
|
|
|> Multi.run(:acquire_blocks, fn repo, _ -> |
|
|
|
|
acquire_blocks(repo, hashes, consensus_block_numbers, where_invalid_neighbour) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:lose_consensus, fn repo, _ -> |
|
|
|
|
lose_consensus(repo, consensus_block_numbers, insert_options) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:lose_invalid_neighbour_consensus, fn repo, _ -> |
|
|
|
|
lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, insert_options) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:nonconsensus_block_numbers, fn _repo, |
|
|
|
|
%{ |
|
|
|
|
lose_consensus: lost_consensus_blocks, |
|
|
|
|
lose_invalid_neighbour_consensus: lost_consensus_neighbours |
|
|
|
|
} -> |
|
|
|
|
nonconsensus_block_numbers = |
|
|
|
|
(lost_consensus_blocks ++ lost_consensus_neighbours) |
|
|
|
|
|> Enum.sort() |
|
|
|
|
|> Enum.dedup() |
|
|
|
|
|
|
|
|
|
{:ok, nonconsensus_block_numbers} |
|
|
|
|
lose_consensus(repo, hashes, consensus_block_numbers, changes_list, insert_options) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:blocks, fn repo, _ -> |
|
|
|
|
# Note, needs to be executed after `lose_consensus` for lock acquisition |
|
|
|
|
insert(repo, changes_list, insert_options) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:uncle_fetched_block_second_degree_relations, fn repo, %{blocks: blocks} when is_list(blocks) -> |
|
|
|
@ -101,27 +83,14 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
transactions: transactions |
|
|
|
|
}) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:remove_nonconsensus_logs, fn repo, |
|
|
|
|
%{ |
|
|
|
|
nonconsensus_block_numbers: nonconsensus_block_numbers, |
|
|
|
|
fork_transactions: transactions |
|
|
|
|
} -> |
|
|
|
|
remove_nonconsensus_logs(repo, nonconsensus_block_numbers, transactions, insert_options) |
|
|
|
|
|> Multi.run(:remove_nonconsensus_logs, fn repo, %{derive_transaction_forks: transactions} -> |
|
|
|
|
remove_nonconsensus_logs(repo, transactions, insert_options) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:acquire_internal_transactions, fn repo, |
|
|
|
|
%{ |
|
|
|
|
nonconsensus_block_numbers: nonconsensus_block_numbers, |
|
|
|
|
fork_transactions: transactions |
|
|
|
|
} -> |
|
|
|
|
acquire_internal_transactions(repo, nonconsensus_block_numbers, hashes, transactions) |
|
|
|
|
|> Multi.run(:acquire_internal_transactions, fn repo, %{derive_transaction_forks: transactions} -> |
|
|
|
|
acquire_internal_transactions(repo, hashes, transactions) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:remove_nonconsensus_internal_transactions, fn repo, |
|
|
|
|
%{ |
|
|
|
|
nonconsensus_block_numbers: |
|
|
|
|
nonconsensus_block_numbers, |
|
|
|
|
fork_transactions: transactions |
|
|
|
|
} -> |
|
|
|
|
remove_nonconsensus_internal_transactions(repo, nonconsensus_block_numbers, transactions, insert_options) |
|
|
|
|
|> Multi.run(:remove_nonconsensus_internal_transactions, fn repo, %{derive_transaction_forks: transactions} -> |
|
|
|
|
remove_nonconsensus_internal_transactions(repo, transactions, insert_options) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:internal_transaction_transaction_block_number, fn repo, _ -> |
|
|
|
|
update_internal_transaction_block_number(repo, hashes) |
|
|
|
@ -129,9 +98,8 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
|> Multi.run(:acquire_contract_address_tokens, fn repo, _ -> |
|
|
|
|
acquire_contract_address_tokens(repo, consensus_block_numbers) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:remove_nonconsensus_token_transfers, fn repo, |
|
|
|
|
%{nonconsensus_block_numbers: nonconsensus_block_numbers} -> |
|
|
|
|
remove_nonconsensus_token_transfers(repo, nonconsensus_block_numbers, insert_options) |
|
|
|
|
|> Multi.run(:remove_nonconsensus_token_transfers, fn repo, %{derive_transaction_forks: transactions} -> |
|
|
|
|
remove_nonconsensus_token_transfers(repo, transactions, insert_options) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:delete_address_token_balances, fn repo, _ -> |
|
|
|
|
delete_address_token_balances(repo, consensus_block_numbers, insert_options) |
|
|
|
@ -159,22 +127,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
@impl Runner |
|
|
|
|
def timeout, do: @timeout |
|
|
|
|
|
|
|
|
|
defp acquire_blocks(repo, hashes, consensus_block_numbers, where_invalid_neighbour) do |
|
|
|
|
query = |
|
|
|
|
from( |
|
|
|
|
block in where_invalid_neighbour, |
|
|
|
|
or_where: block.number in ^consensus_block_numbers, |
|
|
|
|
or_where: block.hash in ^hashes, |
|
|
|
|
select: block.hash, |
|
|
|
|
# Enforce Block ShareLocks order (see docs: sharelocks.md) |
|
|
|
|
order_by: [asc: block.hash], |
|
|
|
|
lock: "FOR UPDATE" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
blocks = repo.all(query) |
|
|
|
|
{:ok, blocks} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp acquire_contract_address_tokens(repo, consensus_block_numbers) do |
|
|
|
|
query = |
|
|
|
|
from(address_current_token_balance in Address.CurrentTokenBalance, |
|
|
|
@ -187,15 +139,12 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
Tokens.acquire_contract_address_tokens(repo, contract_address_hashes) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp acquire_internal_transactions(repo, nonconsensus_block_numbers, hashes, forked_transactions) do |
|
|
|
|
forked_transaction_hashes = Enum.map(forked_transactions, & &1.hash) |
|
|
|
|
|
|
|
|
|
defp acquire_internal_transactions(repo, hashes, forked_transaction_hashes) do |
|
|
|
|
query = |
|
|
|
|
from(internal_transaction in InternalTransaction, |
|
|
|
|
join: transaction in Transaction, |
|
|
|
|
on: internal_transaction.transaction_hash == transaction.hash, |
|
|
|
|
where: transaction.block_number in ^nonconsensus_block_numbers, |
|
|
|
|
or_where: transaction.block_hash in ^hashes, |
|
|
|
|
where: transaction.block_hash in ^hashes, |
|
|
|
|
or_where: transaction.hash in ^forked_transaction_hashes, |
|
|
|
|
select: {internal_transaction.transaction_hash, internal_transaction.index}, |
|
|
|
|
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md) |
|
|
|
@ -229,14 +178,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
lock: "FOR UPDATE" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
transactions = repo.all(query) |
|
|
|
|
|
|
|
|
|
hashes = Enum.map(transactions, & &1.hash) |
|
|
|
|
|
|
|
|
|
update_query = |
|
|
|
|
from( |
|
|
|
|
t in Transaction, |
|
|
|
|
where: t.hash in ^hashes, |
|
|
|
|
join: s in subquery(query), |
|
|
|
|
on: t.hash == s.hash, |
|
|
|
|
update: [ |
|
|
|
|
set: [ |
|
|
|
|
block_hash: nil, |
|
|
|
@ -250,17 +196,19 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
updated_at: ^updated_at |
|
|
|
|
] |
|
|
|
|
], |
|
|
|
|
select: t.hash |
|
|
|
|
select: %{ |
|
|
|
|
block_hash: s.block_hash, |
|
|
|
|
index: s.index, |
|
|
|
|
hash: s.hash |
|
|
|
|
} |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
try do |
|
|
|
|
{_num, _res} = repo.update_all(update_query, [], timeout: timeout) |
|
|
|
|
{_num, transactions} = repo.update_all(update_query, [], timeout: timeout) |
|
|
|
|
|
|
|
|
|
{:ok, transactions} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error}} |
|
|
|
|
end |
|
|
|
|
{:ok, transactions} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error}} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp derive_transaction_forks(%{ |
|
|
|
@ -283,7 +231,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
# Enforce Fork ShareLocks order (see docs: sharelocks.md) |
|
|
|
|
|> Enum.sort_by(&{&1.uncle_hash, &1.index}) |
|
|
|
|
|
|
|
|
|
{_total, result} = |
|
|
|
|
{_total, forked_transaction} = |
|
|
|
|
repo.insert_all( |
|
|
|
|
Transaction.Fork, |
|
|
|
|
transaction_forks, |
|
|
|
@ -294,11 +242,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
update: [set: [hash: fragment("EXCLUDED.hash")]], |
|
|
|
|
where: fragment("EXCLUDED.hash <> ?", transaction_fork.hash) |
|
|
|
|
), |
|
|
|
|
returning: [:uncle_hash, :hash], |
|
|
|
|
returning: [:hash], |
|
|
|
|
timeout: timeout |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
{:ok, result} |
|
|
|
|
{:ok, Enum.map(forked_transaction, & &1.hash)} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
@spec insert(Repo.t(), [map()], %{ |
|
|
|
@ -364,47 +312,48 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
|> Enum.map(& &1.number) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp lose_consensus(_, [], _), do: {:ok, []} |
|
|
|
|
|
|
|
|
|
defp lose_consensus(repo, consensus_block_number, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) |
|
|
|
|
when is_list(consensus_block_number) do |
|
|
|
|
# ShareLocks order already enforced by `acquire_blocks` (see docs: sharelocks.md) |
|
|
|
|
{_, result} = |
|
|
|
|
repo.update_all( |
|
|
|
|
from(block in Block, where: block.number in ^consensus_block_number, select: block.number), |
|
|
|
|
[set: [consensus: false, updated_at: updated_at]], |
|
|
|
|
timeout: timeout |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
{:ok, result} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, consensus_block_numbers: consensus_block_number}} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, %{ |
|
|
|
|
defp lose_consensus(repo, hashes, consensus_block_numbers, changes_list, %{ |
|
|
|
|
timeout: timeout, |
|
|
|
|
timestamps: %{updated_at: updated_at} |
|
|
|
|
}) do |
|
|
|
|
# ShareLocks order already enforced by `acquire_blocks` (see docs: sharelocks.md) |
|
|
|
|
{_, result} = |
|
|
|
|
acquire_query = |
|
|
|
|
from( |
|
|
|
|
block in where_invalid_neighbour(changes_list), |
|
|
|
|
or_where: block.number in ^consensus_block_numbers, |
|
|
|
|
# we also need to acquire blocks that will be upserted here, for ordering |
|
|
|
|
or_where: block.hash in ^hashes, |
|
|
|
|
select: block.hash, |
|
|
|
|
# Enforce Block ShareLocks order (see docs: sharelocks.md) |
|
|
|
|
order_by: [asc: block.hash], |
|
|
|
|
lock: "FOR UPDATE" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
{_, removed_consensus_block_hashes} = |
|
|
|
|
repo.update_all( |
|
|
|
|
from(block in where_invalid_neighbour, select: block.number), |
|
|
|
|
from( |
|
|
|
|
block in Block, |
|
|
|
|
join: s in subquery(acquire_query), |
|
|
|
|
on: block.hash == s.hash, |
|
|
|
|
# we don't want to remove consensus from blocks that will be upserted |
|
|
|
|
where: block.hash not in ^hashes, |
|
|
|
|
select: block.hash |
|
|
|
|
), |
|
|
|
|
[set: [consensus: false, updated_at: updated_at]], |
|
|
|
|
timeout: timeout |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
{:ok, result} |
|
|
|
|
{:ok, removed_consensus_block_hashes} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, where_invalid_neighbour: where_invalid_neighbour}} |
|
|
|
|
{:error, %{exception: postgrex_error, consensus_block_numbers: consensus_block_numbers}} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp remove_nonconsensus_token_transfers(repo, nonconsensus_block_numbers, %{timeout: timeout}) do |
|
|
|
|
defp remove_nonconsensus_token_transfers(repo, forked_transaction_hashes, %{timeout: timeout}) do |
|
|
|
|
ordered_token_transfers = |
|
|
|
|
from(token_transfer in TokenTransfer, |
|
|
|
|
where: token_transfer.block_number in ^nonconsensus_block_numbers, |
|
|
|
|
select: map(token_transfer, [:transaction_hash, :log_index]), |
|
|
|
|
from( |
|
|
|
|
token_transfer in TokenTransfer, |
|
|
|
|
where: token_transfer.transaction_hash in ^forked_transaction_hashes, |
|
|
|
|
select: token_transfer.transaction_hash, |
|
|
|
|
# Enforce TokenTransfer ShareLocks order (see docs: sharelocks.md) |
|
|
|
|
order_by: [ |
|
|
|
|
token_transfer.transaction_hash, |
|
|
|
@ -417,91 +366,60 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |
|
|
|
|
from(token_transfer in TokenTransfer, |
|
|
|
|
select: map(token_transfer, [:transaction_hash, :log_index]), |
|
|
|
|
inner_join: ordered_token_transfer in subquery(ordered_token_transfers), |
|
|
|
|
on: |
|
|
|
|
ordered_token_transfer.transaction_hash == |
|
|
|
|
token_transfer.transaction_hash and |
|
|
|
|
ordered_token_transfer.log_index == token_transfer.log_index |
|
|
|
|
on: ordered_token_transfer.transaction_hash == token_transfer.transaction_hash |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
try do |
|
|
|
|
{_count, deleted_token_transfers} = repo.delete_all(query, timeout: timeout) |
|
|
|
|
{_count, deleted_token_transfers} = repo.delete_all(query, timeout: timeout) |
|
|
|
|
|
|
|
|
|
{:ok, deleted_token_transfers} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}} |
|
|
|
|
end |
|
|
|
|
{:ok, deleted_token_transfers} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, transactions: forked_transaction_hashes}} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp remove_nonconsensus_internal_transactions(repo, nonconsensus_block_numbers, forked_transactions, %{ |
|
|
|
|
timeout: timeout |
|
|
|
|
}) do |
|
|
|
|
forked_transaction_hashes = Enum.map(forked_transactions, & &1.hash) |
|
|
|
|
|
|
|
|
|
transaction_query = |
|
|
|
|
from(transaction in Transaction, |
|
|
|
|
where: transaction.block_number in ^nonconsensus_block_numbers, |
|
|
|
|
or_where: transaction.hash in ^forked_transaction_hashes, |
|
|
|
|
select: map(transaction, [:hash]) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
defp remove_nonconsensus_internal_transactions(repo, forked_transaction_hashes, %{timeout: timeout}) do |
|
|
|
|
query = |
|
|
|
|
from(internal_transaction in InternalTransaction, |
|
|
|
|
inner_join: transaction in subquery(transaction_query), |
|
|
|
|
on: internal_transaction.transaction_hash == transaction.hash, |
|
|
|
|
where: internal_transaction.transaction_hash in ^forked_transaction_hashes, |
|
|
|
|
select: map(internal_transaction, [:transaction_hash, :index]) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
try do |
|
|
|
|
# ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md) |
|
|
|
|
{_count, deleted_internal_transactions} = repo.delete_all(query, timeout: timeout) |
|
|
|
|
# ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md) |
|
|
|
|
{_count, deleted_internal_transactions} = repo.delete_all(query, timeout: timeout) |
|
|
|
|
|
|
|
|
|
{:ok, deleted_internal_transactions} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}} |
|
|
|
|
end |
|
|
|
|
{:ok, deleted_internal_transactions} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, transactions: forked_transaction_hashes}} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp remove_nonconsensus_logs(repo, nonconsensus_block_numbers, forked_transactions, %{timeout: timeout}) do |
|
|
|
|
forked_transaction_hashes = Enum.map(forked_transactions, & &1.hash) |
|
|
|
|
|
|
|
|
|
transaction_query = |
|
|
|
|
from(transaction in Transaction, |
|
|
|
|
where: transaction.block_number in ^nonconsensus_block_numbers, |
|
|
|
|
or_where: transaction.hash in ^forked_transaction_hashes, |
|
|
|
|
select: map(transaction, [:hash]), |
|
|
|
|
order_by: transaction.hash |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
defp remove_nonconsensus_logs(repo, forked_transaction_hashes, %{timeout: timeout}) do |
|
|
|
|
ordered_logs = |
|
|
|
|
from(log in Log, |
|
|
|
|
inner_join: transaction in subquery(transaction_query), |
|
|
|
|
on: log.transaction_hash == transaction.hash, |
|
|
|
|
select: map(log, [:transaction_hash, :index]), |
|
|
|
|
from( |
|
|
|
|
log in Log, |
|
|
|
|
where: log.transaction_hash in ^forked_transaction_hashes, |
|
|
|
|
select: log.transaction_hash, |
|
|
|
|
# Enforce Log ShareLocks order (see docs: sharelocks.md) |
|
|
|
|
order_by: [ |
|
|
|
|
log.transaction_hash, |
|
|
|
|
log.index |
|
|
|
|
], |
|
|
|
|
lock: "FOR UPDATE OF l0" |
|
|
|
|
lock: "FOR UPDATE" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
query = |
|
|
|
|
from(log in Log, |
|
|
|
|
select: map(log, [:transaction_hash, :index]), |
|
|
|
|
inner_join: ordered_log in subquery(ordered_logs), |
|
|
|
|
on: ordered_log.transaction_hash == log.transaction_hash and ordered_log.index == log.index |
|
|
|
|
on: ordered_log.transaction_hash == log.transaction_hash |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
try do |
|
|
|
|
{_count, deleted_logs} = repo.delete_all(query, timeout: timeout) |
|
|
|
|
{_count, deleted_logs} = repo.delete_all(query, timeout: timeout) |
|
|
|
|
|
|
|
|
|
{:ok, deleted_logs} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}} |
|
|
|
|
end |
|
|
|
|
{:ok, deleted_logs} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, transactions: forked_transaction_hashes}} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp delete_address_token_balances(_, [], _), do: {:ok, []} |
|
|
|
|