@ -15,6 +15,7 @@ defmodule Explorer.Chain do
preload : 2 ,
select : 2 ,
subquery : 1 ,
union : 2 ,
union_all : 2 ,
where : 2 ,
where : 3
@ -38,6 +39,7 @@ defmodule Explorer.Chain do
Import ,
InternalTransaction ,
Log ,
PendingBlockOperation ,
SmartContract ,
StakingPool ,
Token ,
@ -193,8 +195,66 @@ defmodule Explorer.Chain do
direction = Keyword . get ( options , :direction )
paging_options = Keyword . get ( options , :paging_options , @default_paging_options )
InternalTransaction
|> InternalTransaction . where_address_fields_match ( hash , direction )
if direction == nil do
query_to_address_hash_wrapped =
InternalTransaction
|> InternalTransaction . where_address_fields_match ( hash , :to_address_hash )
|> common_where_limit_order ( paging_options )
|> wrapped_union_subquery ( )
query_from_address_hash_wrapped =
InternalTransaction
|> InternalTransaction . where_address_fields_match ( hash , :from_address_hash )
|> common_where_limit_order ( paging_options )
|> wrapped_union_subquery ( )
query_created_contract_address_hash_wrapped =
InternalTransaction
|> InternalTransaction . where_address_fields_match ( hash , :created_contract_address_hash )
|> common_where_limit_order ( paging_options )
|> wrapped_union_subquery ( )
full_query =
query_to_address_hash_wrapped
|> union ( ^ query_from_address_hash_wrapped )
|> union ( ^ query_created_contract_address_hash_wrapped )
full_wrapped_query =
from (
q in subquery ( full_query ) ,
select : q
)
full_wrapped_query
|> order_by (
[ q ] ,
desc : q . block_number ,
desc : q . transaction_index ,
desc : q . index
)
|> preload ( transaction : :block )
|> join_associations ( necessity_by_association )
|> Repo . all ( )
else
InternalTransaction
|> InternalTransaction . where_nonpending_block ( )
|> InternalTransaction . where_address_fields_match ( hash , direction )
|> common_where_limit_order ( paging_options )
|> preload ( transaction : :block )
|> join_associations ( necessity_by_association )
|> Repo . all ( )
end
end
def wrapped_union_subquery ( query ) do
from (
q in subquery ( query ) ,
select : q
)
end
defp common_where_limit_order ( query , paging_options ) do
query
|> InternalTransaction . where_is_different_from_parent_transaction ( )
|> InternalTransaction . where_block_number_is_not_null ( )
|> page_internal_transaction ( paging_options )
@ -205,9 +265,6 @@ defmodule Explorer.Chain do
desc : it . transaction_index ,
desc : it . index
)
|> preload ( transaction : :block )
|> join_associations ( necessity_by_association )
|> Repo . all ( )
end
@doc """
@ -261,10 +318,10 @@ defmodule Explorer.Chain do
paging_options = Keyword . get ( options , :paging_options , @default_paging_options )
if Application . get_env ( :block_scout_web , BlockScoutWeb.Chain ) [ :has_emission_funds ] do
blocks_range = address_to_transactions_tasks_range_of_blocks ( address_hash , options )
rewards_task =
Task . async ( fn ->
Reward . fetch_emission_rewards_tuples ( address_hash , paging_options )
end )
Task . async ( fn -> Reward . fetch_emission_rewards_tuples ( address_hash , paging_options , blocks_range ) end )
[ rewards_task | address_to_transactions_tasks ( address_hash , options ) ]
|> wait_for_address_transactions ( )
@ -303,21 +360,72 @@ defmodule Explorer.Chain do
|> Enum . take ( paging_options . page_size )
end
defp address_to_transactions_tasks_query ( options ) do
options
|> Keyword . get ( :paging_options , @default_paging_options )
|> fetch_transactions ( )
end
defp address_to_transactions_tasks ( address_hash , options ) do
paging_options = Keyword . get ( options , :paging_options , @default_paging_options )
direction = Keyword . get ( options , :direction )
necessity_by_association = Keyword . get ( options , :necessity_by_association , %{ } )
base_query =
paging_options
|> fetch_transactions ( )
|> join_associations ( necessity_by_association )
base_query
options
|> address_to_transactions_tasks_query ( )
|> join_associations ( necessity_by_association )
|> Transaction . matching_address_queries_list ( direction , address_hash )
|> Enum . map ( fn query -> Task . async ( fn -> Repo . all ( query ) end ) end )
end
defp address_to_transactions_tasks_range_of_blocks ( address_hash , options ) do
direction = Keyword . get ( options , :direction )
extremums_list =
options
|> address_to_transactions_tasks_query ( )
|> Transaction . matching_address_queries_list ( direction , address_hash )
|> Enum . map ( fn query ->
max_query =
from (
q in subquery ( query ) ,
select : %{ min_block_number : min ( q . block_number ) , max_block_number : max ( q . block_number ) }
)
max_query
|> Repo . one! ( )
end )
extremums_list
|> Enum . reduce ( %{ min_block_number : nil , max_block_number : 0 } , fn %{
min_block_number : min_number ,
max_block_number : max_number
} ,
extremums_result ->
current_min_number = Map . get ( extremums_result , :min_block_number )
current_max_number = Map . get ( extremums_result , :max_block_number )
extremums_result =
if is_number ( current_min_number ) do
if is_number ( min_number ) and min_number > 0 and min_number < current_min_number do
extremums_result
|> Map . put ( :min_block_number , min_number )
else
extremums_result
end
else
extremums_result
|> Map . put ( :min_block_number , min_number )
end
if is_number ( max_number ) and max_number > 0 and max_number > current_max_number do
extremums_result
|> Map . put ( :max_block_number , max_number )
else
extremums_result
end
end )
end
defp wait_for_address_transactions ( tasks ) do
tasks
|> Task . yield_many ( :timer . seconds ( 20 ) )
@ -355,9 +463,9 @@ defmodule Explorer.Chain do
base_query =
from ( log in Log ,
inner_join : transaction in assoc ( log , :transaction ) ,
order_by : [ desc : transaction . block_number , desc : transaction . index ] ,
preload : [ :transaction , transaction : [ to_address : :smart_contract ] ] ,
inner_join : transaction in Transaction ,
on : transaction . hash == log . transaction_hash ,
order_by : [ desc : log . block_number , desc : log . index ] ,
where : transaction . block_number < ^ block_number ,
or_where : transaction . block_number == ^ block_number and transaction . index > ^ transaction_index ,
or_where :
@ -368,7 +476,19 @@ defmodule Explorer.Chain do
select : log
)
base_query
wrapped_query =
from (
log in subquery ( base_query ) ,
inner_join : transaction in Transaction ,
preload : [ :transaction , transaction : [ to_address : :smart_contract ] ] ,
where :
log . block_hash == transaction . block_hash and
log . block_number == transaction . block_number and
log . transaction_hash == transaction . hash ,
select : log
)
wrapped_query
|> filter_topic ( options )
|> Repo . all ( )
|> Enum . take ( paging_options . page_size )
@ -718,33 +838,25 @@ defmodule Explorer.Chain do
end
@doc """
Checks to see if the chain is down indexing based on the transaction from the oldest block having
an ` internal_transactions_indexed_at ` date .
Checks to see if the chain is down indexing based on the transaction from the
oldest block and the ` fetch_ internal_transactions` pending operation
"""
@spec finished_indexing? ( ) :: boolean ( )
def finished_indexing? do
transaction_exists =
Transaction
|> limit ( 1 )
|> Repo . one ( )
min_block_number_transaction = Repo . aggregate ( Transaction , :min , :block_number )
with { :transactions_exist , true } <- { :transactions_exist , Repo . exists? ( Transaction ) } ,
min_block_number when not is_nil ( min_block_number ) <- Repo . aggregate ( Transaction , :min , :block_number ) do
query =
from (
b in Block ,
join : pending_ops in assoc ( b , :pending_operations ) ,
where : pending_ops . fetch_internal_transactions ,
where : b . consensus and b . number == ^ min_block_number
)
if transaction_exists do
if min_block_number_transaction do
Transaction
|> where ( [ t ] , t . block_number == ^ min_block_number_transaction and is_nil ( t . internal_transactions_indexed_at ) )
|> limit ( 1 )
|> Repo . one ( )
|> case do
nil -> true
_ -> false
end
else
false
end
! Repo . exists? ( query )
else
true
{ :transactions_exist , false } -> true
nil -> false
end
end
@ -1322,6 +1434,20 @@ defmodule Explorer.Chain do
Repo . one! ( query )
end
@spec fetch_sum_coin_total_supply_minus_burnt ( ) :: non_neg_integer
def fetch_sum_coin_total_supply_minus_burnt do
{ :ok , burn_address_hash } = string_to_address_hash ( " 0x0000000000000000000000000000000000000000 " )
query =
from (
a0 in Address ,
select : fragment ( " SUM(a0.fetched_coin_balance) " ) ,
where : a0 . hash != ^ burn_address_hash
)
Repo . one! ( query ) || 0
end
@spec fetch_sum_coin_total_supply ( ) :: non_neg_integer
def fetch_sum_coin_total_supply do
query =
@ -1336,11 +1462,8 @@ defmodule Explorer.Chain do
@doc """
The number of ` t :Explorer.Chain.InternalTransaction . t / 0 ` .
iex > transaction =
... > :transaction |>
... > insert ( ) |>
... > with_block ( )
iex > insert ( :internal_transaction , index : 0 , transaction : transaction )
iex > transaction = :transaction |> insert ( ) |> with_block ( )
iex > insert ( :internal_transaction , index : 0 , transaction : transaction , block_hash : transaction . block_hash , block_index : 0 )
iex > Explorer.Chain . internal_transaction_count ( )
1
@ -1351,7 +1474,7 @@ defmodule Explorer.Chain do
"""
def internal_transaction_count do
Repo . one! ( from ( it in " internal_transactions " , select : fragment ( " COUNT(*) " ) ) )
Repo . aggregate ( InternalTransaction . where_nonpending_block ( ) , :count , :transaction_hash )
end
@doc """
@ -1627,18 +1750,20 @@ defmodule Explorer.Chain do
end
@doc """
Returns a stream of all blocks with unfetched internal transactions .
Returns a stream of all blocks with unfetched internal transactions , using
the ` pending_block_operation ` table .
Only blocks with consensus are returned .
iex > non_consensus = insert ( :block , consensus : false )
iex > insert ( :pending_block_operation , block : non_consensus , fetch_internal_transactions : true )
iex > unfetched = insert ( :block )
iex > fetched = insert ( :block , internal_transactions_indexed_at : DateTime . utc_now ( ) )
iex > to_be_refetched = insert ( :block , refetch_needed : true )
iex > insert ( :pending_block_operation , block : unfetched , fetch_internal_transactions : true )
iex > fetched = insert ( :block )
iex > insert ( :pending_block_operation , block : fetched , fetch_internal_transactions : false )
iex > { :ok , number_set } = Explorer.Chain . stream_blocks_with_unfetched_internal_transactions (
... > [ :number ] ,
... > MapSet . new ( ) ,
... > fn % Explorer.Chain.Block { number : number } , acc ->
... > fn number , acc ->
... > MapSet . put ( acc , number )
... > end
... > )
@ -1648,112 +1773,55 @@ defmodule Explorer.Chain do
true
iex > fetched . hash in number_set
false
iex > to_be_refetched . number in number_set
false
"""
@spec stream_blocks_with_unfetched_internal_transactions (
fields :: [
:consensus
| :difficulty
| :gas_limit
| :gas_used
| :hash
| :miner
| :miner_hash
| :nonce
| :number
| :parent_hash
| :size
| :timestamp
| :total_difficulty
| :transactions
| :internal_transactions_indexed_at
] ,
initial :: accumulator ,
reducer :: ( entry :: term ( ) , accumulator -> accumulator )
) :: { :ok , accumulator }
when accumulator : term ( )
def stream_blocks_with_unfetched_internal_transactions ( fields , initial , reducer ) when is_function ( reducer , 2 ) do
def stream_blocks_with_unfetched_internal_transactions ( initial , reducer ) when is_function ( reducer , 2 ) do
query =
from (
b in Block ,
join : pending_ops in assoc ( b , :pending_operations ) ,
where : pending_ops . fetch_internal_transactions ,
where : b . consensus ,
where : is_nil ( b . internal_transactions_indexed_at ) ,
where : not b . refetch_needed ,
select : ^ fields
select : b . number
)
Repo . stream_reduce ( query , initial , reducer )
end
@doc """
Returns a stream of all collated transactions with unfetched internal transactions .
def remove_nonconsensus_blocks_from_pending_ops ( block_hashes ) do
query =
from (
po in PendingBlockOperation ,
where : po . block_hash in ^ block_hashes
)
Only transactions that have been collated into a block are returned ; pending transactions not in a block are filtered
out .
{ _ , _ } = Repo . delete_all ( query )
iex > pending = insert ( :transaction )
iex > unfetched_collated =
... > :transaction |>
... > insert ( ) |>
... > with_block ( )
iex > fetched_collated =
... > :transaction |>
... > insert ( ) |>
... > with_block ( internal_transactions_indexed_at : DateTime . utc_now ( ) )
iex > { :ok , hash_set } = Explorer.Chain . stream_transactions_with_unfetched_internal_transactions (
... > [ :hash ] ,
... > MapSet . new ( ) ,
... > fn % Explorer.Chain.Transaction { hash : hash } , acc ->
... > MapSet . put ( acc , hash )
... > end
... > )
iex > pending . hash in hash_set
false
iex > unfetched_collated . hash in hash_set
true
iex > fetched_collated . hash in hash_set
false
:ok
end
"""
@spec stream_transactions_with_unfetched_internal_transactions (
fields :: [
:block_hash
| :internal_transactions_indexed_at
| :from_address_hash
| :gas
| :gas_price
| :hash
| :index
| :input
| :nonce
| :r
| :s
| :to_address_hash
| :v
| :value
] ,
initial :: accumulator ,
reducer :: ( entry :: term ( ) , accumulator -> accumulator )
) :: { :ok , accumulator }
when accumulator : term ( )
def stream_transactions_with_unfetched_internal_transactions ( fields , initial , reducer ) when is_function ( reducer , 2 ) do
def remove_nonconsensus_blocks_from_pending_ops do
query =
from (
t in Transac tion,
# exclude pending transactions and replaced transactions
where : not is_nil ( t . block_hash ) and is_nil ( t . internal_transactions_indexed_at ) ,
select : ^ fields
po in PendingBlockOperation ,
inner_join : block in Block ,
on : block . hash == po . block_hash ,
where : block . consensus == false
)
Repo . stream_reduce ( query , initial , reducer )
{ _ , _ } = Repo . delete_all ( query )
:ok
end
@spec stream_transactions_with_unfetched_created_contract_codes (
fields :: [
:block_hash
| :internal_transactions_indexed_at
| :created_contract_code_indexed_at
| :from_address_hash
| :gas
@ -1788,7 +1856,6 @@ defmodule Explorer.Chain do
@spec stream_mined_transactions (
fields :: [
:block_hash
| :internal_transactions_indexed_at
| :created_contract_code_indexed_at
| :from_address_hash
| :gas
@ -1820,7 +1887,6 @@ defmodule Explorer.Chain do
@spec stream_pending_transactions (
fields :: [
:block_hash
| :internal_transactions_indexed_at
| :created_contract_code_indexed_at
| :from_address_hash
| :gas
@ -2202,8 +2268,8 @@ defmodule Explorer.Chain do
## Options
* ` :necessity_by_association ` - use to load ` t :association / 0 ` as ` :required ` or ` :optional ` . If an association is
` :required ` , and the ` t :Explorer.Chain.Internal Transaction . t / 0 ` has no associated record for that association ,
then the ` t :Explorer.Chain.Internal Transaction . t / 0 ` will not be included in the list .
` :required ` , and the ` t :Explorer.Chain.Transaction . t / 0 ` has no associated record for that association ,
then the ` t :Explorer.Chain.Transaction . t / 0 ` will not be included in the list .
* ` :paging_options ` - a ` t :Explorer.PagingOptions . t / 0 ` used to specify the ` :page_size ` and
` :key ` ( a tuple of the lowest / oldest ` { block_number , index } ` ) and . Results will be the transactions older than
the ` block_number ` and ` index ` that are passed .
@ -2257,8 +2323,8 @@ defmodule Explorer.Chain do
## Options
* ` :necessity_by_association ` - use to load ` t :association / 0 ` as ` :required ` or ` :optional ` . If an association is
` :required ` , and the ` t :Explorer.Chain.Internal Transaction . t / 0 ` has no associated record for that association ,
then the ` t :Explorer.Chain.Internal Transaction . t / 0 ` will not be included in the list .
` :required ` , and the ` t :Explorer.Chain.Transaction . t / 0 ` has no associated record for that association ,
then the ` t :Explorer.Chain.Transaction . t / 0 ` will not be included in the list .
* ` :paging_options ` - a ` t :Explorer.PagingOptions . t / 0 ` used to specify the ` :page_size ` ( defaults to
` #{@default_paging_options.page_size}`) and `:key` (a tuple of the lowest/oldest `{inserted_at, hash}`) and.
Results will be the transactions older than the ` inserted_at ` and ` hash ` that are passed .
@ -2438,6 +2504,8 @@ defmodule Explorer.Chain do
|> for_parent_transaction ( hash )
|> join_associations ( necessity_by_association )
|> where_transaction_has_multiple_internal_transactions ( )
|> InternalTransaction . where_is_different_from_parent_transaction ( )
|> InternalTransaction . where_nonpending_block ( )
|> page_internal_transaction ( paging_options )
|> limit ( ^ paging_options . page_size )
|> order_by ( [ internal_transaction ] , asc : internal_transaction . index )
@ -2463,8 +2531,15 @@ defmodule Explorer.Chain do
necessity_by_association = Keyword . get ( options , :necessity_by_association , %{ } )
paging_options = Keyword . get ( options , :paging_options , @default_paging_options )
Log
|> join ( :inner , [ log ] , transaction in assoc ( log , :transaction ) )
log_with_transactions =
from ( log in Log ,
inner_join : transaction in Transaction ,
on :
transaction . block_hash == log . block_hash and transaction . block_number == log . block_number and
transaction . hash == log . transaction_hash
)
log_with_transactions
|> where ( [ _ , transaction ] , transaction . hash == ^ transaction_hash )
|> page_logs ( paging_options )
|> limit ( ^ paging_options . page_size )
@ -2495,7 +2570,11 @@ defmodule Explorer.Chain do
TokenTransfer
|> join ( :inner , [ token_transfer ] , transaction in assoc ( token_transfer , :transaction ) )
|> where ( [ _ , transaction ] , transaction . hash == ^ transaction_hash )
|> where (
[ token_transfer , transaction ] ,
transaction . hash == ^ transaction_hash and token_transfer . block_hash == transaction . block_hash and
token_transfer . block_number == transaction . block_number
)
|> TokenTransfer . page_token_transfer ( paging_options )
|> limit ( ^ paging_options . page_size )
|> order_by ( [ token_transfer ] , asc : token_transfer . inserted_at )
@ -2530,7 +2609,7 @@ defmodule Explorer.Chain do
def transaction_to_status ( % Transaction { status : nil } ) , do : :awaiting_internal_transactions
def transaction_to_status ( % Transaction { status : :ok } ) , do : :success
def transaction_to_status ( % Transaction { status : :error , internal_transactions_indexed_at : nil , error : nil } ) ,
def transaction_to_status ( % Transaction { status : :error , error : nil } ) ,
do : { :error , :awaiting_internal_transactions }
def transaction_to_status ( % Transaction { status : :error , error : error } ) when is_binary ( error ) , do : { :error , error }
@ -2922,7 +3001,7 @@ defmodule Explorer.Chain do
"""
@spec total_supply :: non_neg_integer ( ) | nil
def total_supply do
supply_module ( ) . total ( )
supply_module ( ) . total ( ) || 0
end
@doc """
@ -2967,21 +3046,33 @@ defmodule Explorer.Chain do
) :: { :ok , accumulator }
when accumulator : term ( )
def stream_unfetched_token_instances ( initial , reducer ) when is_function ( reducer , 2 ) do
nft_tokens =
from (
token in Token ,
where : token . type == ^ " ERC-721 " ,
select : token . contract_address_hash
)
query =
from (
token_transfer in TokenTransfer ,
inner_join : token in Token ,
inner_join : token in subquery ( nft_tokens ) ,
on : token . contract_address_hash == token_transfer . token_contract_address_hash ,
left_join : instance in Instance ,
on :
token_transfer . token_id == instance . token_id and
token_transfer . token_contract_address_hash == instance . token_contract_address_hash ,
where : token . type == ^ " ERC-721 " and is_nil ( instance . token_id ) and not is_nil ( token_transfer . token_id ) ,
distinct : [ token_transfer . token_contract_address_hash , token_transfer . token_id ] ,
where : is_nil ( instance . token_id ) and not is_nil ( token_transfer . token_id ) ,
select : %{ contract_address_hash : token_transfer . token_contract_address_hash , token_id : token_transfer . token_id }
)
Repo . stream_reduce ( query , initial , reducer )
distinct_query =
from (
q in subquery ( query ) ,
distinct : [ q . contract_address_hash , q . token_id ]
)
Repo . stream_reduce ( distinct_query , initial , reducer )
end
@doc """