Merge branch 'master' into fix-displayed-address-length

pull/1206/head
Andrew Cravenho committed by GitHub
commit ef5ad0df8a
34 changed files:

  1. .credo.exs (2 changed lines)
  2. apps/block_scout_web/assets/js/lib/token_balance_dropdown.js (1 changed line)
  3. apps/block_scout_web/config/config.exs (4 changed lines)
  4. apps/block_scout_web/test/block_scout_web/features/viewing_addresses_test.exs (4 changed lines)
  5. apps/ethereum_jsonrpc/config/config.exs (4 changed lines)
  6. apps/explorer/config/config.exs (4 changed lines)
  7. apps/explorer/lib/explorer/chain.ex (174 changed lines)
  8. apps/explorer/lib/explorer/chain/address/current_token_balance.ex (12 changed lines)
  9. apps/explorer/lib/explorer/chain/address/token_balance.ex (17 changed lines)
  10. apps/explorer/lib/explorer/chain/token_transfer.ex (144 changed lines)
  11. apps/explorer/lib/explorer/chain/transaction.ex (16 changed lines)
  12. apps/explorer/lib/explorer/counters/block_validation_counter.ex (6 changed lines)
  13. apps/explorer/lib/explorer/counters/token_holders_counter.ex (11 changed lines)
  14. apps/explorer/lib/explorer/counters/token_transfer_counter.ex (6 changed lines)
  15. apps/explorer/lib/explorer/repo.ex (21 changed lines)
  16. apps/explorer/priv/repo/migrations/20181212115448_add_indexes_to_token_transfers.exs (8 changed lines)
  17. apps/explorer/test/explorer/chain/address/current_token_balance_test.exs (54 changed lines)
  18. apps/explorer/test/explorer/chain/token_transfer_test.exs (186 changed lines)
  19. apps/explorer/test/explorer/chain_test.exs (163 changed lines)
  20. apps/indexer/config/config.exs (4 changed lines)
  21. apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex (2 changed lines)
  22. apps/indexer/lib/indexer/block/catchup/fetcher.ex (4 changed lines)
  23. apps/indexer/lib/indexer/block/invalid_consensus/supervisor.ex (45 changed lines)
  24. apps/indexer/lib/indexer/block/invalid_consensus/worker.ex (96 changed lines)
  25. apps/indexer/lib/indexer/block/realtime/consensus_ensurer.ex (23 changed lines)
  26. apps/indexer/lib/indexer/block/realtime/fetcher.ex (23 changed lines)
  27. apps/indexer/lib/indexer/block/supervisor.ex (3 changed lines)
  28. apps/indexer/lib/indexer/block/uncle/fetcher.ex (11 changed lines)
  29. apps/indexer/lib/indexer/buffered_task.ex (38 changed lines)
  30. apps/indexer/lib/indexer/coin_balance/fetcher.ex (9 changed lines)
  31. apps/indexer/lib/indexer/internal_transaction/fetcher.ex (3 changed lines)
  32. apps/indexer/lib/indexer/pending_transaction/fetcher.ex (4 changed lines)
  33. apps/indexer/test/indexer/block/invalid_consensus/worker_test.exs (87 changed lines)
  34. config/config.exs (12 changed lines)

.credo.exs
@@ -77,7 +77,7 @@
       {Credo.Check.Design.AliasUsage,
        excluded_namespaces: ~w(Block Blocks Import Socket SpandexDatadog Task),
        excluded_lastnames:
-         ~w(Address DateTime Exporter Fetcher Full Instrumenter Monitor Name Number Repo Spec Time Unit),
+         ~w(Address DateTime Exporter Fetcher Full Instrumenter Logger Monitor Name Number Repo Spec Time Unit),
        priority: :low},
       # For some checks, you can also set other parameters

apps/block_scout_web/assets/js/lib/token_balance_dropdown.js
@@ -17,4 +17,3 @@ const tokenBalanceDropdown = (element) => {
 export function loadTokenBalanceDropdown () {
   $('[data-token-balance-dropdown]').each((_index, element) => tokenBalanceDropdown(element))
 }
-loadTokenBalanceDropdown()

@ -47,8 +47,8 @@ config :ex_cldr,
config :logger, :block_scout_web, config :logger, :block_scout_web,
# keep synced with `config/config.exs` # keep synced with `config/config.exs`
format: "$time $metadata[$level] $message\n", format: "$dateT$time $metadata[$level] $message\n",
metadata: [:application, :request_id], metadata: ~w(application fetcher request_id)a,
metadata_filter: [application: :block_scout_web] metadata_filter: [application: :block_scout_web]
config :spandex_phoenix, tracer: BlockScoutWeb.Tracer config :spandex_phoenix, tracer: BlockScoutWeb.Tracer

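A note on the two changes above, which repeat in every per-app logger config in this commit: `$date` and `$time` are standard Elixir `Logger` format directives, so the literal `T` between them yields an ISO-8601-style timestamp, and the `~w(...)a` sigil is simply a word list of atoms. A minimal equivalence check:

    ~w(application fetcher request_id)a == [:application, :fetcher, :request_id]
    #=> true

The new `fetcher` key is populated by the `Logger.metadata(fetcher: ...)` calls added to the indexer processes further down.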
apps/block_scout_web/test/block_scout_web/features/viewing_addresses_test.exs
@@ -421,7 +421,7 @@ defmodule BlockScoutWeb.ViewingAddressesTest do
         token_contract_address: contract_address
       )

-      insert(:token_balance, address: lincoln, token_contract_address_hash: contract_address.hash)
+      insert(:address_current_token_balance, address: lincoln, token_contract_address_hash: contract_address.hash)

       contract_address_2 = insert(:contract_address)
       insert(:token, name: "token2", symbol: "T2", contract_address: contract_address_2, type: "ERC-20")
@@ -439,7 +439,7 @@ defmodule BlockScoutWeb.ViewingAddressesTest do
         token_contract_address: contract_address_2
       )

-      insert(:token_balance, address: lincoln, token_contract_address_hash: contract_address_2.hash)
+      insert(:address_current_token_balance, address: lincoln, token_contract_address_hash: contract_address_2.hash)

       {:ok, lincoln: lincoln}
     end

@ -16,8 +16,8 @@ config :ethereum_jsonrpc, EthereumJSONRPC.Tracer,
config :logger, :ethereum_jsonrpc, config :logger, :ethereum_jsonrpc,
# keep synced with `config/config.exs` # keep synced with `config/config.exs`
format: "$time $metadata[$level] $message\n", format: "$dateT$time $metadata[$level] $message\n",
metadata: [:application, :request_id], metadata: ~w(application fetcher request_id)a,
metadata_filter: [application: :ethereum_jsonrpc] metadata_filter: [application: :ethereum_jsonrpc]
# Import environment specific config. This must remain at the bottom # Import environment specific config. This must remain at the bottom

@ -49,8 +49,8 @@ config :explorer,
config :logger, :explorer, config :logger, :explorer,
# keep synced with `config/config.exs` # keep synced with `config/config.exs`
format: "$time $metadata[$level] $message\n", format: "$dateT$time $metadata[$level] $message\n",
metadata: [:application, :request_id], metadata: ~w(application fetcher request_id)a,
metadata_filter: [application: :explorer] metadata_filter: [application: :explorer]
config :spandex_ecto, SpandexEcto.EctoLogger, config :spandex_ecto, SpandexEcto.EctoLogger,

apps/explorer/lib/explorer/chain.ex
@@ -174,7 +174,18 @@ defmodule Explorer.Chain do
   end

   @doc """
-  `t:Explorer.Chain.Transaction/0`s from `address`.
+  Fetches the transactions related to the given address, including transactions
+  that only have the address in the `token_transfers` related table.
+
+  This query is intentionally split into multiple subqueries to improve the
+  listing performance.
+
+  The `token_transfers` table tends to grow exponentially, and a query that
+  `join`s it against `transactions` takes too long.
+
+  To solve this, the `transaction_hashes` are fetched in a separate query and
+  paginated through the `block_number` already present in the `token_transfers`
+  table.

   ## Options
@@ -196,9 +207,15 @@
     necessity_by_association = Keyword.get(options, :necessity_by_association, %{})
     paging_options = Keyword.get(options, :paging_options, @default_paging_options)

-    {:ok, address_bytes} = Explorer.Chain.Hash.Address.dump(address_hash)
-
-    token_transfers_dynamic = TokenTransfer.dynamic_any_address_fields_match(direction, address_bytes)
+    transaction_hashes_from_token_transfers =
+      TokenTransfer.where_any_address_fields_match(direction, address_hash, paging_options)
+
+    token_transfers_query =
+      transaction_hashes_from_token_transfers
+      |> Transaction.where_transaction_hashes_match()
+      |> join_associations(necessity_by_association)
+      |> order_by([transaction], desc: transaction.block_number, desc: transaction.index)
+      |> Transaction.preload_token_transfers(address_hash)

     base_query =
       paging_options
@@ -206,10 +223,6 @@
       |> join_associations(necessity_by_association)
       |> Transaction.preload_token_transfers(address_hash)

-    token_transfers_query =
-      base_query
-      |> from(where: ^token_transfers_dynamic)
-
     from_address_query =
       base_query
       |> where([t], t.from_address_hash == ^address_hash)
@@ -955,7 +968,7 @@
   @doc """
   Counts all of the block validations and groups by the `miner_hash`.
   """
-  def group_block_validations_by_address do
+  def each_address_block_validation_count(fun) when is_function(fun, 1) do
     query =
       from(
         b in Block,
@@ -965,7 +978,7 @@
         group_by: b.miner_hash
       )

-    Repo.all(query)
+    Repo.stream_each(query, fun)
   end

   @doc """
@@ -1007,21 +1020,14 @@
         ) :: {:ok, accumulator}
         when accumulator: term()
   def stream_unfetched_balances(initial, reducer) when is_function(reducer, 2) do
-    Repo.transaction(
-      fn ->
-        query =
-          from(
-            balance in CoinBalance,
-            where: is_nil(balance.value_fetched_at),
-            select: %{address_hash: balance.address_hash, block_number: balance.block_number}
-          )
-
-        query
-        |> Repo.stream(timeout: :infinity)
-        |> Enum.reduce(initial, reducer)
-      end,
-      timeout: :infinity
-    )
+    query =
+      from(
+        balance in CoinBalance,
+        where: is_nil(balance.value_fetched_at),
+        select: %{address_hash: balance.address_hash, block_number: balance.block_number}
+      )
+
+    Repo.stream_reduce(query, initial, reducer)
   end

   @doc """
@@ -1033,16 +1039,8 @@
         ) :: {:ok, accumulator}
         when accumulator: term()
   def stream_unfetched_token_balances(initial, reducer) when is_function(reducer, 2) do
-    Repo.transaction(
-      fn ->
-        query = TokenBalance.unfetched_token_balances()
-
-        query
-        |> Repo.stream(timeout: :infinity)
-        |> Enum.reduce(initial, reducer)
-      end,
-      timeout: :infinity
-    )
+    TokenBalance.unfetched_token_balances()
+    |> Repo.stream_reduce(initial, reducer)
   end

   @doc """
@@ -1097,22 +1095,15 @@
         ) :: {:ok, accumulator}
         when accumulator: term()
   def stream_transactions_with_unfetched_internal_transactions(fields, initial, reducer) when is_function(reducer, 2) do
-    Repo.transaction(
-      fn ->
-        query =
-          from(
-            t in Transaction,
-            # exclude pending transactions
-            where: not is_nil(t.block_hash) and is_nil(t.internal_transactions_indexed_at),
-            select: ^fields
-          )
-
-        query
-        |> Repo.stream(timeout: :infinity)
-        |> Enum.reduce(initial, reducer)
-      end,
-      timeout: :infinity
-    )
+    query =
+      from(
+        t in Transaction,
+        # exclude pending transactions
+        where: not is_nil(t.block_hash) and is_nil(t.internal_transactions_indexed_at),
+        select: ^fields
+      )
+
+    Repo.stream_reduce(query, initial, reducer)
   end

   @doc """
@@ -1129,21 +1120,14 @@
         ) :: {:ok, accumulator}
         when accumulator: term()
   def stream_unfetched_uncle_hashes(initial, reducer) when is_function(reducer, 2) do
-    Repo.transaction(
-      fn ->
-        query =
-          from(bsdr in Block.SecondDegreeRelation,
-            where: is_nil(bsdr.uncle_fetched_at),
-            select: bsdr.uncle_hash,
-            group_by: bsdr.uncle_hash
-          )
-
-        query
-        |> Repo.stream(timeout: :infinity)
-        |> Enum.reduce(initial, reducer)
-      end,
-      timeout: :infinity
-    )
+    query =
+      from(bsdr in Block.SecondDegreeRelation,
+        where: is_nil(bsdr.uncle_fetched_at),
+        select: bsdr.uncle_hash,
+        group_by: bsdr.uncle_hash
+      )
+
+    Repo.stream_reduce(query, initial, reducer)
   end

   @doc """
@@ -1938,22 +1922,15 @@
           reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator)
         ) :: {:ok, accumulator}
         when accumulator: term()
-  def stream_uncataloged_token_contract_address_hashes(initial_acc, reducer) when is_function(reducer, 2) do
-    Repo.transaction(
-      fn ->
-        query =
-          from(
-            token in Token,
-            where: token.cataloged == false,
-            select: token.contract_address_hash
-          )
-
-        query
-        |> Repo.stream(timeout: :infinity)
-        |> Enum.reduce(initial_acc, reducer)
-      end,
-      timeout: :infinity
-    )
+  def stream_uncataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do
+    query =
+      from(
+        token in Token,
+        where: token.cataloged == false,
+        select: token.contract_address_hash
+      )
+
+    Repo.stream_reduce(query, initial, reducer)
   end

   @doc """
@@ -1964,16 +1941,10 @@
           reducer :: (entry :: Hash.Address.t(), accumulator -> accumulator)
         ) :: {:ok, accumulator}
         when accumulator: term()
-  def stream_cataloged_token_contract_address_hashes(initial_acc, reducer) when is_function(reducer, 2) do
-    Repo.transaction(
-      fn ->
-        Chain.Token.cataloged_tokens()
-        |> order_by(asc: :updated_at)
-        |> Repo.stream(timeout: :infinity)
-        |> Enum.reduce(initial_acc, reducer)
-      end,
-      timeout: :infinity
-    )
+  def stream_cataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do
+    Chain.Token.cataloged_tokens()
+    |> order_by(asc: :updated_at)
+    |> Repo.stream_reduce(initial, reducer)
   end

   @doc """
@@ -1992,14 +1963,7 @@
         distinct: t.block_number
       )

-    Repo.transaction(
-      fn ->
-        query
-        |> Repo.stream(timeout: :infinity)
-        |> Enum.reduce([], &[&1 | &2])
-      end,
-      timeout: :infinity
-    )
+    Repo.stream_reduce(query, [], &[&1 | &2])
   end

   @doc """
@@ -2083,7 +2047,7 @@
   @spec fetch_last_token_balances(Hash.Address.t()) :: []
   def fetch_last_token_balances(address_hash) do
     address_hash
-    |> TokenBalance.last_token_balances()
+    |> CurrentTokenBalance.last_token_balances()
     |> Repo.all()
   end
@@ -2156,4 +2120,22 @@
   @spec data() :: Dataloader.Ecto.t()
   def data, do: DataloaderEcto.new(Repo)

+  @doc """
+  Returns a list of block numbers with invalid consensus.
+  """
+  @spec list_block_numbers_with_invalid_consensus :: [integer()]
+  def list_block_numbers_with_invalid_consensus do
+    query =
+      from(
+        block in Block,
+        join: parent in Block,
+        on: parent.hash == block.parent_hash,
+        where: block.consensus == true,
+        where: parent.consensus == false,
+        select: parent.number
+      )
+
+    Repo.all(query, timeout: :infinity)
+  end
 end

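Taken together, the `Explorer.Chain` changes replace the correlated `dynamic`/`fragment` subquery with a two-step flow. A minimal sketch of that flow, assuming the functions behave as shown in the hunks above (`address_hash` and the paging values are illustrative):

    paging_options = %Explorer.PagingOptions{page_size: 50, key: nil}

    # Step 1: fetch only transaction hashes from token_transfers,
    # paginated by the block_number column already on that table.
    transaction_hashes =
      Explorer.Chain.TokenTransfer.where_any_address_fields_match(:to, address_hash, paging_options)

    # Step 2: load the transactions for exactly those hashes, newest first.
    transactions =
      transaction_hashes
      |> Explorer.Chain.Transaction.where_transaction_hashes_match()
      |> Explorer.Repo.all()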
apps/explorer/lib/explorer/chain/address/current_token_balance.ex
@@ -87,6 +87,18 @@ defmodule Explorer.Chain.Address.CurrentTokenBalance do
     |> limit(^paging_options.page_size)
   end

+  @doc """
+  Builds an `Ecto.Query` to fetch the current token balances of the given address.
+  """
+  def last_token_balances(address_hash) do
+    from(
+      tb in __MODULE__,
+      where: tb.address_hash == ^address_hash,
+      where: tb.value > 0,
+      preload: :token
+    )
+  end
+
   defp token_holders_query(token_contract_address_hash) do
     from(
       tb in __MODULE__,

apps/explorer/lib/explorer/chain/address/token_balance.ex
@@ -65,23 +65,6 @@ defmodule Explorer.Chain.Address.TokenBalance do
   {:ok, burn_address_hash} = Chain.string_to_address_hash("0x0000000000000000000000000000000000000000")
   @burn_address_hash burn_address_hash

-  @doc """
-  Builds an `Ecto.Query` to fetch the last token balances that have value greater than 0.
-
-  The last token balances from an Address is the last block indexed.
-  """
-  def last_token_balances(address_hash) do
-    query =
-      from(
-        tb in TokenBalance,
-        where: tb.address_hash == ^address_hash,
-        distinct: :token_contract_address_hash,
-        order_by: [desc: :block_number]
-      )
-
-    from(tb in subquery(query), where: tb.value > 0, preload: :token)
-  end
-
   @doc """
   Builds an `Ecto.Query` to group all tokens with their number of holders.
   """

apps/explorer/lib/explorer/chain/token_transfer.ex
@@ -25,7 +25,7 @@ defmodule Explorer.Chain.TokenTransfer do
   use Ecto.Schema

   import Ecto.Changeset
-  import Ecto.Query, only: [from: 2, dynamic: 2, limit: 2, where: 3]
+  import Ecto.Query, only: [from: 2, limit: 2, where: 3]

   alias Explorer.Chain.{Address, Hash, Token, TokenTransfer, Transaction}
   alias Explorer.{PagingOptions, Repo}
@@ -148,55 +148,105 @@ defmodule Explorer.Chain.TokenTransfer do
   end

   @doc """
-  Builds a dynamic query expression to identify if there is a token transfer
-  related to the hash.
+  Fetches the transaction hashes from token transfers matching
+  the given address hash.
   """
-  def dynamic_any_address_fields_match(:to, address_bytes) do
-    dynamic(
-      [t],
-      t.hash ==
-        fragment(
-          ~s"""
-          (SELECT tt.transaction_hash
-          FROM "token_transfers" AS tt
-          WHERE (tt."to_address_hash" = ?)
-          LIMIT 1)
-          """,
-          ^address_bytes
-        )
-    )
-  end
-
-  def dynamic_any_address_fields_match(:from, address_bytes) do
-    dynamic(
-      [t],
-      t.hash ==
-        fragment(
-          ~s"""
-          (SELECT tt.transaction_hash
-          FROM "token_transfers" AS tt
-          WHERE (tt."from_address_hash" = ?)
-          LIMIT 1)
-          """,
-          ^address_bytes
-        )
-    )
-  end
-
-  def dynamic_any_address_fields_match(_, address_bytes) do
-    dynamic(
-      [t],
-      t.hash ==
-        fragment(
-          ~s"""
-          (SELECT tt.transaction_hash
-          FROM "token_transfers" AS tt
-          WHERE ((tt."to_address_hash" = ?) OR (tt."from_address_hash" = ?))
-          LIMIT 1)
-          """,
-          ^address_bytes,
-          ^address_bytes
-        )
+  def where_any_address_fields_match(:to, address_hash, paging_options) do
+    query =
+      from(
+        tt in TokenTransfer,
+        where: tt.to_address_hash == ^address_hash,
+        select: tt.transaction_hash
+      )
+
+    query
+    |> page_transaction_hashes_from_token_transfers(paging_options)
+    |> limit(^paging_options.page_size)
+    |> Repo.all()
+  end
+
+  def where_any_address_fields_match(:from, address_hash, paging_options) do
+    query =
+      from(
+        tt in TokenTransfer,
+        where: tt.from_address_hash == ^address_hash,
+        select: tt.transaction_hash
+      )
+
+    query
+    |> page_transaction_hashes_from_token_transfers(paging_options)
+    |> limit(^paging_options.page_size)
+    |> Repo.all()
+  end
+
+  def where_any_address_fields_match(_, address_hash, paging_options) do
+    {:ok, address_bytes} = Explorer.Chain.Hash.Address.dump(address_hash)
+
+    transaction_hashes_from_token_transfers_sql(address_bytes, paging_options)
+  end
+
+  defp transaction_hashes_from_token_transfers_sql(address_bytes, %PagingOptions{key: nil, page_size: page_size}) do
+    {:ok, %Postgrex.Result{rows: transaction_hashes_from_token_transfers}} =
+      Repo.query(
+        """
+        SELECT transaction_hash
+        FROM
+        (
+          SELECT transaction_hash
+          FROM token_transfers
+          WHERE from_address_hash = $1
+
+          UNION
+
+          SELECT transaction_hash
+          FROM token_transfers
+          WHERE to_address_hash = $1
+        ) as token_transfers_transaction_hashes
+        LIMIT $2
+        """,
+        [address_bytes, page_size]
+      )
+
+    List.flatten(transaction_hashes_from_token_transfers)
+  end
+
+  defp transaction_hashes_from_token_transfers_sql(address_bytes, %PagingOptions{
+         key: {block_number, _index},
+         page_size: page_size
+       }) do
+    {:ok, %Postgrex.Result{rows: transaction_hashes_from_token_transfers}} =
+      Repo.query(
+        """
+        SELECT transaction_hash
+        FROM
+        (
+          SELECT transaction_hash
+          FROM token_transfers
+          WHERE from_address_hash = $1
+          AND block_number < $2
+
+          UNION
+
+          SELECT transaction_hash
+          FROM token_transfers
+          WHERE to_address_hash = $1
+          AND block_number < $2
+        ) as token_transfers_transaction_hashes
+        LIMIT $3
+        """,
+        [address_bytes, block_number, page_size]
+      )
+
+    List.flatten(transaction_hashes_from_token_transfers)
+  end
+
+  defp page_transaction_hashes_from_token_transfers(query, %PagingOptions{key: nil}), do: query
+
+  defp page_transaction_hashes_from_token_transfers(query, %PagingOptions{key: {block_number, _index}}) do
+    where(
+      query,
+      [tt],
+      tt.block_number < ^block_number
     )
   end
@@ -226,7 +276,7 @@ defmodule Explorer.Chain.TokenTransfer do
   @doc """
   Counts all the token transfers and groups by token contract address hash.
   """
-  def count_token_transfers do
+  def each_count(fun) when is_function(fun, 1) do
     query =
       from(
         tt in TokenTransfer,
@@ -236,6 +286,6 @@ defmodule Explorer.Chain.TokenTransfer do
         group_by: tt.token_contract_address_hash
       )

-    Repo.all(query)
+    Repo.stream_each(query, fun)
   end
 end

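A hedged usage sketch of the keyset pagination above: the first page passes no key, and the next page passes the `{block_number, index}` of the last entry seen, which becomes the `block_number < ^block_number` (or `block_number < $2`) bound. All values here are illustrative:

    alias Explorer.Chain.TokenTransfer
    alias Explorer.PagingOptions

    first_page =
      TokenTransfer.where_any_address_fields_match(:from, address_hash, %PagingOptions{page_size: 10, key: nil})

    # Continue below the block number of the last result already seen.
    next_page =
      TokenTransfer.where_any_address_fields_match(
        :from,
        address_hash,
        %PagingOptions{page_size: 10, key: {last_block_number, last_index}}
      )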
apps/explorer/lib/explorer/chain/transaction.ex
@@ -5,7 +5,7 @@ defmodule Explorer.Chain.Transaction do
   require Logger

-  import Ecto.Query, only: [from: 2, preload: 3, subquery: 1, where: 3]
+  import Ecto.Query, only: [from: 2, order_by: 3, preload: 3, subquery: 1, where: 3]

   alias ABI.FunctionSelector
@@ -468,15 +468,15 @@ defmodule Explorer.Chain.Transaction do
   end

   @doc """
-  Adds to the given transaction's query a `where` with one of the conditions that the matched
-  function returns.
-
-  `where_address_fields_match(query, address, address_field)`
-  - returns a query constraining the given address_hash to be equal to the given
-  address field from transactions' table.
+  Builds a query that matches transactions against the given list of hashes.
+
+  Be careful not to pass a large list, because this will lead to performance
+  problems.
   """
-  def where_address_fields_match(query, address_hash, address_field) do
-    where(query, [t], field(t, ^address_field) == ^address_hash)
+  def where_transaction_hashes_match(transaction_hashes) do
+    Transaction
+    |> where([t], t.hash == fragment("ANY (?)", ^transaction_hashes))
+    |> order_by([transaction], desc: transaction.block_number, desc: transaction.index)
   end

   @collated_fields ~w(block_number cumulative_gas_used gas_used index)a

apps/explorer/lib/explorer/counters/block_validation_counter.ex
@@ -60,11 +60,9 @@ defmodule Explorer.Counters.BlockValidationCounter do
   Consolidates the number of block validations grouped by `address_hash`.
   """
   def consolidate_blocks do
-    total_block_validations = Chain.group_block_validations_by_address()
-
-    for {address_hash, total} <- total_block_validations do
+    Chain.each_address_block_validation_count(fn {address_hash, total} ->
       insert_or_update_counter(address_hash, total)
-    end
+    end)
   end

   @doc """

apps/explorer/lib/explorer/counters/token_holders_counter.ex
@@ -6,6 +6,7 @@ defmodule Explorer.Counters.TokenHoldersCounter do
   """

   alias Explorer.Chain.Address.TokenBalance
+  alias Explorer.Chain.Hash
   alias Explorer.Repo

   @table :token_holders_counter
@@ -59,11 +60,9 @@ defmodule Explorer.Counters.TokenHoldersCounter do
   """
   def consolidate do
     TokenBalance.tokens_grouped_by_number_of_holders()
-    |> Repo.all()
-    |> Enum.map(fn {token, number_of_holders} ->
-      {token.bytes, number_of_holders}
+    |> Repo.stream_each(fn {%Hash{bytes: bytes}, number_of_holders} ->
+      insert_counter({bytes, number_of_holders})
     end)
-    |> insert_counter()
   end

   defp schedule_next_consolidation do
@@ -76,8 +75,8 @@ defmodule Explorer.Counters.TokenHoldersCounter do
   @doc """
   Fetches the token holders info for a specific token from the `:ets` table.
   """
-  def fetch(token_hash) do
-    do_fetch(:ets.lookup(table_name(), token_hash.bytes))
+  def fetch(%Hash{bytes: bytes}) do
+    do_fetch(:ets.lookup(table_name(), bytes))
   end

   defp do_fetch([{_, result}]), do: result

apps/explorer/lib/explorer/counters/token_transfer_counter.ex
@@ -51,11 +51,9 @@ defmodule Explorer.Counters.TokenTransferCounter do
   Consolidates the number of token transfers grouped by token.
   """
   def consolidate do
-    total_token_transfers = TokenTransfer.count_token_transfers()
-
-    for {token_hash, total} <- total_token_transfers do
+    TokenTransfer.each_count(fn {token_hash, total} ->
       insert_or_update_counter(token_hash, total)
-    end
+    end)
   end

   @doc """

apps/explorer/lib/explorer/repo.ex
@@ -20,7 +20,7 @@ defmodule Explorer.Repo do
     returning = opts[:returning]

     elements
-    |> Enum.chunk_every(1000)
+    |> Enum.chunk_every(500)
     |> Enum.reduce({0, []}, fn chunk, {total_count, acc} ->
       {count, inserted} =
         try do
@@ -65,4 +65,23 @@
       end
     end)
   end
+
+  def stream_in_transaction(query, fun) when is_function(fun, 1) do
+    transaction(
+      fn ->
+        query
+        |> stream(timeout: :infinity)
+        |> fun.()
+      end,
+      timeout: :infinity
+    )
+  end
+
+  def stream_each(query, fun) when is_function(fun, 1) do
+    stream_in_transaction(query, &Enum.each(&1, fun))
+  end
+
+  def stream_reduce(query, initial, reducer) when is_function(reducer, 2) do
+    stream_in_transaction(query, &Enum.reduce(&1, initial, reducer))
+  end
 end

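A small usage sketch of the new helpers (the query is illustrative): `stream_each/2` runs a side-effecting function per row, `stream_reduce/3` folds the rows into an accumulator, and both wrap `Repo.stream/2` in a single `timeout: :infinity` transaction, the pattern previously repeated inline throughout `Explorer.Chain`:

    import Ecto.Query, only: [from: 2]

    query = from(t in Explorer.Chain.Transaction, select: t.hash)

    # Side effects per row; returns {:ok, :ok} from the wrapping transaction.
    Explorer.Repo.stream_each(query, &IO.inspect/1)

    # Fold into an accumulator; returns {:ok, accumulator}.
    {:ok, hashes} = Explorer.Repo.stream_reduce(query, [], &[&1 | &2])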
apps/explorer/priv/repo/migrations/20181212115448_add_indexes_to_token_transfers.exs
@@ -0,0 +1,8 @@
defmodule Explorer.Repo.Migrations.AddIndexesToTokenTransfers do
  use Ecto.Migration

  def change do
    create(index("token_transfers", [:from_address_hash]))
    create(index("token_transfers", [:to_address_hash]))
  end
end

apps/explorer/test/explorer/chain/address/current_token_balance_test.exs
@@ -146,4 +146,58 @@ defmodule Explorer.Chain.Address.CurrentTokenBalanceTest do
       assert result_paginated == [second_page.address_hash]
     end
   end
+
+  describe "last_token_balances/1" do
+    test "returns the current token balances of the given address" do
+      address = insert(:address)
+      current_token_balance = insert(:address_current_token_balance, address: address)
+      insert(:address_current_token_balance, address: build(:address))
+
+      token_balances =
+        address.hash
+        |> CurrentTokenBalance.last_token_balances()
+        |> Repo.all()
+        |> Enum.map(& &1.address_hash)
+
+      assert token_balances == [current_token_balance.address_hash]
+    end
+
+    test "returns an empty list when there are no token balances" do
+      address = insert(:address)
+      insert(:address_current_token_balance, address: build(:address))
+
+      token_balances =
+        address.hash
+        |> CurrentTokenBalance.last_token_balances()
+        |> Repo.all()
+
+      assert token_balances == []
+    end
+
+    test "does not consider tokens that have value 0" do
+      address = insert(:address)
+
+      current_token_balance_a =
+        insert(
+          :address_current_token_balance,
+          address: address,
+          value: 5000
+        )
+
+      insert(
+        :address_current_token_balance,
+        address: address,
+        value: 0
+      )
+
+      token_balances =
+        address.hash
+        |> CurrentTokenBalance.last_token_balances()
+        |> Repo.all()
+        |> Enum.map(& &1.address_hash)
+
+      assert token_balances == [current_token_balance_a.address_hash]
+    end
+  end
 end

apps/explorer/test/explorer/chain/token_transfer_test.exs
@@ -146,8 +146,8 @@ defmodule Explorer.Chain.TokenTransferTest do
     end
   end

-  describe "count_token_transfers/0" do
-    test "returns token transfers grouped by tokens" do
+  describe "each_count/0" do
+    test "streams token transfers grouped by tokens" do
       token_contract_address = insert(:contract_address)
       token = insert(:token, contract_address: token_contract_address)
@@ -172,7 +172,11 @@ defmodule Explorer.Chain.TokenTransferTest do
         token: token
       )

-      results = TokenTransfer.count_token_transfers()
+      {:ok, agent_pid} = Agent.start_link(fn -> [] end)
+
+      TokenTransfer.each_count(fn entry -> Agent.update(agent_pid, &[entry | &1]) end)
+
+      results = Agent.get(agent_pid, fn entries -> Enum.reverse(entries) end)

       assert length(results) == 1
       assert List.first(results) == {token.contract_address_hash, 2}
@@ -247,4 +251,180 @@
       assert results == []
     end
   end
+
+  describe "where_any_address_fields_match/3" do
+    test "when to_address_hash match returns transactions hashes list" do
+      john = insert(:address)
+      paul = insert(:address)
+      contract_address = insert(:contract_address)
+
+      transaction =
+        :transaction
+        |> insert(
+          from_address: john,
+          from_address_hash: john.hash,
+          to_address: contract_address,
+          to_address_hash: contract_address.hash
+        )
+        |> with_block()
+
+      insert(
+        :token_transfer,
+        from_address: john,
+        to_address: paul,
+        transaction: transaction,
+        amount: 1
+      )
+
+      insert(
+        :token_transfer,
+        from_address: john,
+        to_address: paul,
+        transaction: transaction,
+        amount: 1
+      )
+
+      transactions_hashes = TokenTransfer.where_any_address_fields_match(:to, paul.hash, %PagingOptions{page_size: 1})
+
+      assert Enum.member?(transactions_hashes, transaction.hash) == true
+    end
+
+    test "when from_address_hash match returns transactions hashes list" do
+      john = insert(:address)
+      paul = insert(:address)
+      contract_address = insert(:contract_address)
+
+      transaction =
+        :transaction
+        |> insert(
+          from_address: john,
+          from_address_hash: john.hash,
+          to_address: contract_address,
+          to_address_hash: contract_address.hash
+        )
+        |> with_block()
+
+      insert(
+        :token_transfer,
+        from_address: john,
+        to_address: paul,
+        transaction: transaction,
+        amount: 1
+      )
+
+      insert(
+        :token_transfer,
+        from_address: john,
+        to_address: paul,
+        transaction: transaction,
+        amount: 1
+      )
+
+      transactions_hashes = TokenTransfer.where_any_address_fields_match(:from, john.hash, %PagingOptions{page_size: 1})
+
+      assert Enum.member?(transactions_hashes, transaction.hash) == true
+    end
+
+    test "when to_address_hash or from_address_hash match returns transactions hashes list" do
+      john = insert(:address)
+      paul = insert(:address)
+      contract_address = insert(:contract_address)
+
+      transaction_one =
+        :transaction
+        |> insert(
+          from_address: john,
+          from_address_hash: john.hash,
+          to_address: contract_address,
+          to_address_hash: contract_address.hash
+        )
+        |> with_block()
+
+      insert(
+        :token_transfer,
+        from_address: john,
+        to_address: paul,
+        transaction: transaction_one,
+        amount: 1
+      )
+
+      transaction_two =
+        :transaction
+        |> insert(
+          from_address: john,
+          from_address_hash: john.hash,
+          to_address: contract_address,
+          to_address_hash: contract_address.hash
+        )
+        |> with_block()
+
+      insert(
+        :token_transfer,
+        from_address: paul,
+        to_address: john,
+        transaction: transaction_two,
+        amount: 1
+      )
+
+      {:ok, transaction_one_bytes} = Explorer.Chain.Hash.Full.dump(transaction_one.hash)
+      {:ok, transaction_two_bytes} = Explorer.Chain.Hash.Full.dump(transaction_two.hash)
+
+      transactions_hashes = TokenTransfer.where_any_address_fields_match(nil, john.hash, %PagingOptions{page_size: 2})
+
+      assert Enum.member?(transactions_hashes, transaction_one_bytes) == true
+      assert Enum.member?(transactions_hashes, transaction_two_bytes) == true
+    end
+
+    test "paginates result from to_address_hash and from_address_hash match" do
+      john = insert(:address)
+      paul = insert(:address)
+      contract_address = insert(:contract_address)
+
+      transaction_one =
+        :transaction
+        |> insert(
+          from_address: paul,
+          from_address_hash: paul.hash,
+          to_address: contract_address,
+          to_address_hash: contract_address.hash
+        )
+        |> with_block(number: 1)
+
+      insert(
+        :token_transfer,
+        from_address: john,
+        to_address: paul,
+        transaction: transaction_one,
+        amount: 1
+      )
+
+      transaction_two =
+        :transaction
+        |> insert(
+          from_address: paul,
+          from_address_hash: paul.hash,
+          to_address: contract_address,
+          to_address_hash: contract_address.hash
+        )
+        |> with_block(number: 2)
+
+      insert(
+        :token_transfer,
+        from_address: paul,
+        to_address: john,
+        transaction: transaction_two,
+        amount: 1
+      )
+
+      {:ok, transaction_one_bytes} = Explorer.Chain.Hash.Full.dump(transaction_one.hash)
+
+      page_two =
+        TokenTransfer.where_any_address_fields_match(nil, john.hash, %PagingOptions{
+          page_size: 1,
+          key: {transaction_two.block_number, transaction_two.index}
+        })
+
+      assert Enum.member?(page_two, transaction_one_bytes) == true
+    end
+  end
 end

apps/explorer/test/explorer/chain_test.exs
@@ -271,6 +271,56 @@ defmodule Explorer.ChainTest do
       assert Enum.count(transaction.token_transfers) == 2
     end

+    test "returns all transactions that the address is present only in the token transfers" do
+      john = insert(:address)
+      paul = insert(:address)
+      contract_address = insert(:contract_address)
+
+      transaction_one =
+        :transaction
+        |> insert(
+          from_address: john,
+          from_address_hash: john.hash,
+          to_address: contract_address,
+          to_address_hash: contract_address.hash
+        )
+        |> with_block()
+
+      insert(
+        :token_transfer,
+        from_address: john,
+        to_address: paul,
+        transaction: transaction_one,
+        amount: 1
+      )
+
+      transaction_two =
+        :transaction
+        |> insert(
+          from_address: john,
+          from_address_hash: john.hash,
+          to_address: contract_address,
+          to_address_hash: contract_address.hash
+        )
+        |> with_block()
+
+      insert(
+        :token_transfer,
+        from_address: john,
+        to_address: paul,
+        transaction: transaction_two,
+        amount: 1
+      )
+
+      transactions_hashes =
+        paul
+        |> Chain.address_to_transactions()
+        |> Enum.map(& &1.hash)
+
+      assert Enum.member?(transactions_hashes, transaction_one.hash) == true
+      assert Enum.member?(transactions_hashes, transaction_two.hash) == true
+    end
+
     test "with transactions can be paginated" do
       address = insert(:address)
@@ -1240,13 +1290,17 @@ defmodule Explorer.ChainTest do
     end
   end

-  describe "group_block_validations_by_address/0" do
-    test "returns block validations grouped by the address that validated them (`address_hash`)" do
+  describe "each_address_block_validation_count/0" do
+    test "streams block validation count grouped by the address that validated them (`address_hash`)" do
       address = insert(:address)

       insert(:block, miner: address, miner_hash: address.hash)

-      results = Chain.group_block_validations_by_address()
+      {:ok, agent_pid} = Agent.start_link(fn -> [] end)
+
+      Chain.each_address_block_validation_count(fn entry -> Agent.update(agent_pid, &[entry | &1]) end)
+
+      results = Agent.get(agent_pid, &Enum.reverse/1)

       assert length(results) == 1
       assert results == [{address.hash, 1}]
@@ -2883,91 +2937,15 @@ defmodule Explorer.ChainTest do
   describe "fetch_last_token_balances/1" do
     test "returns the token balances given the address hash" do
       address = insert(:address)
-      token_balance = insert(:token_balance, address: address)
-      insert(:token_balance, address: build(:address))
+      current_token_balance = insert(:address_current_token_balance, address: address)
+      insert(:address_current_token_balance, address: build(:address))

       token_balances =
         address.hash
         |> Chain.fetch_last_token_balances()
         |> Enum.map(& &1.address_hash)

-      assert token_balances == [token_balance.address_hash]
+      assert token_balances == [current_token_balance.address_hash]
     end
-
-    test "returns the value from the last block" do
-      address = insert(:address)
-      token_a = insert(:token, contract_address: build(:contract_address))
-      token_b = insert(:token, contract_address: build(:contract_address))
-
-      insert(
-        :token_balance,
-        address: address,
-        block_number: 1000,
-        token_contract_address_hash: token_a.contract_address_hash,
-        value: 5000
-      )
-
-      token_balance_a =
-        insert(
-          :token_balance,
-          address: address,
-          block_number: 1001,
-          token_contract_address_hash: token_a.contract_address_hash,
-          value: 4000
-        )
-
-      insert(
-        :token_balance,
-        address: address,
-        block_number: 1000,
-        token_contract_address_hash: token_b.contract_address_hash,
-        value: 3000
-      )
-
-      token_balance_b =
-        insert(
-          :token_balance,
-          address: address,
-          block_number: 1001,
-          token_contract_address_hash: token_b.contract_address_hash,
-          value: 2000
-        )
-
-      token_balances = Chain.fetch_last_token_balances(address.hash)
-
-      assert Enum.count(token_balances) == 2
-      assert Enum.map(token_balances, & &1.value) == [token_balance_a.value, token_balance_b.value]
-    end
-
-    test "returns an empty list when there are no token balances" do
-      address = insert(:address)
-
-      insert(:token_balance, address: build(:address))
-
-      assert Chain.fetch_last_token_balances(address.hash) == []
-    end
-
-    test "does not consider other blocks when the last block has the value 0" do
-      address = insert(:address)
-      token = insert(:token, contract_address: build(:contract_address))
-
-      insert(
-        :token_balance,
-        address: address,
-        block_number: 1000,
-        token_contract_address_hash: token.contract_address_hash,
-        value: 5000
-      )
-
-      insert(
-        :token_balance,
-        address: address,
-        block_number: 1001,
-        token_contract_address_hash: token.contract_address_hash,
-        value: 0
-      )
-
-      assert Chain.fetch_last_token_balances(address.hash) == []
-    end
   end
@@ -3176,4 +3154,25 @@
       ]
     end
   end
+
+  describe "list_block_numbers_with_invalid_consensus/0" do
+    test "returns a list of block numbers with invalid consensus" do
+      block1 = insert(:block)
+      block2_with_invalid_consensus = insert(:block, parent_hash: block1.hash, consensus: false)
+      _block2 = insert(:block, parent_hash: block1.hash, number: block2_with_invalid_consensus.number)
+      block3 = insert(:block, parent_hash: block2_with_invalid_consensus.hash)
+      block4 = insert(:block, parent_hash: block3.hash)
+      block5 = insert(:block, parent_hash: block4.hash)
+      block6_without_consensus = insert(:block, parent_hash: block5.hash, consensus: false)
+      block6 = insert(:block, parent_hash: block5.hash, number: block6_without_consensus.number)
+      block7 = insert(:block, parent_hash: block6.hash)
+      block8_with_invalid_consensus = insert(:block, parent_hash: block7.hash, consensus: false)
+      _block8 = insert(:block, parent_hash: block7.hash, number: block8_with_invalid_consensus.number)
+      block9 = insert(:block, parent_hash: block8_with_invalid_consensus.hash)
+      _block10 = insert(:block, parent_hash: block9.hash)
+
+      assert Chain.list_block_numbers_with_invalid_consensus() ==
+               [block2_with_invalid_consensus.number, block8_with_invalid_consensus.number]
+    end
+  end
 end

@ -18,8 +18,8 @@ config :indexer, Indexer.Tracer,
config :logger, :indexer, config :logger, :indexer,
# keep synced with `config/config.exs` # keep synced with `config/config.exs`
format: "$time $metadata[$level] $message\n", format: "$dateT$time $metadata[$level] $message\n",
metadata: [:application, :request_id], metadata: ~w(application fetcher request_id)a,
metadata_filter: [application: :indexer] metadata_filter: [application: :indexer]
# Import environment specific config. This must remain at the bottom # Import environment specific config. This must remain at the bottom

apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex
@@ -53,6 +53,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do
   @impl GenServer
   def init(named_arguments) do
+    Logger.metadata(fetcher: :block_catchup)
+
     state = new(named_arguments)

     send(self(), :catchup_index)
apps/indexer/lib/indexer/block/catchup/fetcher.ex
@@ -58,6 +58,8 @@ defmodule Indexer.Block.Catchup.Fetcher do
           block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}
         } = state
       ) do
+    Logger.metadata(fetcher: :block_catchup)
+
     {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)

     case latest_block_number do
@@ -172,6 +174,8 @@ defmodule Indexer.Block.Catchup.Fetcher do
         _.._ = range,
         sequence
       ) do
+    Logger.metadata(fetcher: :block_catchup)
+
     case fetch_and_import_range(block_fetcher, range) do
       {:ok, %{inserted: inserted, errors: errors}} ->
         errors = cap_seq(sequence, errors, range)

apps/indexer/lib/indexer/block/invalid_consensus/supervisor.ex
@@ -0,0 +1,45 @@
defmodule Indexer.Block.InvalidConsensus.Supervisor do
  @moduledoc """
  Supervises the process that ensures blocks with invalid consensus get queued
  for indexing.
  """

  use Supervisor

  alias Indexer.Block.InvalidConsensus.Worker

  def child_spec([]) do
    child_spec([[]])
  end

  def child_spec([init_arguments]) do
    child_spec([init_arguments, [name: __MODULE__]])
  end

  def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
    spec = %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, start_link_arguments},
      restart: :transient,
      type: :supervisor
    }

    Supervisor.child_spec(spec, [])
  end

  def start_link(init_arguments, gen_server_options \\ []) do
    Supervisor.start_link(__MODULE__, init_arguments, gen_server_options)
  end

  @impl Supervisor
  def init(_) do
    children = [
      {Worker, [[supervisor: self()], [name: Worker]]},
      {Task.Supervisor, name: Indexer.Block.InvalidConsensus.TaskSupervisor}
    ]

    opts = [strategy: :one_for_all]

    Supervisor.init(children, opts)
  end
end

apps/indexer/lib/indexer/block/invalid_consensus/worker.ex
@@ -0,0 +1,96 @@
defmodule Indexer.Block.InvalidConsensus.Worker do
  @moduledoc """
  Finds blocks with invalid consensus and queues them up to be refetched. This
  process does this once, after the application starts up.

  A block has invalid consensus when it is referenced as the parent hash of a
  block with consensus while not having consensus (consensus=false). Only one
  block can have consensus at a given height (block number).
  """

  use GenServer

  require Logger

  alias Explorer.Chain
  alias Indexer.Block.Catchup.Fetcher
  alias Indexer.Block.InvalidConsensus.TaskSupervisor

  def child_spec([init_arguments]) do
    child_spec([init_arguments, []])
  end

  def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
    spec = %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, start_link_arguments},
      restart: :transient,
      type: :worker
    }

    Supervisor.child_spec(spec, [])
  end

  def start_link(init_arguments, gen_server_options \\ []) do
    GenServer.start_link(__MODULE__, init_arguments, gen_server_options)
  end

  def init(opts) do
    sup_pid = Keyword.fetch!(opts, :supervisor)
    retry_interval = Keyword.get(opts, :retry_interval, 10_000)

    send(self(), :scan)

    state = %{
      block_numbers: [],
      retry_interval: retry_interval,
      sup_pid: sup_pid,
      task_ref: nil
    }

    {:ok, state}
  end

  def handle_info(:scan, state) do
    block_numbers = Chain.list_block_numbers_with_invalid_consensus()

    case block_numbers do
      [] ->
        Supervisor.stop(state.sup_pid, :normal)
        {:noreply, state}

      block_numbers ->
        Process.send_after(self(), :push_front_blocks, state.retry_interval)
        {:noreply, %{state | block_numbers: block_numbers}}
    end
  end

  def handle_info(:push_front_blocks, %{block_numbers: block_numbers} = state) do
    %Task{ref: ref} = async_push_front(block_numbers)
    {:noreply, %{state | task_ref: ref}}
  end

  def handle_info({ref, :ok}, %{task_ref: ref, sup_pid: sup_pid}) do
    Process.demonitor(ref, [:flush])
    Supervisor.stop(sup_pid, :normal)
    {:stop, :shutdown}
  end

  def handle_info({ref, {:error, reason}}, %{task_ref: ref, retry_interval: millis} = state) do
    Logger.error(fn -> inspect(reason) end)
    Process.demonitor(ref, [:flush])
    Process.send_after(self(), :push_front_blocks, millis)
    {:noreply, %{state | task_ref: nil}}
  end

  def handle_info({:DOWN, ref, :process, _, _}, %{task_ref: ref, retry_interval: millis} = state) do
    Process.send_after(self(), :push_front_blocks, millis)
    {:noreply, %{state | task_ref: nil}}
  end

  defp async_push_front(block_numbers) do
    Task.Supervisor.async_nolink(TaskSupervisor, Fetcher, :push_front, [block_numbers])
  end
end

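A concrete reading of the definition in the `@moduledoc`, matching the fixture used in the worker tests near the end of this diff (block numbers hypothetical):

    # block1 (consensus: true)
    #   <- block2_with_invalid_consensus (consensus: false)
    #        <- block3 (consensus: true, parent_hash: block2_with_invalid_consensus.hash)
    #
    # block3 has consensus but its parent does not, so
    # Chain.list_block_numbers_with_invalid_consensus/0 returns
    # [block2_with_invalid_consensus.number], and the worker hands that list to
    # Indexer.Block.Catchup.Fetcher.push_front/1 via the task supervisor.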
apps/indexer/lib/indexer/block/realtime/consensus_ensurer.ex
@@ -0,0 +1,23 @@
defmodule Indexer.Block.Realtime.ConsensusEnsurer do
  @moduledoc """
  Triggers a refetch if a given block doesn't have consensus.
  """

  alias Explorer.Chain
  alias Explorer.Chain.Hash
  alias Indexer.Block.Realtime.Fetcher

  def perform(%Hash{byte_count: unquote(Hash.Full.byte_count())} = block_hash, number, block_fetcher) do
    case Chain.hash_to_block(block_hash) do
      {:ok, %{consensus: true} = _block} ->
        :ignore

      _ ->
        # trigger refetch if consensus=false or block was not found
        Fetcher.fetch_and_import_block(number, block_fetcher, true)
    end

    :ok
  end
end

apps/indexer/lib/indexer/block/realtime/fetcher.ex
@@ -16,7 +16,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
   alias EthereumJSONRPC.{FetchedBalances, Subscription}
   alias Explorer.Chain
   alias Indexer.{AddressExtraction, Block, TokenBalances, Tracer}
-  alias Indexer.Block.Realtime.TaskSupervisor
+  alias Indexer.Block.Realtime.{ConsensusEnsurer, TaskSupervisor}

   @behaviour Block.Fetcher
@@ -43,6 +43,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
   @impl GenServer
   def init(%{block_fetcher: %Block.Fetcher{} = block_fetcher, subscribe_named_arguments: subscribe_named_arguments})
       when is_list(subscribe_named_arguments) do
+    Logger.metadata(fetcher: :block_realtime)
+
     {:ok, %__MODULE__{block_fetcher: %Block.Fetcher{block_fetcher | broadcast: :realtime, callback_module: __MODULE__}},
      {:continue, {:init, subscribe_named_arguments}}}
   end
@@ -157,6 +159,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
   @decorate trace(name: "fetch", resource: "Indexer.Block.Realtime.Fetcher.fetch_and_import_block/3", tracer: Tracer)
   def fetch_and_import_block(block_number_to_fetch, block_fetcher, reorg?, retry \\ 3) do
+    Logger.metadata(fetcher: :block_realtime)
+
     if reorg? do
       # give previous fetch attempt (for same block number) a chance to finish
       # before fetching again, to reduce block consensus mistakes
@@ -169,15 +173,20 @@ defmodule Indexer.Block.Realtime.Fetcher do
   @decorate span(tracer: Tracer)
   defp do_fetch_and_import_block(block_number_to_fetch, block_fetcher, retry) do
     case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do
-      {:ok, %{inserted: _, errors: []}} ->
+      {:ok, %{inserted: inserted, errors: []}} ->
+        for block <- Map.get(inserted, :blocks, []) do
+          args = [block.parent_hash, block.number - 1, block_fetcher]
+          Task.Supervisor.start_child(TaskSupervisor, ConsensusEnsurer, :perform, args)
+        end
+
         Logger.debug(fn ->
-          ["realtime indexer fetched and imported block ", to_string(block_number_to_fetch)]
+          ["fetched and imported block ", to_string(block_number_to_fetch)]
         end)

       {:ok, %{inserted: _, errors: [_ | _] = errors}} ->
         Logger.error(fn ->
           [
-            "realtime indexer failed to fetch block",
+            "failed to fetch block ",
             to_string(block_number_to_fetch),
             ": ",
             inspect(errors),
@@ -188,7 +197,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
       {:error, {step, reason}} ->
         Logger.error(fn ->
           [
-            "realtime indexer failed to fetch ",
+            "failed to fetch ",
             to_string(step),
             " for block ",
             to_string(block_number_to_fetch),
@@ -209,7 +218,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
         if retry_fetch_and_import_block(params) == :ignore do
           Logger.error(fn ->
             [
-              "realtime indexer failed to validate for block ",
+              "failed to validate for block ",
              to_string(block_number_to_fetch),
              ": ",
              inspect(changesets),
@@ -221,7 +230,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
       {:error, {step, failed_value, _changes_so_far}} ->
         Logger.error(fn ->
           [
-            "realtime indexer failed to insert ",
+            "failed to insert ",
             to_string(step),
             " for block ",
             to_string(block_number_to_fetch),

@ -4,7 +4,7 @@ defmodule Indexer.Block.Supervisor do
""" """
alias Indexer.Block alias Indexer.Block
alias Indexer.Block.{Catchup, Realtime, Uncle} alias Indexer.Block.{Catchup, InvalidConsensus, Realtime, Uncle}
use Supervisor use Supervisor
@ -28,6 +28,7 @@ defmodule Indexer.Block.Supervisor do
%{block_fetcher: block_fetcher, block_interval: block_interval, memory_monitor: memory_monitor}, %{block_fetcher: block_fetcher, block_interval: block_interval, memory_monitor: memory_monitor},
[name: Catchup.Supervisor] [name: Catchup.Supervisor]
]}, ]},
{InvalidConsensus.Supervisor, [[], [name: InvalidConsensus.Supervisor]]},
{Realtime.Supervisor, {Realtime.Supervisor,
[ [
%{block_fetcher: block_fetcher, subscribe_named_arguments: subscribe_named_arguments}, %{block_fetcher: block_fetcher, subscribe_named_arguments: subscribe_named_arguments},

apps/indexer/lib/indexer/block/uncle/fetcher.ex
@@ -20,7 +20,8 @@ defmodule Indexer.Block.Uncle.Fetcher do
     flush_interval: :timer.seconds(3),
     max_batch_size: 10,
     max_concurrency: 10,
-    task_supervisor: Indexer.Block.Uncle.TaskSupervisor
+    task_supervisor: Indexer.Block.Uncle.TaskSupervisor,
+    metadata: [fetcher: :block_uncle]
   ]

   @doc """
@@ -73,7 +74,7 @@ defmodule Indexer.Block.Uncle.Fetcher do
     # the same block could be included as an uncle on multiple blocks, but we only want to fetch it once
     unique_hashes = Enum.uniq(hashes)

-    Logger.debug(fn -> "fetching #{length(unique_hashes)} uncle blocks" end)
+    Logger.debug(fn -> "fetching #{length(unique_hashes)}" end)

     case EthereumJSONRPC.fetch_blocks_by_hash(unique_hashes, json_rpc_named_arguments) do
       {:ok, blocks} ->
@@ -81,7 +82,7 @@ defmodule Indexer.Block.Uncle.Fetcher do
       {:error, reason} ->
         Logger.error(fn ->
-          ["failed to fetch ", unique_hashes |> length |> to_string(), " uncle blocks: ", inspect(reason)]
+          ["failed to fetch ", unique_hashes |> length |> to_string(), ": ", inspect(reason)]
         end)

         {:retry, unique_hashes}
@@ -116,7 +117,7 @@ defmodule Indexer.Block.Uncle.Fetcher do
           [
             "failed to import ",
             original_entries |> length() |> to_string(),
-            "uncle blocks in step ",
+            " in step ",
             inspect(step),
             ": ",
             inspect(failed_value)
@@ -195,7 +196,7 @@ defmodule Indexer.Block.Uncle.Fetcher do
         retried_entries |> length() |> to_string(),
         "/",
         original_entries |> length() |> to_string(),
-        " uncles: ",
+        ": ",
         errors_to_iodata(errors)
       ]
     end)

apps/indexer/lib/indexer/buffered_task.ex
@@ -70,6 +70,7 @@ defmodule Indexer.BufferedTask do
             flush_interval: nil,
             max_batch_size: nil,
             max_concurrency: nil,
+            metadata: [],
             current_buffer: [],
             bound_queue: %BoundQueue{},
             task_ref_to_batch: %{}
@@ -195,6 +196,7 @@ defmodule Indexer.BufferedTask do
   Options are optional and are passed in the list that is second element of the tuple.

     * `:name` - The registered name for the new process.
+    * `:metadata` - `Logger.metadata/1` to set in the `Indexer.BufferedTask` process and any child processes.

   """
   @spec start_link(
@@ -221,13 +223,17 @@ defmodule Indexer.BufferedTask do
     shrinkable(opts)

+    metadata = Keyword.get(opts, :metadata, [])
+    Logger.metadata(metadata)
+
     state = %BufferedTask{
       callback_module: callback_module,
       callback_module_state: Keyword.fetch!(opts, :state),
       task_supervisor: Keyword.fetch!(opts, :task_supervisor),
       flush_interval: Keyword.fetch!(opts, :flush_interval),
       max_batch_size: Keyword.fetch!(opts, :max_batch_size),
-      max_concurrency: Keyword.fetch!(opts, :max_concurrency)
+      max_concurrency: Keyword.fetch!(opts, :max_concurrency),
+      metadata: metadata
     }

     {:ok, state}
@@ -337,13 +343,16 @@ defmodule Indexer.BufferedTask do
           callback_module: callback_module,
           callback_module_state: callback_module_state,
           max_batch_size: max_batch_size,
-          task_supervisor: task_supervisor
+          task_supervisor: task_supervisor,
+          metadata: metadata
         } = state
       ) do
     parent = self()

     task =
       Task.Supervisor.async(task_supervisor, fn ->
+        Logger.metadata(metadata)
+
         {0, []}
         |> callback_module.init(
           fn
@@ -463,16 +472,21 @@ defmodule Indexer.BufferedTask do
           callback_module_state: callback_module_state,
           max_concurrency: max_concurrency,
           task_ref_to_batch: task_ref_to_batch,
-          task_supervisor: task_supervisor
+          task_supervisor: task_supervisor,
+          metadata: metadata
        } = state
      ) do
     if Enum.count(task_ref_to_batch) < max_concurrency and not Enum.empty?(bound_queue) do
       {batch, new_state} = take_batch(state)

       %Task{ref: ref} =
-        Task.Supervisor.async_nolink(task_supervisor, callback_module, :run, [
-          batch,
-          callback_module_state
+        Task.Supervisor.async_nolink(task_supervisor, __MODULE__, :log_run, [
+          %{
+            metadata: metadata,
+            callback_module: callback_module,
+            batch: batch,
+            callback_module_state: callback_module_state
+          }
         ])

       %BufferedTask{new_state | task_ref_to_batch: Map.put(task_ref_to_batch, ref, batch)}
@@ -481,6 +495,18 @@ defmodule Indexer.BufferedTask do
     end
   end

+  # only public so that `Task.Supervisor.async_nolink` can call it
+  @doc false
+  def log_run(%{
+        metadata: metadata,
+        callback_module: callback_module,
+        batch: batch,
+        callback_module_state: callback_module_state
+      }) do
+    Logger.metadata(metadata)
+    callback_module.run(batch, callback_module_state)
+  end
+
   defp flush(%BufferedTask{current_buffer: []} = state) do
     state
     |> spawn_next_batch()

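Why the `log_run/1` indirection above: `Logger` metadata lives in the calling process's dictionary and is not inherited by spawned processes, so the tag set in the `BufferedTask` server would otherwise be lost inside the `Task.Supervisor` children. A minimal illustration of the per-process behavior:

    Logger.metadata(fetcher: :coin_balance)

    task = Task.async(fn -> Logger.metadata() end)
    Task.await(task)
    #=> [] - the spawned task starts with empty metadata, so log_run/1 re-applies it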
apps/indexer/lib/indexer/coin_balance/fetcher.ex
@@ -21,7 +21,8 @@ defmodule Indexer.CoinBalance.Fetcher do
     flush_interval: :timer.seconds(3),
     max_batch_size: 500,
     max_concurrency: 4,
-    task_supervisor: Indexer.CoinBalance.TaskSupervisor
+    task_supervisor: Indexer.CoinBalance.TaskSupervisor,
+    metadata: [fetcher: :coin_balance]
   ]

   @doc """
@@ -73,7 +74,7 @@ defmodule Indexer.CoinBalance.Fetcher do
     # `{address, block}`, so take unique params only
     unique_entries = Enum.uniq(entries)

-    Logger.debug(fn -> "fetching #{length(unique_entries)} balances" end)
+    Logger.debug(fn -> ["fetching ", unique_entries |> length() |> to_string()] end)

     unique_entries
     |> Enum.map(&entry_to_params/1)
@@ -84,7 +85,7 @@ defmodule Indexer.CoinBalance.Fetcher do
       {:error, reason} ->
         Logger.error(fn ->
-          ["failed to fetch ", unique_entries |> length() |> to_string(), " balances, ", inspect(reason)]
+          ["failed to fetch ", unique_entries |> length() |> to_string(), ": ", inspect(reason)]
         end)

         {:retry, unique_entries}
@@ -141,7 +142,7 @@ defmodule Indexer.CoinBalance.Fetcher do
             retried_entries |> length() |> to_string(),
             "/",
             original_entries |> length() |> to_string(),
-            " balances: ",
+            ": ",
             fetched_balance_errors_to_iodata(errors)
           ]
         end)

apps/indexer/lib/indexer/internal_transaction/fetcher.ex
@@ -23,7 +23,8 @@ defmodule Indexer.InternalTransaction.Fetcher do
     flush_interval: :timer.seconds(3),
     max_concurrency: @max_concurrency,
     max_batch_size: @max_batch_size,
-    task_supervisor: Indexer.InternalTransaction.TaskSupervisor
+    task_supervisor: Indexer.InternalTransaction.TaskSupervisor,
+    metadata: [fetcher: :internal_transaction]
   ]

   @doc """

apps/indexer/lib/indexer/pending_transaction/fetcher.ex
@@ -58,6 +58,8 @@ defmodule Indexer.PendingTransaction.Fetcher do
   @impl GenServer
   def init(opts) when is_list(opts) do
+    Logger.metadata(fetcher: :pending_transaction)
+
     opts =
       :indexer
       |> Application.get_all_env()
@@ -100,6 +102,8 @@ defmodule Indexer.PendingTransaction.Fetcher do
   end

   defp task(%PendingTransaction.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = _state) do
+    Logger.metadata(fetcher: :pending_transaction)
+
     case fetch_pending_transactions(json_rpc_named_arguments) do
       {:ok, transactions_params} ->
         addresses_params = AddressExtraction.extract_addresses(%{transactions: transactions_params}, pending: true)

apps/indexer/test/indexer/block/invalid_consensus/worker_test.exs
@@ -0,0 +1,87 @@
defmodule Indexer.Block.InvalidConsensus.WorkerTest do
  use Explorer.DataCase

  alias Indexer.Sequence
  alias Indexer.Block.InvalidConsensus.{Worker, TaskSupervisor}

  @moduletag :capture_log

  describe "start_link/1" do
    test "starts the worker" do
      assert {:ok, _pid} = Worker.start_link(supervisor: self())
    end
  end

  describe "init/1" do
    test "sends message to self" do
      pid = self()
      assert {:ok, %{task_ref: nil, block_numbers: [], sup_pid: ^pid}} = Worker.init(supervisor: self())
      assert_received :scan
    end
  end

  describe "handle_info with :scan" do
    test "sends shutdown to supervisor" do
      state = %{task_ref: nil, block_numbers: [], sup_pid: self()}
      Task.async(fn -> Worker.handle_info(:scan, state) end)
      assert_receive {_, _, {:terminate, :normal}}
    end

    test "sends message to self when blocks with invalid consensus are found" do
      block1 = insert(:block)
      block2_with_invalid_consensus = insert(:block, parent_hash: block1.hash, consensus: false)
      _block2 = insert(:block, parent_hash: block1.hash, number: block2_with_invalid_consensus.number)
      _block3 = insert(:block, parent_hash: block2_with_invalid_consensus.hash)
      block_number = block2_with_invalid_consensus.number

      expected_state = %{task_ref: nil, block_numbers: [block_number], retry_interval: 1}
      state = %{task_ref: nil, block_numbers: [], retry_interval: 1}

      assert {:noreply, ^expected_state} = Worker.handle_info(:scan, state)
      assert_receive :push_front_blocks
    end
  end

  describe "handle_info with :push_front_blocks" do
    test "starts a task" do
      task_sup_pid = start_supervised!({Task.Supervisor, name: TaskSupervisor})
      start_supervised!({Sequence, [[ranges: [], step: -1], [name: :block_catchup_sequencer]]})

      state = %{task_ref: nil, block_numbers: [1]}
      assert {:noreply, %{task_ref: task_ref}} = Worker.handle_info(:push_front_blocks, state)
      assert is_reference(task_ref)

      refute_receive {^task_ref, {:error, :queue_unavailable}}
      assert_receive {^task_ref, :ok}

      stop_supervised(task_sup_pid)
    end
  end

  describe "handle_info with task ref tuple" do
    test "sends shutdown to supervisor on success" do
      ref = Process.monitor(self())
      state = %{task_ref: ref, block_numbers: [], sup_pid: self()}
      Task.async(fn -> assert Worker.handle_info({ref, :ok}, state) end)
      assert_receive {_, _, {:terminate, :normal}}
    end

    test "sends message to self to try again on failure" do
      ref = Process.monitor(self())
      state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1}
      expected_state = %{state | task_ref: nil}
      assert {:noreply, ^expected_state} = Worker.handle_info({ref, {:error, :queue_unavailable}}, state)
      assert_receive :push_front_blocks
    end
  end

  describe "handle_info with failed task" do
    test "sends message to self to try again" do
      ref = Process.monitor(self())
      state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1}
      assert {:noreply, %{task_ref: nil}} = Worker.handle_info({:DOWN, ref, :process, self(), :EXIT}, state)
      assert_receive :push_front_blocks
    end
  end
end

@ -31,20 +31,20 @@ config :logger,
config :logger, :console, config :logger, :console,
# Use same format for all loggers, even though the level should only ever be `:error` for `:error` backend # Use same format for all loggers, even though the level should only ever be `:error` for `:error` backend
format: "$time $metadata[$level] $message\n", format: "$dateT$time $metadata[$level] $message\n",
metadata: [:application, :request_id] metadata: ~w(application fetcher request_id)a
config :logger, :ecto, config :logger, :ecto,
# Use same format for all loggers, even though the level should only ever be `:error` for `:error` backend # Use same format for all loggers, even though the level should only ever be `:error` for `:error` backend
format: "$time $metadata[$level] $message\n", format: "$dateT$time $metadata[$level] $message\n",
metadata: [:application, :request_id], metadata: ~w(application fetcher request_id)a,
metadata_filter: [application: :ecto] metadata_filter: [application: :ecto]
config :logger, :error, config :logger, :error,
# Use same format for all loggers, even though the level should only ever be `:error` for `:error` backend # Use same format for all loggers, even though the level should only ever be `:error` for `:error` backend
format: "$time $metadata[$level] $message\n", format: "$dateT$time $metadata[$level] $message\n",
level: :error, level: :error,
metadata: [:application, :request_id] metadata: ~w(application fetcher request_id)a
# Import environment specific config. This must remain at the bottom # Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above. # of this file so it overrides the configuration defined above.
