Indexer.Block.Uncle.Fetcher

* Use `Explorer.Chain.stream_unfetched_uncle_hashes` to initialize queue.
* Have `Indexer.Block.Catchup.Fetcher` and `Indexer.Block.Realtime.Fetcher`
  queue uncles to fetch by calling
  `Indexer.Block.Uncle.Fetcher.async_fetch_blocks`.
pull/802/head
Luke Imhoff 6 years ago
parent 12f789a96d
commit 8e8b4b5508
  1. 5
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  2. 19
      apps/indexer/lib/indexer/block/fetcher.ex
  3. 6
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  4. 5
      apps/indexer/lib/indexer/block/supervisor.ex
  5. 154
      apps/indexer/lib/indexer/block/uncle/fetcher.ex
  6. 38
      apps/indexer/lib/indexer/block/uncle/supervisor.ex
  7. 115
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  8. 12
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs
  9. 126
      apps/indexer/test/indexer/block/uncle/fetcher_test.exs
  10. 18
      apps/indexer/test/support/indexer/block/uncle/supervisor/case.ex

@ -120,6 +120,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
defp async_import_remaining_block_data( defp async_import_remaining_block_data(
%{ %{
block_second_degree_relations: block_second_degree_relations,
transactions: transaction_hashes, transactions: transaction_hashes,
addresses: address_hashes, addresses: address_hashes,
tokens: tokens, tokens: tokens,
@ -149,6 +150,10 @@ defmodule Indexer.Block.Catchup.Fetcher do
|> Token.Fetcher.async_fetch() |> Token.Fetcher.async_fetch()
TokenBalance.Fetcher.async_fetch(token_balances) TokenBalance.Fetcher.async_fetch(token_balances)
block_second_degree_relations
|> Enum.map(& &1.uncle_hash)
|> Block.Uncle.Fetcher.async_fetch_blocks()
end end
defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence) defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)

@ -92,7 +92,11 @@ defmodule Indexer.Block.Fetcher do
when broadcast in ~w(true false)a and callback_module != nil do when broadcast in ~w(true false)a and callback_module != nil do
with {:blocks, {:ok, next, result}} <- with {:blocks, {:ok, next, result}} <-
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{blocks: blocks, transactions: transactions_without_receipts, block_second_degree_relations: block_second_degree_relations} = result, %{
blocks: blocks,
transactions: transactions_without_receipts,
block_second_degree_relations: block_second_degree_relations
} = result,
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)}, {:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)},
%{logs: logs, receipts: receipts} = receipt_params, %{logs: logs, receipts: receipts} = receipt_params,
transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts), transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts),
@ -112,10 +116,9 @@ defmodule Indexer.Block.Fetcher do
}), }),
token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}), token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}),
{:ok, inserted} <- {:ok, inserted} <-
import_range( __MODULE__.import(
state, state,
%{ %{
range: range,
addresses: %{params: addresses}, addresses: %{params: addresses},
balances: %{params: coin_balances_params_set}, balances: %{params: coin_balances_params_set},
token_balances: %{params: token_balances}, token_balances: %{params: token_balances},
@ -136,11 +139,11 @@ defmodule Indexer.Block.Fetcher do
end end
end end
defp import_range( def import(
%__MODULE__{broadcast: broadcast, callback_module: callback_module} = state, %__MODULE__{broadcast: broadcast, callback_module: callback_module} = state,
options options
) )
when is_map(options) do when is_map(options) do
{address_hash_to_fetched_balance_block_number, import_options} = {address_hash_to_fetched_balance_block_number, import_options} =
pop_address_hash_to_fetched_balance_block_number(options) pop_address_hash_to_fetched_balance_block_number(options)

@ -173,10 +173,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
end end
end end
defp async_import_remaining_block_data(%{tokens: tokens}) do defp async_import_remaining_block_data(%{block_second_degree_relations: block_second_degree_relations, tokens: tokens}) do
tokens tokens
|> Enum.map(& &1.contract_address_hash) |> Enum.map(& &1.contract_address_hash)
|> Token.Fetcher.async_fetch() |> Token.Fetcher.async_fetch()
block_second_degree_relations
|> Enum.map(& &1.uncle_hash)
|> Block.Uncle.Fetcher.async_fetch_blocks()
end end
defp internal_transactions( defp internal_transactions(

@ -4,7 +4,7 @@ defmodule Indexer.Block.Supervisor do
""" """
alias Indexer.Block alias Indexer.Block
alias Indexer.Block.{Catchup, Realtime} alias Indexer.Block.{Catchup, Realtime, Uncle}
use Supervisor use Supervisor
@ -27,7 +27,8 @@ defmodule Indexer.Block.Supervisor do
[ [
%{block_fetcher: block_fetcher, subscribe_named_arguments: subscribe_named_arguments}, %{block_fetcher: block_fetcher, subscribe_named_arguments: subscribe_named_arguments},
[name: Realtime.Supervisor] [name: Realtime.Supervisor]
]} ]},
{Uncle.Supervisor, [[block_fetcher: block_fetcher], [name: Uncle.Supervisor]]}
], ],
strategy: :one_for_one strategy: :one_for_one
) )

@ -0,0 +1,154 @@
defmodule Indexer.Block.Uncle.Fetcher do
@moduledoc """
Fetches `t:Explorer.Chain.Block.t/0` by `hash` and updates `t:Explorer.Chain.Block.SecondDegreeRelation.t/0`
`uncle_fetched_at` where the `uncle_hash` matches `hash`.
"""
require Logger
alias Explorer.Chain
alias Explorer.Chain.Hash
alias Indexer.{AddressExtraction, Block, BufferedTask}
@behaviour Block.Fetcher
@behaviour BufferedTask
# Default init options for this fetcher. `child_spec/1` merges the caller-supplied init options over these, so any key
# listed here can be overridden per instance.
@defaults [
flush_interval: :timer.seconds(3),
max_batch_size: 10,
max_concurrency: 10,
init_chunk_size: 1000,
# Same name that `Indexer.Block.Uncle.Supervisor` gives to the `Task.Supervisor` it starts before this fetcher.
task_supervisor: Indexer.Block.Uncle.TaskSupervisor
]
@doc """
Asynchronously fetches `t:Explorer.Chain.Block.t/0` for the given `block_hashes` and updates
`t:Explorer.Chain.Block.SecondDegreeRelation.t/0` `uncle_fetched_at`.

The hashes are converted to strings and de-duplicated before they are buffered for this fetcher, so callers may pass
the same uncle hash more than once.
"""
@spec async_fetch_blocks([Hash.Full.t()]) :: :ok
def async_fetch_blocks(block_hashes) when is_list(block_hashes) do
  BufferedTask.buffer(
    __MODULE__,
    block_hashes
    |> Enum.map(&to_string/1)
    |> Enum.uniq()
  )
end
@doc false
def child_spec([init_options, gen_server_options]) when is_list(init_options) do
  {block_fetcher, buffered_task_options} = Keyword.pop(init_options, :block_fetcher)

  # NOTE(review): the message below names `:json_rpc_named_arguments`, but the option that is actually required here
  # is `:block_fetcher`. The wording is kept as-is because the test suite asserts on the exact text.
  if !block_fetcher do
    raise ArgumentError,
          ":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <>
            "to allow for json_rpc calls when running."
  end

  options_with_defaults = Keyword.merge(@defaults, buffered_task_options)

  # Uncle imports go through this module's own `import/2` callback and never broadcast.
  uncle_block_fetcher = %Block.Fetcher{block_fetcher | broadcast: false, callback_module: __MODULE__}
  buffered_task_init_options = Keyword.put(options_with_defaults, :state, uncle_block_fetcher)

  Supervisor.child_spec(
    {BufferedTask, [{__MODULE__, buffered_task_init_options}, gen_server_options]},
    id: __MODULE__
  )
end
# Seeds the queue by reducing over every unfetched uncle hash; each hash is converted to a string before it is handed
# to the `reducer` supplied by `BufferedTask`.
@impl BufferedTask
def init(initial, reducer, _) do
  reduce_stringified_hash = fn uncle_hash, acc -> reducer.(to_string(uncle_hash), acc) end

  {:ok, final} = Chain.stream_unfetched_uncle_hashes(initial, reduce_stringified_hash)

  final
end
# Fetches the uncle blocks for `hashes` over JSON-RPC and imports them through `Block.Fetcher.import/2`.
#
# Returns `:ok` once the import succeeded, or `{:retry, unique_hashes}` when the JSON-RPC fetch returned an error.
# A failed import is not handled: the `{:ok, _}` match raises instead.
@impl BufferedTask
def run(hashes, _retries, %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher) do
  # the same block could be included as an uncle on multiple blocks, but we only want to fetch it once
  unique_hashes = Enum.uniq(hashes)

  Logger.debug(fn -> "fetching #{length(unique_hashes)} uncle blocks" end)

  fetch_result = EthereumJSONRPC.fetch_blocks_by_hash(unique_hashes, json_rpc_named_arguments)

  case fetch_result do
    {:ok,
     %{
       blocks: blocks_params,
       transactions: transactions_params,
       block_second_degree_relations: relations_params
     }} ->
      extraction_input = %{blocks: blocks_params, transactions: transactions_params}

      import_options = %{
        addresses: %{params: AddressExtraction.extract_addresses(extraction_input)},
        blocks: %{params: blocks_params},
        block_second_degree_relations: %{params: relations_params},
        transactions: %{params: transactions_params, on_conflict: :nothing}
      }

      {:ok, _} = Block.Fetcher.import(block_fetcher, import_options)

      :ok

    {:error, reason} ->
      Logger.debug(fn -> "failed to fetch #{length(unique_hashes)} uncle blocks, #{inspect(reason)}" end)

      {:retry, unique_hashes}
  end
end
# Imports uncle blocks: marks the blocks as non-consensus, turns their transactions into forks, runs `Chain.import/1`,
# and then queues the uncles of the imported uncles for fetching. Any result that is not
# `{:ok, %{block_second_degree_relations: _}}` is returned untouched.
@impl Block.Fetcher
def import(_, options) when is_map(options) do
  import_result =
    options
    |> uncle_blocks()
    |> fork_transactions()
    |> Chain.import()

  case import_result do
    {:ok, %{block_second_degree_relations: block_second_degree_relations}} ->
      # * CoinBalance.Fetcher.async_fetch_balances is not called because uncles don't affect balances
      # * InternalTransaction.Fetcher.async_fetch is not called because internal transactions are based on transaction
      #   hash, which is shared with transaction on consensus blocks.
      # * Token.Fetcher.async_fetch is not called because the tokens only matter on consensus blocks
      # * TokenBalance.Fetcher.async_fetch is not called because it uses block numbers from consensus, not uncles
      uncle_hashes = Enum.map(block_second_degree_relations, & &1.uncle_hash)
      Block.Uncle.Fetcher.async_fetch_blocks(uncle_hashes)

      import_result

    _ ->
      import_result
  end
end
# Sets `consensus: false` on every entry of the blocks params, since everything imported by this fetcher is an uncle.
defp uncle_blocks(import_options) do
  consensus_path = [:blocks, :params, Access.all(), :consensus]

  put_in(import_options, consensus_path, false)
end
# Replaces the transactions params with their block-less "forked" form and adds matching `:transaction_forks` params
# built from the original (still block-aware) transactions params.
#
# NOTE(review): the `|| []` default only covers a `:transactions` entry without `:params`; when `:transactions` itself
# is missing, the first `put_in/3` below still raises.
defp fork_transactions(import_options) do
  original_transactions_params = import_options[:transactions][:params] || []

  with_forked_transactions =
    put_in(
      import_options,
      [:transactions, :params],
      forked_transactions_params(original_transactions_params)
    )

  put_in(
    with_forked_transactions,
    [Access.key(:transaction_forks, %{}), :params],
    transaction_forks_params(original_transactions_params)
  )
end
# Clears every block-specific field of each transaction params map and collects the results into a `MapSet`.
defp forked_transactions_params(transactions_params) do
  block_specific_fields_cleared = %{
    block_hash: nil,
    block_number: nil,
    index: nil,
    gas_used: nil,
    cumulative_gas_used: nil,
    status: nil
  }

  # With no block_hash, there will be a collision for the same hash when a transaction is used in more than 1 uncle,
  # so use MapSet to prevent duplicate row errors.
  transactions_params
  |> Enum.map(&Map.merge(&1, block_specific_fields_cleared))
  |> MapSet.new()
end
# Builds one transaction fork params map per transaction; the transaction's `block_hash` becomes the fork's
# `uncle_hash`.
defp transaction_forks_params(transactions_params) do
  Enum.map(transactions_params, &transaction_fork_params/1)
end

defp transaction_fork_params(%{block_hash: uncle_hash, index: index, hash: hash}) do
  %{uncle_hash: uncle_hash, index: index, hash: hash}
end
end

@ -0,0 +1,38 @@
defmodule Indexer.Block.Uncle.Supervisor do
  @moduledoc """
  Supervises `Indexer.Block.Uncle.Fetcher` together with the `Task.Supervisor` named
  `Indexer.Block.Uncle.TaskSupervisor`.
  """

  use Supervisor

  alias Indexer.Block.Uncle.Fetcher

  # Accepts either `[init_arguments]` or `[init_arguments, gen_server_options]`; the one-element form defaults the
  # gen server options to `[]`.
  def child_spec([init_arguments]), do: child_spec([init_arguments, []])

  def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
    Supervisor.child_spec(
      %{
        id: __MODULE__,
        start: {__MODULE__, :start_link, start_link_arguments},
        type: :supervisor
      },
      []
    )
  end

  def start_link(arguments, gen_server_options \\ []) do
    Supervisor.start_link(__MODULE__, arguments, gen_server_options)
  end

  @impl Supervisor
  def init(fetcher_arguments) do
    children = [
      {Task.Supervisor, name: Indexer.Block.Uncle.TaskSupervisor},
      {Fetcher, [fetcher_arguments, [name: Fetcher]]}
    ]

    # `:rest_for_one`: if the task supervisor is restarted, the fetcher started after it is restarted as well.
    Supervisor.init(children, strategy: :rest_for_one)
  end
end

@ -0,0 +1,115 @@
defmodule Indexer.Block.Catchup.FetcherTest do
use EthereumJSONRPC.Case, async: false
use Explorer.DataCase
import Mox
alias Indexer.{Block, CoinBalance, InternalTransaction, Token, TokenBalance}
alias Indexer.Block.Catchup.Fetcher
@moduletag capture_log: true
# MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to
# use expectations and stubs from test's pid.
setup :set_mox_global
setup :verify_on_exit!
setup do
  # Uncles don't occur on POA chains, so this cannot be tested against the public addresses; the transport is always
  # the Mox mock.
  mox_json_rpc_named_arguments = [
    transport: EthereumJSONRPC.Mox,
    transport_options: [],
    # Which one does not matter, so pick one
    variant: EthereumJSONRPC.Parity
  ]

  %{json_rpc_named_arguments: mox_json_rpc_named_arguments}
end
# NOTE(review): the function exercised below is called with two arguments (`Fetcher.import/2`).
describe "import/1" do
# Checks that importing a block with a second-degree relation hands the relation's `uncle_hash` to the process
# registered as `Block.Uncle.Fetcher`.
test "fetches uncles asynchronously", %{json_rpc_named_arguments: json_rpc_named_arguments} do
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
parent = self()
# Stand-in for the real uncle fetcher: answers a single `{:buffer, uncles}` call with `:ok` and forwards the buffered
# entries to the test process so they can be asserted on.
pid =
spawn_link(fn ->
receive do
{:"$gen_call", from, {:buffer, uncles}} ->
GenServer.reply(from, :ok)
send(parent, {:uncles, uncles})
end
end)
# Registered under the fetcher's name so calls addressed to `Block.Uncle.Fetcher` reach the stand-in.
Process.register(pid, Block.Uncle.Fetcher)
nephew_hash = block_hash() |> to_string()
uncle_hash = block_hash() |> to_string()
miner_hash = address_hash() |> to_string()
block_number = 0
assert {:ok, _} =
Fetcher.import(%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}, %{
addresses: %{
params: [
%{hash: miner_hash}
]
},
address_hash_to_fetched_balance_block_number: %{miner_hash => block_number},
balances: %{
params: [
%{
address_hash: miner_hash,
block_number: block_number
}
]
},
blocks: %{
params: [
%{
difficulty: 0,
gas_limit: 21000,
gas_used: 21000,
miner_hash: miner_hash,
nonce: 0,
number: block_number,
parent_hash:
block_hash()
|> to_string(),
size: 0,
timestamp: DateTime.utc_now(),
total_difficulty: 0,
hash: nephew_hash
}
]
},
# The only relation in the import: `nephew_hash` is the block imported above, `uncle_hash` is not imported here.
block_second_degree_relations: %{
params: [
%{
nephew_hash: nephew_hash,
uncle_hash: uncle_hash
}
]
},
tokens: %{
params: [],
on_conflict: :nothing
},
token_balances: %{
params: []
},
transactions: %{
params: [],
on_conflict: :nothing
},
transaction_hash_to_block_number: %{}
})
# The stand-in must have been asked to buffer exactly the one uncle hash.
assert_receive {:uncles, [^uncle_hash]}
end
end
end

@ -1,4 +1,4 @@
defmodule Indexer.BlockFetcher.RealtimeTest do defmodule Indexer.Block.Realtime.FetcherTest do
use EthereumJSONRPC.Case, async: false use EthereumJSONRPC.Case, async: false
use Explorer.DataCase use Explorer.DataCase
@ -7,7 +7,7 @@ defmodule Indexer.BlockFetcher.RealtimeTest do
alias Explorer.Chain alias Explorer.Chain
alias Explorer.Chain.Address alias Explorer.Chain.Address
alias Indexer.{Sequence, Token} alias Indexer.{Sequence, Token}
alias Indexer.Block.Realtime alias Indexer.Block.{Realtime, Uncle}
@moduletag capture_log: true @moduletag capture_log: true
@ -36,7 +36,7 @@ defmodule Indexer.BlockFetcher.RealtimeTest do
%{block_fetcher: block_fetcher, json_rpc_named_arguments: core_json_rpc_named_arguments} %{block_fetcher: block_fetcher, json_rpc_named_arguments: core_json_rpc_named_arguments}
end end
describe "Indexer.BlockFetcher.stream_import/1" do describe "Indexer.Block.Fetcher.fetch_and_import_range/1" do
@tag :no_geth @tag :no_geth
test "in range with internal transactions", %{ test "in range with internal transactions", %{
block_fetcher: %Indexer.Block.Fetcher{} = block_fetcher, block_fetcher: %Indexer.Block.Fetcher{} = block_fetcher,
@ -47,6 +47,10 @@ defmodule Indexer.BlockFetcher.RealtimeTest do
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Uncle.Supervisor.Case.start_supervised!(
block_fetcher: %Indexer.Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}
)
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
EthereumJSONRPC.Mox EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [ |> expect(:json_rpc, fn [
@ -122,7 +126,7 @@ defmodule Indexer.BlockFetcher.RealtimeTest do
} }
], ],
"transactionsRoot" => "0xd7c39a93eafe0bdcbd1324c13dcd674bed8c9fa8adbf8f95bf6a59788985da6f", "transactionsRoot" => "0xd7c39a93eafe0bdcbd1324c13dcd674bed8c9fa8adbf8f95bf6a59788985da6f",
"uncles" => [] "uncles" => ["0xa4ec735cabe1510b5ae081b30f17222580b4588dbec52830529753a688b046cd"]
} }
}, },
%{ %{

@ -0,0 +1,126 @@
defmodule Indexer.Block.Uncle.FetcherTest do
# MUST be `async: false` so that {:shared, pid} is set for connection to allow CoinBalanceFetcher's self-send to have
# connection allowed immediately.
use EthereumJSONRPC.Case, async: false
use Explorer.DataCase
alias Explorer.Chain
alias Indexer.Block
import Mox
@moduletag :capture_log
# MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to
# use expectations and stubs from test's pid.
setup :set_mox_global
setup :verify_on_exit!
setup do
  # Uncles don't occur on POA chains, so this cannot be tested against the public addresses; the transport is always
  # the Mox mock.
  mox_json_rpc_named_arguments = [
    transport: EthereumJSONRPC.Mox,
    transport_options: [],
    # Which one does not matter, so pick one
    variant: EthereumJSONRPC.Parity
  ]

  %{json_rpc_named_arguments: mox_json_rpc_named_arguments}
end
describe "child_spec/1" do
  # `Block.Uncle.Fetcher.child_spec/1` pops `:block_fetcher` from the init options and raises when it is absent. The
  # message it raises still names `:json_rpc_named_arguments`, so that text is asserted verbatim.
  test "raises ArgumentError if `:block_fetcher` is not provided" do
    assert_raise ArgumentError,
                 ":json_rpc_named_arguments must be provided to `Elixir.Indexer.Block.Uncle.Fetcher.child_spec " <>
                   "to allow for json_rpc calls when running.",
                 fn ->
                   start_supervised({Block.Uncle.Fetcher, [[], []]})
                 end
  end
end
describe "init/1" do
# Starts the uncle supervisor with one unfetched uncle relation in the database and checks that the uncle block is
# fetched through the mocked JSON-RPC transport and imported.
test "fetched unfetched uncle hashes", %{json_rpc_named_arguments: json_rpc_named_arguments} do
# A relation whose uncle block has not been imported yet (`uncle: nil` after preloading).
assert %Chain.Block.SecondDegreeRelation{nephew_hash: nephew_hash, uncle_hash: uncle_hash, uncle: nil} =
:block_second_degree_relation
|> insert()
|> Repo.preload([:nephew, :uncle])
uncle_hash_data = to_string(uncle_hash)
uncle_uncle_hash_data = to_string(block_hash())
# Exactly one `eth_getBlockByHash` request for the uncle is expected. The mocked response is a block with one
# transaction that itself lists one uncle (`uncle_uncle_hash_data`).
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [%{method: "eth_getBlockByHash", params: [^uncle_hash_data, true]}], _ ->
number_quantity = "0x0"
{:ok,
[
%{
result: %{
"author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"difficulty" => "0xfffffffffffffffffffffffffffffffe",
"extraData" => "0xd583010a068650617269747986312e32362e32826c69",
"gasLimit" => "0x7a1200",
"gasUsed" => "0x0",
"hash" => uncle_hash_data,
"logsBloom" => "0x",
"miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"number" => number_quantity,
"parentHash" => "0x006edcaa1e6fde822908783bc4ef1ad3675532d542fce53537557391cfe34c3c",
"size" => "0x243",
"timestamp" => "0x5b437f41",
"totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb",
"transactions" => [
%{
"blockHash" => uncle_hash_data,
"blockNumber" => number_quantity,
"chainId" => "0x4d",
"condition" => nil,
"creates" => "0xffc87239eb0267bc3ca2cd51d12fbf278e02ccb4",
"from" => "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca",
"gas" => "0x47b760",
"gasPrice" => "0x174876e800",
"hash" => "0x3a3eb134e6792ce9403ea4188e5e79693de9e4c94e499db132be086400da79e6",
"input" => "0x",
"nonce" => "0x0",
"r" => "0xad3733df250c87556335ffe46c23e34dbaffde93097ef92f52c88632a40f0c75",
"s" => "0x72caddc0371451a58de2ca6ab64e0f586ccdb9465ff54e1c82564940e89291e3",
"standardV" => "0x0",
"to" => nil,
"transactionIndex" => "0x0",
"v" => "0xbd",
"value" => "0x0"
}
],
"uncles" => [uncle_uncle_hash_data]
}
}
]}
end)
# No hash is buffered explicitly: the fetcher is expected to pick the relation up from the database on start.
Block.Uncle.Supervisor.Case.start_supervised!(
block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}
)
# Poll until the relation for `nephew_hash` has `uncle_fetched_at` set; `Repo.one!/1` raises until it does.
wait(fn ->
Repo.one!(
from(bsdr in Chain.Block.SecondDegreeRelation,
where: bsdr.nephew_hash == ^nephew_hash and not is_nil(bsdr.uncle_fetched_at)
)
)
end)
# The uncle block was imported and its single transaction was recorded as one transaction fork.
refute is_nil(Repo.get(Chain.Block, uncle_hash))
assert Repo.aggregate(Chain.Transaction.Fork, :count, :hash) == 1
end
end
# Calls `query_fun` until it stops raising `Ecto.NoResultsError`, sleeping 100 ms between attempts, and returns its
# result. There is no retry limit in this function.
defp wait(query_fun) do
  query_fun.()
rescue
  Ecto.NoResultsError ->
    poll_interval_ms = 100
    Process.sleep(poll_interval_ms)
    wait(query_fun)
end
end

@ -0,0 +1,18 @@
defmodule Indexer.Block.Uncle.Supervisor.Case do
  @moduledoc """
  Test helper that starts `Indexer.Block.Uncle.Supervisor` under the ExUnit test supervisor.
  """

  alias Indexer.Block

  # Merged over the caller's arguments, so these values take precedence over anything the caller passes for the same
  # keys.
  @test_overrides [
    flush_interval: 50,
    init_chunk_size: 1,
    max_batch_size: 1,
    max_concurrency: 1
  ]

  def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
    merged_fetcher_arguments = Keyword.merge(fetcher_arguments, @test_overrides)
    supervisor_child_spec = Block.Uncle.Supervisor.child_spec([merged_fetcher_arguments])

    ExUnit.Callbacks.start_supervised!(supervisor_child_spec)
  end
end
Loading…
Cancel
Save