@ -5,11 +5,10 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
require Ecto.Query
import Ecto.Query , only : [ from : 2 , select : 2 , s ubquery : 1 , update : 2 ]
import Ecto.Query , only : [ from : 2 , subquery : 1 ]
alias Ecto.Adapters.SQL
alias Ecto . { Changeset , Multi , Repo }
alias Explorer.Chain . { Address , Block , Hash , Import , InternalTransaction , Transaction }
alias Explorer.Chain . { Address , Block , Import , InternalTransaction , Log , TokenTransfer , Transaction }
alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import.Runner
alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances
@ -45,39 +44,68 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Map . put_new ( :timeout , @timeout )
|> Map . put ( :timestamps , timestamps )
ordered_consensus_block_numbers = ordered_consensus_block_numbers ( changes_list )
where_invalid_neighbour = where_invalid_neighbour ( changes_list )
where_forked = where_forked ( changes_list )
hashes = Enum . map ( changes_list , & &1 . hash )
consensus_block_numbers = consensus_block_numbers ( changes_list )
# Enforce ShareLocks tables order (see docs: sharelocks.md)
multi
|> Multi . run ( :derive_transaction_forks , fn repo , _ ->
derive_transaction_forks ( %{
repo : repo ,
timeout : options [ Runner.Transaction.Forks . option_key ( ) ] [ :timeout ] || Runner.Transaction.Forks . timeout ( ) ,
timestamps : timestamps ,
where_forked : where_forked
|> Multi . run ( :lose_consensus , fn repo , _ ->
lose_consensus ( repo , hashes , consensus_block_numbers , changes_list , insert_options )
end )
|> Multi . run ( :blocks , fn repo , _ ->
# Note, needs to be executed after `lose_consensus` for lock acquisition
insert ( repo , changes_list , insert_options )
end )
|> Multi . run ( :uncle_fetched_block_second_degree_relations , fn repo , %{ blocks : blocks } when is_list ( blocks ) ->
update_block_second_degree_relations ( repo , hashes , %{
timeout :
options [ Runner.Block.SecondDegreeRelations . option_key ( ) ] [ :timeout ] ||
Runner.Block.SecondDegreeRelations . timeout ( ) ,
timestamps : timestamps
} )
end )
# MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table
|> Multi . run ( :delete_rewards , fn repo , _ ->
delete_rewards ( repo , changes_list , insert_options )
end )
|> Multi . run ( :fork_transactions , fn repo , _ ->
fork_transactions ( %{
repo : repo ,
timeout : options [ Runner.Transactions . option_key ( ) ] [ :timeout ] || Runner.Transactions . timeout ( ) ,
timestamps : timestamps ,
where_forked : where_forked
blocks_changes : changes_list
} )
end )
|> Multi . run ( :lose_consensus , fn repo , _ ->
lose_consensus ( repo , ordered_consensus_block_numbers , insert_options )
|> Multi . run ( :derive_transaction_forks , fn repo , %{ fork_transactions : transactions } ->
derive_transaction_forks ( %{
repo : repo ,
timeout : options [ Runner.Transaction.Forks . option_key ( ) ] [ :timeout ] || Runner.Transaction.Forks . timeout ( ) ,
timestamps : timestamps ,
transactions : transactions
} )
end )
|> Multi . run ( :remove_nonconsensus_logs , fn repo , %{ derive_transaction_forks : transactions } ->
remove_nonconsensus_logs ( repo , transactions , insert_options )
end )
|> Multi . run ( :acquire_internal_transactions , fn repo , %{ derive_transaction_forks : transactions } ->
acquire_internal_transactions ( repo , hashes , transactions )
end )
|> Multi . run ( :lose_invalid_neighbour_consensus , fn repo , _ ->
lose_invalid_neighbour_consensus ( repo , where_invalid_neighbour , insert_options )
|> Multi . run ( :remove_nonconsensus_internal_transactions , fn repo , %{ derive_transaction_forks : transactions } ->
remove_nonconsensus_internal_transactions ( repo , transactions , insert_options )
end )
|> Multi . run ( :internal_transaction_transaction_block_number , fn repo , _ ->
update_internal_transaction_block_number ( repo , hashes )
end )
|> Multi . run ( :acquire_contract_address_tokens , fn repo , _ ->
acquire_contract_address_tokens ( repo , consensus_block_numbers )
end )
|> Multi . run ( :remove_nonconsensus_token_transfers , fn repo , %{ derive_transaction_forks : transactions } ->
remove_nonconsensus_token_transfers ( repo , transactions , insert_options )
end )
|> Multi . run ( :delete_address_token_balances , fn repo , _ ->
delete_address_token_balances ( repo , ordered_consensus_block_numbers , insert_options )
delete_address_token_balances ( repo , consensus_block_numbers , insert_options )
end )
|> Multi . run ( :delete_address_current_token_balances , fn repo , _ ->
delete_address_current_token_balances ( repo , ordered_consensus_block_numbers , insert_options )
delete_address_current_token_balances ( repo , consensus_block_numbers , insert_options )
end )
|> Multi . run ( :derive_address_current_token_balances , fn repo ,
%{
@ -94,130 +122,131 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
deltas = CurrentTokenBalances . token_holder_count_deltas ( %{ deleted : deleted , inserted : inserted } )
Tokens . update_holder_counts_with_deltas ( repo , deltas , insert_options )
end )
|> Multi . run ( :delete_rewards , fn repo , _ ->
delete_rewards ( repo , changes_list , insert_options )
end )
|> Multi . run ( :blocks , fn repo , _ ->
insert ( repo , changes_list , insert_options )
end )
|> Multi . run ( :uncle_fetched_block_second_degree_relations , fn repo , %{ blocks : blocks } when is_list ( blocks ) ->
update_block_second_degree_relations (
repo ,
blocks ,
%{
timeout :
options [ Runner.Block.SecondDegreeRelations . option_key ( ) ] [ :timeout ] ||
Runner.Block.SecondDegreeRelations . timeout ( ) ,
timestamps : timestamps
}
)
end )
|> Multi . run (
:internal_transaction_transaction_block_number ,
fn repo , %{ blocks : blocks } ->
blocks_hashes = Enum . map ( blocks , & &1 . hash )
query =
from (
internal_transaction in InternalTransaction ,
join : transaction in Transaction ,
on : internal_transaction . transaction_hash == transaction . hash ,
join : block in Block ,
on : block . hash == transaction . block_hash ,
where : block . hash in ^ blocks_hashes ,
update : [
set : [
block_number : block . number
]
]
)
{ total , _ } = repo . update_all ( query , [ ] )
{ :ok , total }
end
)
end
@impl Runner
def timeout , do : @timeout
# sobelow_skip ["SQL.Query"]
defp derive_transaction_forks ( %{
repo : repo ,
timeout : timeout ,
timestamps : %{ inserted_at : inserted_at , updated_at : updated_at } ,
where_forked : where_forked
} ) do
defp acquire_contract_address_tokens ( repo , consensus_block_numbers ) do
query =
from ( transaction in where_forked ,
select : [
transaction . block_hash ,
transaction . index ,
transaction . hash ,
type ( ^ inserted_at , transaction . inserted_at ) ,
type ( ^ updated_at , transaction . updated_at )
from ( address_current_token_balance in Address.CurrentTokenBalance ,
where : address_current_token_balance . block_number in ^ consensus_block_numbers ,
select : address_current_token_balance . token_contract_address_hash
)
contract_address_hashes = repo . all ( query )
Tokens . acquire_contract_address_tokens ( repo , contract_address_hashes )
end
defp acquire_internal_transactions ( repo , hashes , forked_transaction_hashes ) do
query =
from ( internal_transaction in InternalTransaction ,
join : transaction in Transaction ,
on : internal_transaction . transaction_hash == transaction . hash ,
where : transaction . block_hash in ^ hashes ,
or_where : transaction . hash in ^ forked_transaction_hashes ,
select : { internal_transaction . transaction_hash , internal_transaction . index } ,
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md)
order_by : [
internal_transaction . transaction_hash ,
internal_transaction . index
] ,
# order so that row ShareLocks are grabbed in a consistent order with
# `Explorer.Chain.Import.Runner.Transactions.insert`
order_by : transaction . hash
# NOTE: find a better way to know the alias that ecto gives to token
lock : " FOR UPDATE OF i0 "
)
{ select_sql , parameters } = SQL . to_sql ( :all , repo , query )
insert_sql = """
INSERT INTO transaction_forks ( uncle_hash , index , hash , inserted_at , updated_at )
#{select_sql}
ON CONFLICT ( uncle_hash , index )
DO UPDATE SET hash = EXCLUDED . hash
WHERE EXCLUDED . hash <> transaction_forks . hash
RETURNING uncle_hash , hash
"""
with { :ok , % Postgrex.Result { columns : [ " uncle_hash " , " hash " ] , command : :insert , rows : rows } } <-
SQL . query (
repo ,
insert_sql ,
parameters ,
timeout : timeout
) do
derived_transaction_forks = Enum . map ( rows , fn [ uncle_hash , hash ] -> %{ uncle_hash : uncle_hash , hash : hash } end )
{ :ok , derived_transaction_forks }
end
{ :ok , repo . all ( query ) }
end
defp fork_transactions ( %{
repo : repo ,
timeout : timeout ,
timestamps : %{ updated_at : updated_at } ,
where_forked : where_forked
blocks_changes : blocks_changes
} ) do
query =
where_forked
|> update (
set : [
block_hash : nil ,
block_number : nil ,
gas_used : nil ,
cumulative_gas_used : nil ,
index : nil ,
internal_transactions_indexed_at : nil ,
status : nil ,
error : nil ,
updated_at : ^ updated_at
]
from (
transaction in where_forked ( blocks_changes ) ,
select : %{
block_hash : transaction . block_hash ,
index : transaction . index ,
hash : transaction . hash
} ,
# Enforce Transaction ShareLocks order (see docs: sharelocks.md)
order_by : [ asc : :hash ] ,
lock : " FOR UPDATE "
)
|> select ( [ :hash ] )
try do
{ _ , result } = repo . update_all ( query , [ ] , timeout : timeout )
update_query =
from (
t in Transaction ,
join : s in subquery ( query ) ,
on : t . hash == s . hash ,
update : [
set : [
block_hash : nil ,
block_number : nil ,
gas_used : nil ,
cumulative_gas_used : nil ,
index : nil ,
internal_transactions_indexed_at : nil ,
status : nil ,
error : nil ,
updated_at : ^ updated_at
]
] ,
select : %{
block_hash : s . block_hash ,
index : s . index ,
hash : s . hash
}
)
{ :ok , result }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error } }
end
{ _num , transactions } = repo . update_all ( update_query , [ ] , timeout : timeout )
{ :ok , transactions }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error } }
end
defp derive_transaction_forks ( %{
repo : repo ,
timeout : timeout ,
timestamps : %{ inserted_at : inserted_at , updated_at : updated_at } ,
transactions : transactions
} ) do
transaction_forks =
transactions
|> Enum . map ( fn transaction ->
%{
uncle_hash : transaction . block_hash ,
index : transaction . index ,
hash : transaction . hash ,
inserted_at : inserted_at ,
updated_at : updated_at
}
end )
# Enforce Fork ShareLocks order (see docs: sharelocks.md)
|> Enum . sort_by ( & { &1 . uncle_hash , &1 . index } )
{ _total , forked_transaction } =
repo . insert_all (
Transaction.Fork ,
transaction_forks ,
conflict_target : [ :uncle_hash , :index ] ,
on_conflict :
from (
transaction_fork in Transaction.Fork ,
update : [ set : [ hash : fragment ( " EXCLUDED.hash " ) ] ] ,
where : fragment ( " EXCLUDED.hash <> ? " , transaction_fork . hash )
) ,
returning : [ :hash ] ,
timeout : timeout
)
{ :ok , Enum . map ( forked_transaction , & &1 . hash ) }
end
@spec insert ( Repo . t ( ) , [ map ( ) ] , %{
@ -228,8 +257,8 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
defp insert ( repo , changes_list , %{ timeout : timeout , timestamps : timestamps } = options ) when is_list ( changes_list ) do
on_conflict = Map . get_lazy ( options , :on_conflict , & default_on_conflict / 0 )
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum . sort_by ( changes_list , & { &1 . number , &1 . hash } )
# Enforce Block ShareLocks order (see docs: sharelocks.md)
ordered_changes_list = Enum . sort_by ( changes_list , & &1 . hash )
Import . insert_changes_list (
repo ,
@ -277,85 +306,135 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
)
end
defp ordered_ consensus_block_numbers( blocks_changes ) when is_list ( blocks_changes ) do
defp consensus_block_numbers ( blocks_changes ) when is_list ( blocks_changes ) do
blocks_changes
|> Enum . reduce ( MapSet . new ( ) , fn
%{ consensus : true , number : number } , acc ->
MapSet . put ( acc , number )
%{ consensus : false } , acc ->
acc
end )
|> Enum . sort ( )
|> Enum . filter ( & &1 . consensus )
|> Enum . map ( & &1 . number )
end
defp lose_consensus ( _ , [ ] , _ ) , do : { :ok , [ ] }
defp lose_consensus ( repo , hashes , consensus_block_numbers , changes_list , %{
timeout : timeout ,
timestamps : %{ updated_at : updated_at }
} ) do
acquire_query =
from (
block in where_invalid_neighbour ( changes_list ) ,
or_where : block . number in ^ consensus_block_numbers ,
# we also need to acquire blocks that will be upserted here, for ordering
or_where : block . hash in ^ hashes ,
select : block . hash ,
# Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by : [ asc : block . hash ] ,
lock : " FOR UPDATE "
)
defp lose_consensus ( repo , ordered_consensus_block_number , %{ timeout : timeout , timestamps : %{ updated_at : updated_at } } )
when is_list ( ordered_consensus_block_number ) do
query =
{ _ , removed_consensus_block_hashes } =
repo . update_all (
from (
block in Block ,
join : s in subquery ( acquire_query ) ,
on : block . hash == s . hash ,
# we don't want to remove consensus from blocks that will be upserted
where : block . hash not in ^ hashes ,
select : block . hash
) ,
[ set : [ consensus : false , updated_at : updated_at ] ] ,
timeout : timeout
)
{ :ok , removed_consensus_block_hashes }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , consensus_block_numbers : consensus_block_numbers } }
end
defp remove_nonconsensus_token_transfers ( repo , forked_transaction_hashes , %{ timeout : timeout } ) do
ordered_token_transfers =
from (
block in Block ,
where : block . number in ^ ordered_consensus_block_number ,
update : [
set : [
consensus : false ,
updated_at : ^ updated_at
]
token_transfer in TokenTransfer ,
where : token_transfer . transaction_hash in ^ forked_transaction_hashes ,
select : token_transfer . transaction_hash ,
# Enforce TokenTransfer ShareLocks order (see docs: sharelocks.md)
order_by : [
token_transfer . transaction_hash ,
token_transfer . log_index
] ,
select : [ :hash , :number ]
lock : " FOR UPDATE "
)
try do
{ _ , result } = repo . update_all ( query , [ ] , timeout : timeout )
query =
from ( token_transfer in TokenTransfer ,
select : map ( token_transfer , [ :transaction_hash , :log_index ] ) ,
inner_join : ordered_token_transfer in subquery ( ordered_token_transfers ) ,
on : ordered_token_transfer . transaction_hash == token_transfer . transaction_hash
)
{ :ok , result }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , consensus_block_numbers : ordered_consensus_block_number } }
end
{ _count , deleted_token_transfers } = repo . delete_all ( query , timeout : timeout )
{ :ok , deleted_token_transfers }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , transactions : forked_transaction_hashes } }
end
defp lose_invalid_neighbour_consensus ( repo , where_invalid_neighbour , %{
timeout : timeout ,
timestamps : %{ updated_at : updated_at }
} ) do
defp remove_nonconsensus_internal_transactions ( repo , forked_transaction_hashes , %{ timeout : timeout } ) do
query =
from ( internal_transaction in InternalTransaction ,
where : internal_transaction . transaction_hash in ^ forked_transaction_hashes ,
select : map ( internal_transaction , [ :transaction_hash , :index ] )
)
# ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md)
{ _count , deleted_internal_transactions } = repo . delete_all ( query , timeout : timeout )
{ :ok , deleted_internal_transactions }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , transactions : forked_transaction_hashes } }
end
defp remove_nonconsensus_logs ( repo , forked_transaction_hashes , %{ timeout : timeout } ) do
ordered_logs =
from (
block in where_invalid_neighbour ,
update : [
set : [
consensus : false ,
updated_at : ^ updated_at
]
log in Log ,
where : log . transaction_hash in ^ forked_transaction_hashes ,
select : log . transaction_hash ,
# Enforce Log ShareLocks order (see docs: sharelocks.md)
order_by : [
log . transaction_hash ,
log . index
] ,
select : [ :hash , :number ]
lock : " FOR UPDATE "
)
try do
{ _ , result } = repo . update_all ( query , [ ] , timeout : timeout )
query =
from ( log in Log ,
select : map ( log , [ :transaction_hash , :index ] ) ,
inner_join : ordered_log in subquery ( ordered_logs ) ,
on : ordered_log . transaction_hash == log . transaction_hash
)
{ :ok , result }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , where_invalid_neighbour : where_invalid_neighbour } }
end
{ _count , deleted_logs } = repo . delete_all ( query , timeout : timeout )
{ :ok , deleted_logs }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , transactions : forked_transaction_hashes } }
end
defp delete_address_token_balances ( _ , [ ] , _ ) , do : { :ok , [ ] }
defp delete_address_token_balances ( repo , ordered_ consensus_block_numbers, %{ timeout : timeout } ) do
defp delete_address_token_balances ( repo , consensus_block_numbers , %{ timeout : timeout } ) do
ordered_query =
from ( address_token_balance in Address.TokenBalance ,
where : address_token_balance . block_number in ^ ordered_ consensus_block_numbers,
where : address_token_balance . block_number in ^ consensus_block_numbers ,
select : map ( address_token_balance , [ :address_hash , :token_contract_address_hash , :block_number ] ) ,
# MUST match order in `Explorer.Chain.Import.Runner.Address.TokenBalances.insert` to prevent ShareLock ordering deadlocks.
# Enforce TokenBalance ShareLocks order (see docs: sharelocks.md)
order_by : [
address_token_balance . address_hash ,
address_token_balance . token_contract_address_hash ,
address_token_balance . block_number
] ,
# ensures rows remains locked while outer query is joining to it
lock : " FOR UPDATE "
)
@ -376,23 +455,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
{ :ok , deleted_address_token_balances }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , block_numbers : ordered_ consensus_block_numbers} }
{ :error , %{ exception : postgrex_error , block_numbers : consensus_block_numbers } }
end
end
defp delete_address_current_token_balances ( _ , [ ] , _ ) , do : { :ok , [ ] }
defp delete_address_current_token_balances ( repo , ordered_ consensus_block_numbers, %{ timeout : timeout } ) do
defp delete_address_current_token_balances ( repo , consensus_block_numbers , %{ timeout : timeout } ) do
ordered_query =
from ( address_current_token_balance in Address.CurrentTokenBalance ,
where : address_current_token_balance . block_number in ^ ordered_ consensus_block_numbers,
where : address_current_token_balance . block_number in ^ consensus_block_numbers ,
select : map ( address_current_token_balance , [ :address_hash , :token_contract_address_hash ] ) ,
# MUST match order in `Explorer.Chain.Import.Runner.Address.CurrentTokenBalances.insert` to prevent ShareLock ordering deadlocks.
# Enforce CurrentTokenBalance ShareLocks order (see docs: sharelocks.md)
order_by : [
address_current_token_balance . address_hash ,
address_current_token_balance . token_contract_address_hash
] ,
# ensures row remains locked while outer query is joining to it
lock : " FOR UPDATE "
)
@ -420,13 +498,12 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
{ :ok , deleted_address_current_token_balances }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , block_numbers : ordered_ consensus_block_numbers} }
{ :error , %{ exception : postgrex_error , block_numbers : consensus_block_numbers } }
end
end
defp derive_address_current_token_balances ( _ , [ ] , _ ) , do : { :ok , [ ] }
# sobelow_skip ["SQL.Query"]
defp derive_address_current_token_balances ( repo , deleted_address_current_token_balances , %{ timeout : timeout } )
when is_list ( deleted_address_current_token_balances ) do
initial_query =
@ -460,57 +537,39 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
address_token_balance . address_hash == new_current_token_balance . address_hash and
address_token_balance . token_contract_address_hash == new_current_token_balance . token_contract_address_hash and
address_token_balance . block_number == new_current_token_balance . block_number ,
select : {
new_current_token_balance . address_hash ,
new_current_token_balance . token_contract_address_hash ,
new_current_token_balance . block_number ,
address_token_balance . value ,
over ( min ( address_token_balance . inserted_at ) , :w ) ,
over ( max ( address_token_balance . updated_at ) , :w )
select : % {
address_hash : new_current_token_balance . address_hash ,
token_contract_address_hash : new_current_token_balance . token_contract_address_hash ,
block_number : new_current_token_balance . block_number ,
value : address_token_balance . value ,
inserted_at : over ( min ( address_token_balance . inserted_at ) , :w ) ,
updated_at : over ( max ( address_token_balance . updated_at ) , :w )
} ,
# Prevent ShareLock deadlock by matching order of `Explorer.Chain.Import.Runner.Address.CurrentTokenBalances.insert`
order_by : [ new_current_token_balance . address_hash , new_current_token_balance . token_contract_address_hash ] ,
windows : [
w : [ partition_by : [ address_token_balance . address_hash , address_token_balance . token_contract_address_hash ] ]
]
)
{ select_sql , parameters } = SQL . to_sql ( :all , repo , new_current_token_balance_query )
# No `ON CONFLICT` because `delete_address_current_token_balances` should have removed any conflicts.
insert_sql = """
INSERT INTO address_current_token_balances ( address_hash , token_contract_address_hash , block_number , value , inserted_at , updated_at )
#{select_sql}
RETURNING address_hash , token_contract_address_hash , block_number , value
"""
with { :ok ,
% Postgrex.Result {
columns : [
" address_hash " ,
" token_contract_address_hash " ,
" block_number " ,
# needed for `update_tokens_holder_count`
" value "
] ,
command : :insert ,
rows : rows
} } <- SQL . query ( repo , insert_sql , parameters , timeout : timeout ) do
derived_address_current_token_balances =
Enum . map ( rows , fn [ address_hash_bytes , token_contract_address_hash_bytes , block_number , value ] ->
{ :ok , address_hash } = Hash.Address . load ( address_hash_bytes )
{ :ok , token_contract_address_hash } = Hash.Address . load ( token_contract_address_hash_bytes )
%{
address_hash : address_hash ,
token_contract_address_hash : token_contract_address_hash ,
block_number : block_number ,
value : value
}
end )
{ :ok , derived_address_current_token_balances }
end
ordered_current_token_balance =
new_current_token_balance_query
|> repo . all ( )
# Enforce CurrentTokenBalance ShareLocks order (see docs: sharelocks.md)
|> Enum . sort_by ( & { &1 . address_hash , &1 . token_contract_address_hash } )
{ _total , result } =
repo . insert_all (
Address.CurrentTokenBalance ,
ordered_current_token_balance ,
# No `ON CONFLICT` because `delete_address_current_token_balances`
# should have removed any conflicts.
returning : [ :address_hash , :token_contract_address_hash , :block_number , :value ] ,
timeout : timeout
)
derived_address_current_token_balances =
Enum . map ( result , & Map . take ( &1 , [ :address_hash , :token_contract_address_hash , :block_number , :value ] ) )
{ :ok , derived_address_current_token_balances }
end
# `block_rewards` are linked to `blocks.hash`, but fetched by `blocks.number`, so when a block with the same number is
@ -528,11 +587,24 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
query =
from ( reward in Reward ,
inner_join : block in assoc ( reward , :block ) ,
where : block . hash in ^ hashes or block . number in ^ numbers
where : block . hash in ^ hashes or block . number in ^ numbers ,
# Enforce Reward ShareLocks order (see docs: sharelocks.md)
order_by : [ asc : :address_hash , asc : :address_type , asc : :block_hash ] ,
# NOTE: find a better way to know the alias that ecto gives to token
lock : " FOR UPDATE OF b0 "
)
delete_query =
from ( r in Reward ,
join : s in subquery ( query ) ,
on :
r . address_hash == s . address_hash and
r . address_type == s . address_type and
r . block_hash == s . block_hash
)
try do
{ count , nil } = repo . delete_all ( query , timeout : timeout )
{ count , nil } = repo . delete_all ( delete_ query, timeout : timeout )
{ :ok , count }
rescue
@ -541,34 +613,56 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
end
end
defp update_block_second_degree_relations ( repo , blocks , %{ timeout : timeout , timestamps : %{ updated_at : updated_at } } )
when is_list ( blocks ) do
ordered_uncle_hashes =
blocks
|> MapSet . new ( & &1 . hash )
|> Enum . sort ( )
defp update_block_second_degree_relations ( repo , uncle_hashes , %{
timeout : timeout ,
timestamps : %{ updated_at : updated_at }
} )
when is_list ( uncle_hashes ) do
query =
from (
bsdr in Block.SecondDegreeRelation ,
where : bsdr . uncle_hash in ^ ordered_uncle_hashes ,
update : [
set : [
uncle_fetched_at : ^ updated_at
]
]
where : bsdr . uncle_hash in ^ uncle_hashes ,
# Enforce SeconDegreeRelation ShareLocks order (see docs: sharelocks.md)
order_by : [ asc : :nephew_hash , asc : :uncle_hash ] ,
lock : " FOR UPDATE "
)
update_query =
from (
b in Block.SecondDegreeRelation ,
join : s in subquery ( query ) ,
on : b . nephew_hash == s . nephew_hash and b . uncle_hash == s . uncle_hash ,
update : [ set : [ uncle_fetched_at : ^ updated_at ] ]
)
try do
{ _ , result } = repo . update_all ( query , [ ] , timeout : timeout )
{ _ , result } = repo . update_all ( update_ query, [ ] , timeout : timeout )
{ :ok , result }
rescue
postgrex_error in Postgrex.Error ->
{ :error , %{ exception : postgrex_error , uncle_hashes : ordered_ uncle_hashes} }
{ :error , %{ exception : postgrex_error , uncle_hashes : uncle_hashes } }
end
end
defp update_internal_transaction_block_number ( repo , blocks_hashes ) when is_list ( blocks_hashes ) do
query =
from (
internal_transaction in InternalTransaction ,
join : transaction in Transaction ,
on : internal_transaction . transaction_hash == transaction . hash ,
join : block in Block ,
on : block . hash == transaction . block_hash ,
where : block . hash in ^ blocks_hashes ,
update : [ set : [ block_number : block . number ] ]
)
# ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md)
{ total , _ } = repo . update_all ( query , [ ] )
{ :ok , total }
end
defp where_forked ( blocks_changes ) when is_list ( blocks_changes ) do
initial = from ( t in Transaction , where : false )