@ -9,7 +9,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
alias Ecto.Adapters.SQL
alias Ecto.Adapters.SQL
alias Ecto . { Changeset , Multi , Repo }
alias Ecto . { Changeset , Multi , Repo }
alias Explorer.Chain . { Address , Block , Hash , Import , InternalTransaction , Transaction , TokenTransfer }
alias Explorer.Chain . { Address , Block , Hash , Import , InternalTransaction , Log , TokenTransfer , Transaction }
alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import.Runner
alias Explorer.Chain.Import.Runner
alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances
alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances
@ -58,15 +58,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
where_forked : where_forked
where_forked : where_forked
} )
} )
end )
end )
# MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table
|> Multi . run ( :fork_transactions , fn repo , _ ->
fork_transactions ( %{
repo : repo ,
timeout : options [ Runner.Transactions . option_key ( ) ] [ :timeout ] || Runner.Transactions . timeout ( ) ,
timestamps : timestamps ,
where_forked : where_forked
} )
end )
|> Multi . run ( :lose_consensus , fn repo , _ ->
|> Multi . run ( :lose_consensus , fn repo , _ ->
lose_consensus ( repo , ordered_consensus_block_numbers , insert_options )
lose_consensus ( repo , ordered_consensus_block_numbers , insert_options )
end )
end )
@ -86,7 +77,20 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
end )
end )
|> Enum . sort ( )
|> Enum . sort ( )
remove_nonconsensus_data ( repo , nonconsensus_block_numbers , insert_options )
remove_nonconsensus_data (
repo ,
nonconsensus_block_numbers ,
insert_options
)
end )
# MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table
|> Multi . run ( :fork_transactions , fn repo , _ ->
fork_transactions ( %{
repo : repo ,
timeout : options [ Runner.Transactions . option_key ( ) ] [ :timeout ] || Runner.Transactions . timeout ( ) ,
timestamps : timestamps ,
where_forked : where_forked
} )
end )
end )
|> Multi . run ( :delete_address_token_balances , fn repo , _ ->
|> Multi . run ( :delete_address_token_balances , fn repo , _ ->
delete_address_token_balances ( repo , ordered_consensus_block_numbers , insert_options )
delete_address_token_balances ( repo , ordered_consensus_block_numbers , insert_options )
@ -357,10 +361,15 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
end
end
end
end
defp remove_nonconsensus_data ( repo , nonconsensus_block_numbers , insert_options ) do
defp remove_nonconsensus_data (
repo ,
nonconsensus_block_numbers ,
insert_options
) do
with { :ok , deleted_token_transfers } <-
with { :ok , deleted_token_transfers } <-
remove_nonconsensus_token_transfers ( repo , nonconsensus_block_numbers , insert_options ) do
remove_nonconsensus_token_transfers ( repo , nonconsensus_block_numbers , insert_options ) ,
{ :ok , %{ token_transfers : deleted_token_transfers } }
{ :ok , deleted_logs } <- remove_nonconsensus_logs ( repo , nonconsensus_block_numbers , insert_options ) do
{ :ok , %{ token_transfers : deleted_token_transfers , logs : deleted_logs } }
end
end
end
end
@ -381,8 +390,8 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
select : map ( token_transfer , [ :transaction_hash , :log_index ] ) ,
select : map ( token_transfer , [ :transaction_hash , :log_index ] ) ,
inner_join : ordered_token_transfer in subquery ( ordered_token_transfers ) ,
inner_join : ordered_token_transfer in subquery ( ordered_token_transfers ) ,
on :
on :
ordered_token_transfer . transaction_hash ==
ordered_token_transfer . transaction_hash ==
token_transfer . transaction_hash and
token_transfer . transaction_hash and
ordered_token_transfer . log_index == token_transfer . log_index
ordered_token_transfer . log_index == token_transfer . log_index
)
)
@ -395,6 +404,44 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
{ :error , %{ exception : postgrex_error , block_numbers : nonconsensus_block_numbers } }
{ :error , %{ exception : postgrex_error , block_numbers : nonconsensus_block_numbers } }
end
end
end
end
defp remove_nonconsensus_logs ( repo , nonconsensus_block_numbers , %{ timeout : timeout } ) do
transaction_query =
from ( transaction in Transaction ,
where : transaction . block_number in ^ nonconsensus_block_numbers ,
select : map ( transaction , [ :hash ] ) ,
order_by : transaction . hash
)
ordered_logs =
from ( log in Log ,
inner_join : transaction in subquery ( transaction_query ) ,
on : log . transaction_hash == transaction . hash ,
select : map ( log , [ :transaction_hash , :index ] ) ,
order_by : [
log . transaction_hash ,
log . index
] ,
lock : " FOR UPDATE "
)
query =
from ( log in Log ,
select : map ( log , [ :transaction_hash , :index ] ) ,
inner_join : ordered_log in subquery ( ordered_logs ) ,
on : ordered_log . transaction_hash == log . transaction_hash and ordered_log . index == log . index
)
try do
{ _count , deleted_logs } = repo . delete_all ( query , timeout : timeout )
{ :ok , deleted_logs }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , block_numbers : nonconsensus_block_numbers } }
end
end
defp delete_address_token_balances ( _ , [ ] , _ ) , do : { :ok , [ ] }
defp delete_address_token_balances ( _ , [ ] , _ ) , do : { :ok , [ ] }
defp delete_address_token_balances ( repo , ordered_consensus_block_numbers , %{ timeout : timeout } ) do
defp delete_address_token_balances ( repo , ordered_consensus_block_numbers , %{ timeout : timeout } ) do