@ -8,7 +8,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
import Ecto.Query , only : [ from : 2 ]
alias Ecto . { Multi , Repo }
alias Explorer.Chain . { Data , Hash , Import , Transaction }
alias Explorer.Chain . { Block , Data , Hash , Import , Transaction }
alias Explorer.Chain.Import.Runner.TokenTransfers
@behaviour Import.Runner
@ -42,9 +42,13 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
|> Map . put ( :timestamps , timestamps )
|> Map . put ( :token_transfer_transaction_hash_set , token_transfer_transaction_hash_set ( options ) )
Multi . run ( multi , :transactions , fn repo , _ ->
multi
|> Multi . run ( :transactions , fn repo , _ ->
insert ( repo , changes_list , insert_options )
end )
|> Multi . run ( :recollated_transactions , fn repo , %{ transactions : transactions } ->
discard_blocks_for_recollated_transactions ( repo , transactions , insert_options )
end )
end
@impl Import.Runner
@ -87,7 +91,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
on_conflict : on_conflict ,
for : Transaction ,
returning :
~w( block_number index hash internal_transactions_indexed_at block_hash nonce from_address_hash created_contract_address_hash )a ,
~w( block_number index hash internal_transactions_indexed_at block_hash old_block_hash nonce from_address_hash created_contract_address_hash )a ,
timeout : timeout ,
timestamps : timestamps
)
@ -99,6 +103,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
update : [
set : [
block_hash : fragment ( " EXCLUDED.block_hash " ) ,
old_block_hash : transaction . block_hash ,
block_number : fragment ( " EXCLUDED.block_number " ) ,
created_contract_address_hash : fragment ( " EXCLUDED.created_contract_address_hash " ) ,
cumulative_gas_used : fragment ( " EXCLUDED.cumulative_gas_used " ) ,
@ -179,4 +184,43 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
end
defp put_internal_transactions_indexed_at? ( _ , _ ) , do : false
defp discard_blocks_for_recollated_transactions ( repo , transactions , %{
timeout : timeout ,
timestamps : %{ updated_at : updated_at }
} )
when is_list ( transactions ) do
ordered_block_hashes =
transactions
|> Enum . filter ( fn %{ block_hash : block_hash , old_block_hash : old_block_hash } ->
not is_nil ( old_block_hash ) and block_hash != old_block_hash
end )
|> MapSet . new ( & &1 . old_block_hash )
|> Enum . sort ( )
if Enum . empty? ( ordered_block_hashes ) do
{ :ok , [ ] }
else
query =
from (
block in Block ,
where : block . hash in ^ ordered_block_hashes ,
update : [
set : [
consensus : false ,
updated_at : ^ updated_at
]
]
)
try do
{ _ , result } = repo . update_all ( query , [ ] , timeout : timeout )
{ :ok , result }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , block_hashes : ordered_block_hashes } }
end
end
end
end