Separate Catchup, Realtime, Supervisor, and BlockFetcher states

BlockFetcher.Catchup and BlockFetcher.Realtime get their own structs,
which use the BlockFetcher struct.  This better emphasizes that Catchup
and Realtime are both block fetchers.  The BlockFetcher.Catchup and
BlockFetcher.Realtime structs move into the Supervisor's struct.
Branch: pull/489/head
Author: Luke Imhoff
Parent: 2333cc6cf5
Commit: 11735b1289
1. apps/indexer/lib/indexer/block_fetcher.ex (58)
2. apps/indexer/lib/indexer/block_fetcher/catchup.ex (68)
3. apps/indexer/lib/indexer/block_fetcher/realtime.ex (47)
4. apps/indexer/lib/indexer/block_fetcher/supervisor.ex (33)
5. apps/indexer/test/indexer/block_fetcher_test.exs (351)
6. apps/indexer/test/indexer/supervisor_test.exs (349)
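
Sketched below, before the hunks, is the struct layout this commit ends up with; the module and field names are taken from the diffs that follow, with defaults and `@enforce_keys` omitted for brevity.

defmodule Indexer.BlockFetcher do
  # shared fetch configuration plus the Sequence pid that feeds it
  defstruct [
    :json_rpc_named_arguments,
    :blocks_batch_size,
    :blocks_concurrency,
    :broadcast,
    :receipts_batch_size,
    :receipts_concurrency,
    :sequence
  ]
end

defmodule Indexer.BlockFetcher.Catchup do
  # one backfill task at a time, rescheduled on an adaptive BoundInterval
  defstruct [:block_fetcher, :bound_interval, :task]
end

defmodule Indexer.BlockFetcher.Realtime do
  # one task per :realtime_index tick, tracked by monitor ref
  defstruct [:block_fetcher, :interval, task_by_ref: %{}]
end

defmodule Indexer.BlockFetcher.Supervisor do
  # GenServer state that owns both fetchers
  defstruct [:catchup, :realtime]
end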

@ -8,10 +8,10 @@ defmodule Indexer.BlockFetcher do
import Indexer, only: [debug: 1] import Indexer, only: [debug: 1]
alias Explorer.Chain alias Explorer.Chain
alias Indexer.{AddressExtraction, BalanceFetcher, BoundInterval, InternalTransactionFetcher, Sequence} alias Indexer.{AddressExtraction, BalanceFetcher, InternalTransactionFetcher, Sequence}
# dialyzer thinks that Logger.debug functions always have no_local_return # dialyzer thinks that Logger.debug functions always have no_local_return
@dialyzer {:nowarn_function, import_range: 4} @dialyzer {:nowarn_function, import_range: 2}
# These are all the *default* values for options. # These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`. # DO NOT use them directly in the code. Get options from `state`.
@ -19,21 +19,17 @@ defmodule Indexer.BlockFetcher do
@blocks_batch_size 10 @blocks_batch_size 10
@blocks_concurrency 10 @blocks_concurrency 10
# milliseconds
@block_interval 5_000
@receipts_batch_size 250 @receipts_batch_size 250
@receipts_concurrency 10 @receipts_concurrency 10
defstruct json_rpc_named_arguments: [], @enforce_keys ~w(json_rpc_named_arguments)a
catchup_task: nil, defstruct json_rpc_named_arguments: nil,
catchup_bound_interval: nil,
realtime_task_by_ref: %{},
realtime_interval: nil,
blocks_batch_size: @blocks_batch_size, blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency, blocks_concurrency: @blocks_concurrency,
broadcast: nil,
receipts_batch_size: @receipts_batch_size, receipts_batch_size: @receipts_batch_size,
receipts_concurrency: @receipts_concurrency receipts_concurrency: @receipts_concurrency,
sequence: nil
@doc false @doc false
def default_blocks_batch_size, do: @blocks_batch_size def default_blocks_batch_size, do: @blocks_batch_size
@ -53,8 +49,6 @@ defmodule Indexer.BlockFetcher do
Defaults to #{@blocks_concurrency}. So up to `blocks_concurrency * block_batch_size` (defaults to Defaults to #{@blocks_concurrency}. So up to `blocks_concurrency * block_batch_size` (defaults to
`#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all `#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all
connections. connections.
* `:block_interval` - The number of milliseconds between new blocks being published. Defaults to
`#{@block_interval}` milliseconds.
* `:receipts_batch_size` - The number of receipts to request in one call to the JSONRPC. Defaults to * `:receipts_batch_size` - The number of receipts to request in one call to the JSONRPC. Defaults to
`#{@receipts_batch_size}`. Receipt requests also include the logs for when the transaction was collated into the `#{@receipts_batch_size}`. Receipt requests also include the logs for when the transaction was collated into the
block. *These logs are not paginated.* block. *These logs are not paginated.*
@ -65,24 +59,17 @@ defmodule Indexer.BlockFetcher do
JSONRPC at once over all connections. *Each transaction only has one receipt.* JSONRPC at once over all connections. *Each transaction only has one receipt.*
""" """
def new(named_arguments) when is_list(named_arguments) do def new(named_arguments) when is_list(named_arguments) do
interval = div(named_arguments[:block_interval] || @block_interval, 2) struct!(__MODULE__, named_arguments)
state = struct!(__MODULE__, Keyword.delete(named_arguments, :block_interval))
%__MODULE__{
state
| json_rpc_named_arguments: Keyword.fetch!(named_arguments, :json_rpc_named_arguments),
catchup_bound_interval: BoundInterval.within(interval..(interval * 10)),
realtime_interval: interval
}
end end
def stream_import(%__MODULE__{} = state, seq, indexer_mode, task_opts) do def stream_import(%__MODULE__{blocks_concurrency: blocks_concurrency, sequence: sequence} = state)
seq when is_pid(sequence) do
sequence
|> Sequence.build_stream() |> Sequence.build_stream()
|> Task.async_stream( |> Task.async_stream(
&import_range(&1, state, seq, indexer_mode), &import_range(state, &1),
Keyword.merge(task_opts, timeout: :infinity) max_concurrency: blocks_concurrency,
timeout: :infinity
) )
|> Stream.run() |> Stream.run()
end end
@ -126,13 +113,13 @@ defmodule Indexer.BlockFetcher do
end) end)
end end
defp insert(seq, range, indexer_mode, options) when is_list(options) do defp insert(%__MODULE__{broadcast: broadcast, sequence: sequence}, options) when is_list(options) do
{address_hash_to_fetched_balance_block_number, import_options} = {address_hash_to_fetched_balance_block_number, import_options} =
pop_address_hash_to_fetched_balance_block_number(options) pop_address_hash_to_fetched_balance_block_number(options)
transaction_hash_to_block_number = get_transaction_hash_to_block_number(import_options) transaction_hash_to_block_number = get_transaction_hash_to_block_number(import_options)
options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index) options_with_broadcast = Keyword.merge(import_options, broadcast: broadcast)
with {:ok, results} <- Chain.import(options_with_broadcast) do with {:ok, results} <- Chain.import(options_with_broadcast) do
async_import_remaining_block_data( async_import_remaining_block_data(
@ -144,11 +131,13 @@ defmodule Indexer.BlockFetcher do
{:ok, results} {:ok, results}
else else
{:error, step, failed_value, _changes_so_far} = error -> {:error, step, failed_value, _changes_so_far} = error ->
range = Keyword.fetch!(options, :range)
debug(fn -> debug(fn ->
"failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying" "failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying"
end) end)
:ok = Sequence.queue(seq, range) :ok = Sequence.queue(sequence, range)
error error
end end
@ -202,10 +191,10 @@ defmodule Indexer.BlockFetcher do
|> InternalTransactionFetcher.async_fetch(10_000) |> InternalTransactionFetcher.async_fetch(10_000)
end end
# Run at state.blocks_concurrency max_concurrency when called by `stream_import/3` # Run at state.blocks_concurrency max_concurrency when called by `stream_import/1`
# Only public for testing # Only public for testing
@doc false @doc false
def import_range(range, %__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq, indexer_mode) do def import_range(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments, sequence: seq} = state, range) do
with {:blocks, {:ok, next, result}} <- with {:blocks, {:ok, next, result}} <-
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{blocks: blocks, transactions: transactions_without_receipts} = result, %{blocks: blocks, transactions: transactions_without_receipts} = result,
@ -222,9 +211,8 @@ defmodule Indexer.BlockFetcher do
}) })
insert( insert(
seq, state,
range, range: range,
indexer_mode,
addresses: [params: addresses], addresses: [params: addresses],
blocks: [params: blocks], blocks: [params: blocks],
logs: [params: logs], logs: [params: logs],
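
The reshaped `BlockFetcher` API threads everything through the struct. A minimal usage sketch based only on the calls visible in this diff; `json_rpc_named_arguments` is assumed to be an existing transport configuration, and the literal sequence and range values are placeholders.

alias Indexer.{BlockFetcher, Sequence}

# `new/1` is just `struct!/2`, so `:json_rpc_named_arguments` is required by @enforce_keys.
block_fetcher =
  BlockFetcher.new(
    broadcast: false,
    json_rpc_named_arguments: json_rpc_named_arguments
  )

# The Sequence pid now rides inside the struct instead of being a separate argument.
{:ok, sequence} = Sequence.start_link(first: 0, step: 1)
sequenced_block_fetcher = %BlockFetcher{block_fetcher | sequence: sequence}

# `import_range/2` imports one range; `stream_import/1` drains the whole sequence
# at `blocks_concurrency` and requires `sequence` to be a pid.
{:ok, _imported} = BlockFetcher.import_range(sequenced_block_fetcher, 0..0)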

@ -6,22 +6,44 @@ defmodule Indexer.BlockFetcher.Catchup do
require Logger require Logger
import Indexer, only: [debug: 1] import Indexer, only: [debug: 1]
import Indexer.BlockFetcher, only: [stream_import: 4] import Indexer.BlockFetcher, only: [stream_import: 1]
alias Explorer.Chain alias Explorer.Chain
alias Indexer.{BlockFetcher, BoundInterval, Sequence} alias Indexer.{BlockFetcher, BoundInterval, Sequence}
@enforce_keys ~w(block_fetcher bound_interval)a
defstruct ~w(block_fetcher bound_interval task)a
def new(%{block_fetcher: %BlockFetcher{} = common_block_fetcher, block_interval: block_interval}) do
block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: true}
minimum_interval = div(block_interval, 2)
%__MODULE__{
block_fetcher: block_fetcher,
bound_interval: BoundInterval.within(minimum_interval..(minimum_interval * 10))
}
end
@doc """ @doc """
Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0` Starts `task/1` and puts it in the supervisor state's `catchup.task`
""" """
@spec put(%BlockFetcher{catchup_task: nil}) :: %BlockFetcher{catchup_task: Task.t()} @spec put(%BlockFetcher.Supervisor{catchup: %__MODULE__{task: nil}}) :: %BlockFetcher.Supervisor{
def put(%BlockFetcher{catchup_task: nil} = state) do catchup: %__MODULE__{task: Task.t()}
catchup_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) }
def put(%BlockFetcher.Supervisor{catchup: %__MODULE__{task: nil} = state} = supervisor_state) do
%BlockFetcher{state | catchup_task: catchup_task} put_in(
supervisor_state.catchup.task,
Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state])
)
end end
def task(%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state) do def task(
%__MODULE__{
block_fetcher:
%BlockFetcher{blocks_batch_size: blocks_batch_size, json_rpc_named_arguments: json_rpc_named_arguments} =
block_fetcher
} = state
) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
case latest_block_number do case latest_block_number do
@ -48,10 +70,10 @@ defmodule Indexer.BlockFetcher.Catchup do
:ok :ok
_ -> _ ->
{:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size) {:ok, sequence} = Sequence.start_link(ranges: missing_ranges, step: -1 * blocks_batch_size)
Sequence.cap(seq) Sequence.cap(sequence)
stream_import(state, seq, :catchup_index, max_concurrency: state.blocks_concurrency) stream_import(%BlockFetcher{block_fetcher | sequence: sequence})
end end
%{first_block_number: first, missing_block_count: missing_block_count} %{first_block_number: first, missing_block_count: missing_block_count}
@ -60,28 +82,30 @@ defmodule Indexer.BlockFetcher.Catchup do
def handle_success( def handle_success(
{ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}}, {ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}},
%BlockFetcher{ %BlockFetcher.Supervisor{
catchup_bound_interval: catchup_bound_interval, catchup: %__MODULE__{
catchup_task: %Task{ref: ref} bound_interval: bound_interval,
} = state task: %Task{ref: ref}
}
} = supervisor_state
) )
when is_integer(missing_block_count) do when is_integer(missing_block_count) do
new_catchup_bound_interval = new_bound_interval =
case missing_block_count do case missing_block_count do
0 -> 0 ->
Logger.info("Index already caught up in #{first_block_number}-0") Logger.info("Index already caught up in #{first_block_number}-0")
BoundInterval.increase(catchup_bound_interval) BoundInterval.increase(bound_interval)
_ -> _ ->
Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0") Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0")
BoundInterval.decrease(catchup_bound_interval) BoundInterval.decrease(bound_interval)
end end
Process.demonitor(ref, [:flush]) Process.demonitor(ref, [:flush])
interval = new_catchup_bound_interval.current interval = new_bound_interval.current
Logger.info(fn -> Logger.info(fn ->
"Checking if index needs to catch up in #{interval}ms" "Checking if index needs to catch up in #{interval}ms"
@ -89,17 +113,19 @@ defmodule Indexer.BlockFetcher.Catchup do
Process.send_after(self(), :catchup_index, interval) Process.send_after(self(), :catchup_index, interval)
%BlockFetcher{state | catchup_bound_interval: new_catchup_bound_interval, catchup_task: nil} update_in(supervisor_state.catchup, fn state ->
%__MODULE__{state | bound_interval: new_bound_interval, task: nil}
end)
end end
def handle_failure( def handle_failure(
{:DOWN, ref, :process, pid, reason}, {:DOWN, ref, :process, pid, reason},
%BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = state %BlockFetcher.Supervisor{catchup: %__MODULE__{task: %Task{pid: pid, ref: ref}}} = supervisor_state
) do ) do
Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end) Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)
send(self(), :catchup_index) send(self(), :catchup_index)
%BlockFetcher{state | catchup_task: nil} put_in(supervisor_state.catchup.task, nil)
end end
end end
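
Catchup now carries its adaptive polling interval next to its task. The hypothetical module below condenses what `new/1` and `handle_success/2` above do with `Indexer.BoundInterval`; the module name and function boundaries are illustrative only.

defmodule CatchupBackoffSketch do
  alias Indexer.BoundInterval

  # from Catchup.new/1: poll at half the block interval, bounded to 10x that minimum
  def initial_bound_interval(block_interval) do
    minimum = div(block_interval, 2)
    BoundInterval.within(minimum..(minimum * 10))
  end

  # from Catchup.handle_success/2: back off when caught up, speed up when behind,
  # then schedule the next :catchup_index message
  def reschedule(bound_interval, missing_block_count) do
    bound_interval =
      case missing_block_count do
        0 -> BoundInterval.increase(bound_interval)
        _ -> BoundInterval.decrease(bound_interval)
      end

    Process.send_after(self(), :catchup_index, bound_interval.current)
    bound_interval
  end
end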

@ -5,29 +5,46 @@ defmodule Indexer.BlockFetcher.Realtime do
require Logger require Logger
import Indexer.BlockFetcher, only: [stream_import: 4] import Indexer.BlockFetcher, only: [stream_import: 1]
alias Indexer.{BlockFetcher, Sequence} alias Indexer.{BlockFetcher, Sequence}
@enforce_keys ~w(block_fetcher interval)a
defstruct block_fetcher: nil,
interval: nil,
task_by_ref: %{}
def new(%{block_fetcher: %BlockFetcher{} = common_block_fetcher, block_interval: block_interval}) do
block_fetcher = %BlockFetcher{common_block_fetcher | blocks_concurrency: 1, broadcast: true}
interval = div(block_interval, 2)
%__MODULE__{block_fetcher: block_fetcher, interval: interval}
end
@doc """ @doc """
Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0` `realtime_task_by_ref`. Starts `task/1` and puts it in the supervisor state's `realtime.task_by_ref`.
""" """
def put(%BlockFetcher{} = state) do def put(%BlockFetcher.Supervisor{realtime: %__MODULE__{} = state} = supervisor_state) do
%Task{ref: ref} = realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) %Task{ref: ref} = task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state])
put_in(state.realtime_task_by_ref[ref], realtime_task) put_in(supervisor_state.realtime.task_by_ref[ref], task)
end end
def task(%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state) do def task(%__MODULE__{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher}) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
{:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) {:ok, sequence} = Sequence.start_link(first: latest_block_number, step: 2)
stream_import(state, seq, :realtime_index, max_concurrency: 1) stream_import(%BlockFetcher{block_fetcher | sequence: sequence})
end end
def handle_success({ref, :ok = result}, %BlockFetcher{realtime_task_by_ref: realtime_task_by_ref} = state) do def handle_success(
{realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) {ref, :ok = result},
%BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state
) do
{task, running_task_by_ref} = Map.pop(task_by_ref, ref)
case realtime_task do case task do
nil -> nil ->
Logger.error(fn -> Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
@ -40,16 +57,16 @@ defmodule Indexer.BlockFetcher.Realtime do
Process.demonitor(ref, [:flush]) Process.demonitor(ref, [:flush])
%BlockFetcher{state | realtime_task_by_ref: running_realtime_task_by_ref} put_in(supervisor_state.realtime.task_by_ref, running_task_by_ref)
end end
def handle_failure( def handle_failure(
{:DOWN, ref, :process, pid, reason}, {:DOWN, ref, :process, pid, reason},
%BlockFetcher{realtime_task_by_ref: realtime_task_by_ref} = state %BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state
) do ) do
{realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) {task, running_task_by_ref} = Map.pop(task_by_ref, ref)
case realtime_task do case task do
nil -> nil ->
Logger.error(fn -> Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
@ -64,6 +81,6 @@ defmodule Indexer.BlockFetcher.Realtime do
end) end)
end end
%BlockFetcher{state | realtime_task_by_ref: running_realtime_task_by_ref} put_in(supervisor_state.realtime.task_by_ref, running_task_by_ref)
end end
end end
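
Realtime can have several tasks in flight at once, so it keys them by the task monitor ref. A hypothetical helper condensing the bookkeeping in `put/1`, `handle_success/2`, and `handle_failure/2` above (module name illustrative):

defmodule RealtimeTaskTrackingSketch do
  # on :realtime_index: remember the freshly started task under its monitor ref
  def track(task_by_ref, %Task{ref: ref} = task), do: Map.put(task_by_ref, ref, task)

  # on a {ref, :ok} reply: drop the entry and flush the matching :DOWN message
  def handle_reply(task_by_ref, ref) do
    {_task, remaining} = Map.pop(task_by_ref, ref)
    Process.demonitor(ref, [:flush])
    remaining
  end

  # on {:DOWN, ref, :process, _pid, _reason}: the monitor already fired, just drop the entry
  def handle_down(task_by_ref, ref) do
    {_task, remaining} = Map.pop(task_by_ref, ref)
    remaining
  end
end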

@ -11,6 +11,12 @@ defmodule Indexer.BlockFetcher.Supervisor do
alias Indexer.BlockFetcher alias Indexer.BlockFetcher
alias Indexer.BlockFetcher.{Catchup, Realtime} alias Indexer.BlockFetcher.{Catchup, Realtime}
# milliseconds
@block_interval 5_000
@enforce_keys ~w(catchup realtime)a
defstruct ~w(catchup realtime)a
def child_spec(arg) do def child_spec(arg) do
# The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker` # The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker`
# instead of `:supervisor` and use the wrong shutdown timeout # instead of `:supervisor` and use the wrong shutdown timeout
@ -29,39 +35,50 @@ defmodule Indexer.BlockFetcher.Supervisor do
@impl GenServer @impl GenServer
def init(named_arguments) do def init(named_arguments) do
state = BlockFetcher.new(named_arguments) state = new(named_arguments)
send(self(), :catchup_index) send(self(), :catchup_index)
{:ok, _} = :timer.send_interval(state.realtime_interval, :realtime_index) {:ok, _} = :timer.send_interval(state.realtime.interval, :realtime_index)
{:ok, state} {:ok, state}
end end
defp new(named_arguments) do
{given_block_interval, block_fetcher_named_arguments} = Keyword.pop(named_arguments, :block_interval)
block_fetcher = struct!(BlockFetcher, block_fetcher_named_arguments)
block_interval = given_block_interval || @block_interval
%__MODULE__{
catchup: Catchup.new(%{block_fetcher: block_fetcher, block_interval: block_interval}),
realtime: Realtime.new(%{block_fetcher: block_fetcher, block_interval: block_interval})
}
end
@impl GenServer @impl GenServer
def handle_info(:catchup_index, %BlockFetcher{} = state) do def handle_info(:catchup_index, %__MODULE__{} = state) do
{:noreply, Catchup.put(state)} {:noreply, Catchup.put(state)}
end end
def handle_info({ref, _} = message, %BlockFetcher{catchup_task: %Task{ref: ref}} = state) do def handle_info({ref, _} = message, %__MODULE__{catchup: %Catchup{task: %Task{ref: ref}}} = state) do
{:noreply, Catchup.handle_success(message, state)} {:noreply, Catchup.handle_success(message, state)}
end end
def handle_info( def handle_info(
{:DOWN, ref, :process, pid, _} = message, {:DOWN, ref, :process, pid, _} = message,
%BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = state %__MODULE__{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = state
) do ) do
{:noreply, Catchup.handle_failure(message, state)} {:noreply, Catchup.handle_failure(message, state)}
end end
def handle_info(:realtime_index, %BlockFetcher{} = state) do def handle_info(:realtime_index, %__MODULE__{} = state) do
{:noreply, Realtime.put(state)} {:noreply, Realtime.put(state)}
end end
def handle_info({ref, :ok} = message, %BlockFetcher{} = state) when is_reference(ref) do def handle_info({ref, :ok} = message, %__MODULE__{} = state) when is_reference(ref) do
{:noreply, Realtime.handle_success(message, state)} {:noreply, Realtime.handle_success(message, state)}
end end
def handle_info({:DOWN, _, :process, _, _} = message, %BlockFetcher{} = state) do def handle_info({:DOWN, _, :process, _, _} = message, %__MODULE__{} = state) do
{:noreply, Realtime.handle_failure(message, state)} {:noreply, Realtime.handle_failure(message, state)}
end end
end end
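
For reference, the dispatch order the supervisor's `handle_info/2` clauses above establish (first matching clause wins):

# :catchup_index                                       -> Catchup.put/1
# {ref, result} matching catchup.task.ref              -> Catchup.handle_success/2
# {:DOWN, ref, :process, pid, _} matching catchup.task -> Catchup.handle_failure/2
# :realtime_index                                      -> Realtime.put/1
# {ref, :ok} for any other task reply                  -> Realtime.handle_success/2
# {:DOWN, _, :process, _, _} for any other task exit   -> Realtime.handle_failure/2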

@ -1,4 +1,4 @@
defmodule Indexer.BlockFetcher.SupervisorTest do defmodule Indexer.BlockFetcherTest do
# `async: false` due to use of named GenServer # `async: false` due to use of named GenServer
use EthereumJSONRPC.Case, async: false use EthereumJSONRPC.Case, async: false
use Explorer.DataCase use Explorer.DataCase
@ -13,7 +13,6 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
BalanceFetcher, BalanceFetcher,
AddressBalanceFetcherCase, AddressBalanceFetcherCase,
BlockFetcher, BlockFetcher,
BoundInterval,
BufferedTask, BufferedTask,
InternalTransactionFetcher, InternalTransactionFetcher,
InternalTransactionFetcherCase, InternalTransactionFetcherCase,
@ -46,342 +45,17 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
# ON blocks.hash = transactions.block_hash) as blocks # ON blocks.hash = transactions.block_hash) as blocks
@first_full_block_number 37 @first_full_block_number 37
describe "start_link/1" do describe "import_range/2" do
test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
case Keyword.fetch!(json_rpc_named_arguments, :variant) do
EthereumJSONRPC.Parity ->
block_number = 3_416_888
block_quantity = integer_to_quantity(block_number)
EthereumJSONRPC.Mox
|> stub(:json_rpc, fn
# latest block number to seed starting block number for genesis and realtime tasks
%{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok,
%{
"author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"difficulty" => "0xfffffffffffffffffffffffffffffffe",
"extraData" => "0xd583010a068650617269747986312e32362e32826c69",
"gasLimit" => "0x7a1200",
"gasUsed" => "0x0",
"hash" => "0x627baabf5a17c0cfc547b6903ac5e19eaa91f30d9141be1034e3768f6adbc94e",
"logsBloom" =>
"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"number" => block_quantity,
"parentHash" => "0x006edcaa1e6fde822908783bc4ef1ad3675532d542fce53537557391cfe34c3c",
"receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"sealFields" => [
"0x841240b30d",
"0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01"
],
"sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"signature" =>
"58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01",
"size" => "0x243",
"stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7",
"step" => "306230029",
"timestamp" => "0x5b437f41",
"totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb",
"transactions" => [],
"transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"uncles" => []
}}
[%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options ->
{:ok,
Enum.map(requests, fn %{id: id, params: [block_quantity, true]} ->
%{
id: id,
jsonrpc: "2.0",
result: %{
"author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"difficulty" => "0xfffffffffffffffffffffffffffffffe",
"extraData" => "0xd583010a068650617269747986312e32362e32826c69",
"gasLimit" => "0x7a1200",
"gasUsed" => "0x0",
"hash" =>
Explorer.Factory.block_hash()
|> to_string(),
"logsBloom" =>
"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"number" => block_quantity,
"parentHash" =>
Explorer.Factory.block_hash()
|> to_string(),
"receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"sealFields" => [
"0x841240b30d",
"0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01"
],
"sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"signature" =>
"58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01",
"size" => "0x243",
"stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7",
"step" => "306230029",
"timestamp" => "0x5b437f41",
"totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb",
"transactions" => [],
"transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"uncles" => []
}
}
end)}
[%{method: "eth_getBalance"} | _] = requests, _options ->
{:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)}
end)
EthereumJSONRPC.Geth ->
block_number = 5_950_901
block_quantity = integer_to_quantity(block_number)
EthereumJSONRPC.Mox
|> stub(:json_rpc, fn
%{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok,
%{
"difficulty" => "0xc2550dc5bfc5d",
"extraData" => "0x65746865726d696e652d657538",
"gasLimit" => "0x7a121d",
"gasUsed" => "0x6cc04b",
"hash" => "0x71f484056fec687fd469989426c94c469ff08a28eae9a1865359d64557bb99f6",
"logsBloom" =>
"0x900840000041000850020000002800020800840900200210041006005028810880231200c1a0800001003a00011813005102000020800207080210000020014c00888640001040300c180008000084001000010018010040001118181400a06000280428024010081100015008080814141000644404040a8021101010040001001022000000000880420004008000180004000a01002080890010000a0601001a0000410244421002c0000100920100020004000020c10402004080008000203001000200c4001a000002000c0000000100200410090bc52e080900108230000110010082120200000004e01002000500001009e14001002051000040830080",
"miner" => "0xea674fdde714fd979de3edf0f56aa9716b898ec8",
"mixHash" => "0x555275cd0ab4c3b2fe3936843ee25bb67da05ef7dcf17216bc0e382d21d139a0",
"nonce" => "0xa49e42a024600113",
"number" => block_quantity,
"parentHash" => "0xb4357733c59cc6f785542d072a205f4e195f7198f544ea5e01c1b90ef0f914a5",
"receiptsRoot" => "0x17baf8de366fecc1be494bff245be6357ac60a5fe786099dba89983778c8421e",
"sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"size" => "0x6c7b",
"stateRoot" => "0x79345c692a0bf363e95c37750336c534309b3f3fe8b59712ac1527118070f488",
"timestamp" => "0x5b475377",
"totalDifficulty" => "0x120258e22c69502fc88",
"transactions" => ["0xa4b58d1d1473f4891d9ff91f624dba73611bf1f6e9a60d3ca2dcfc75d2ab185c"],
"transactionsRoot" => "0x5972b7988f667d7e86679322641117e503ea2c1bc5a27822a8a8120fe53f2c8b",
"uncles" => []
}}
[%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options ->
{:ok,
Enum.map(requests, fn %{id: id, params: [block_quantity, true]} ->
%{
id: id,
jsonrpc: "2.0",
result: %{
"difficulty" => "0xc22479024e55f",
"extraData" => "0x73656f3130",
"gasLimit" => "0x7a121d",
"gasUsed" => "0x7a0527",
"hash" =>
Explorer.Factory.block_hash()
|> to_string(),
"logsBloom" =>
"0x006a044c050a6759208088200009808898246808402123144ac15801c09a2672990130000042500000cc6090b063f195352095a88018194112101a02640000a0109c03c40568440b853a800a60044408604bb49d1d604c802008000884520208496608a520992e0f4b41a94188088920c1995107db4696c03839a911500084001009884100605084c4542953b08101103080254c34c802a00042a62f811340400d22080d000c0e39927ca481800c8024048425462000150850500205a224810041904023a80c00dc01040203000086020111210403081096822008c12500a2060a54834800400851210122c481a04a24b5284e9900a08110c180011001c03100",
"miner" => "0xb2930b35844a230f00e51431acae96fe543a0347",
"mixHash" => "0x5e07a58028d2cee7ddbefe245e6d7b5232d997b66cc906b18ad9ad51535ced24",
"nonce" => "0x3d88ebe8031aadf6",
"number" => block_quantity,
"parentHash" =>
Explorer.Factory.block_hash()
|> to_string(),
"receiptsRoot" => "0x5294a8b56be40c0c198aa443664e801bb926d49878f96151849f3ddd0cb5e76d",
"sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"size" => "0x4796",
"stateRoot" => "0x3755d4b5c9ae3cd58d7a856a46fbe8fb69f0ba93d81e831cd68feb8b61bc3009",
"timestamp" => "0x5b475393",
"totalDifficulty" => "0x120259a450e2527e1e7",
"transactions" => [],
"transactionsRoot" => "0xa71969ed649cd1f21846ab7b4029e79662941cc34cd473aa4590e666920ad2f4",
"uncles" => []
}
}
end)}
[%{method: "eth_getBalance"} | _] = requests, _options ->
{:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)}
end)
variant_name ->
raise ArgumentError, "Unsupported variant name (#{variant_name})"
end
end
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
default_blocks_batch_size = BlockFetcher.default_blocks_batch_size()
assert latest_block_number > default_blocks_batch_size
assert Repo.aggregate(Block, :count, :hash) == 0
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
start_supervised!({BlockFetcher.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], []]})
first_catchup_block_number = latest_block_number - 1
wait_for_results(fn ->
Repo.one!(from(block in Block, where: block.number == ^first_catchup_block_number))
end)
assert Repo.aggregate(Block, :count, :hash) >= 1
previous_batch_block_number = first_catchup_block_number - default_blocks_batch_size
wait_for_results(fn ->
Repo.one!(from(block in Block, where: block.number == ^previous_batch_block_number))
end)
assert Repo.aggregate(Block, :count, :hash) >= default_blocks_batch_size
end
end
describe "handle_info(:catchup_index, state)" do
setup context do
# force use of `Mox`, so we can manipulate `latest_block_number`
put_in(context.json_rpc_named_arguments[:transport], EthereumJSONRPC.Mox)
end
setup :state
test "increases catchup_bound_interval if no blocks missing", %{
json_rpc_named_arguments: json_rpc_named_arguments,
state: state
} do
insert(:block, number: 0)
insert(:block, number: 1)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => "0x1"}}
end)
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
# from `setup :state`
assert_received :catchup_index
assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
BlockFetcher.Supervisor.handle_info(:catchup_index, state)
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
assert message_state.catchup_bound_interval.current > catchup_index_state.catchup_bound_interval.current
end
test "decreases catchup_bound_interval if blocks missing", %{
json_rpc_named_arguments: json_rpc_named_arguments,
state: state
} do
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => "0x1"}}
end)
|> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByNumber", params: ["0x0", true]}], _options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: %{
"difficulty" => "0x0",
"gasLimit" => "0x0",
"gasUsed" => "0x0",
"hash" =>
Explorer.Factory.block_hash()
|> to_string(),
"miner" => "0xb2930b35844a230f00e51431acae96fe543a0347",
"number" => "0x0",
"parentHash" =>
Explorer.Factory.block_hash()
|> to_string(),
"size" => "0x0",
"timestamp" => "0x0",
"totalDifficulty" => "0x0",
"transactions" => []
}
}
]}
end)
|> stub(:json_rpc, fn [
%{
id: id,
method: "eth_getBalance",
params: ["0xb2930b35844a230f00e51431acae96fe543a0347", "0x0"]
}
],
_options ->
{:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]}
end)
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
# from `setup :state`
assert_received :catchup_index
assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
BlockFetcher.Supervisor.handle_info(:catchup_index, state)
# 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for
# catchup_index
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 1}} = message
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
assert message_state.catchup_bound_interval.current == message_state.catchup_bound_interval.minimum
# When not at minimum it is decreased
above_minimum_state = update_in(catchup_index_state.catchup_bound_interval, &BoundInterval.increase/1)
assert above_minimum_state.catchup_bound_interval.current > message_state.catchup_bound_interval.minimum
assert {:noreply, above_minimum_message_state} = BlockFetcher.Supervisor.handle_info(message, above_minimum_state)
assert above_minimum_message_state.catchup_bound_interval.current <
above_minimum_state.catchup_bound_interval.current
end
end
describe "import_range/4" do
setup :state
setup %{json_rpc_named_arguments: json_rpc_named_arguments} do setup %{json_rpc_named_arguments: json_rpc_named_arguments} do
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
{:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments)
%{state: state} %{block_fetcher: BlockFetcher.new(broadcast: false, json_rpc_named_arguments: json_rpc_named_arguments)}
end end
test "with single element range that is valid imports one block", %{ test "with single element range that is valid imports one block", %{
json_rpc_named_arguments: json_rpc_named_arguments, block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher
state: state
} do } do
block_number = 0 block_number = 0
@ -496,6 +170,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
end end
{:ok, sequence} = Sequence.start_link(first: 0, step: 1) {:ok, sequence} = Sequence.start_link(first: 0, step: 1)
sequenced_block_fetcher = %BlockFetcher{block_fetcher | sequence: sequence}
%{address_hash: address_hash, block_hash: block_hash} = %{address_hash: address_hash, block_hash: block_hash} =
case Keyword.fetch!(json_rpc_named_arguments, :variant) do case Keyword.fetch!(json_rpc_named_arguments, :variant) do
@ -532,7 +207,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
end end
log_bad_gateway( log_bad_gateway(
fn -> BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) end, fn -> BlockFetcher.import_range(sequenced_block_fetcher, block_number..block_number) end,
fn result -> fn result ->
assert {:ok, assert {:ok,
%{ %{
@ -560,8 +235,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
# Implement when a full block is found for Ethereum Mainnet and remove :no_geth tag # Implement when a full block is found for Ethereum Mainnet and remove :no_geth tag
@tag :no_geth @tag :no_geth
test "can import range with all synchronous imported schemas", %{ test "can import range with all synchronous imported schemas", %{
json_rpc_named_arguments: json_rpc_named_arguments, block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher
state: state
} do } do
block_number = @first_full_block_number block_number = @first_full_block_number
@ -733,6 +407,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
end end
{:ok, sequence} = Sequence.start_link(first: 0, step: 1) {:ok, sequence} = Sequence.start_link(first: 0, step: 1)
sequenced_block_fetcher = %BlockFetcher{block_fetcher | sequence: sequence}
case Keyword.fetch!(json_rpc_named_arguments, :variant) do case Keyword.fetch!(json_rpc_named_arguments, :variant) do
EthereumJSONRPC.Geth -> EthereumJSONRPC.Geth ->
@ -792,7 +467,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
154, 143, 4, 28, 171, 95, 190, 255, 254, 174, 75, 182>> 154, 143, 4, 28, 171, 95, 190, 255, 254, 174, 75, 182>>
} }
] ]
}} = BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) }} = BlockFetcher.import_range(sequenced_block_fetcher, block_number..block_number)
wait_for_tasks(InternalTransactionFetcher) wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(BalanceFetcher) wait_for_tasks(BalanceFetcher)
@ -879,7 +554,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>>
} }
] ]
}} = BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) }} = BlockFetcher.import_range(sequenced_block_fetcher, block_number..block_number)
wait_for_tasks(InternalTransactionFetcher) wait_for_tasks(InternalTransactionFetcher)
wait_for_tasks(BalanceFetcher) wait_for_tasks(BalanceFetcher)
@ -905,12 +580,6 @@ defmodule Indexer.BlockFetcher.SupervisorTest do
end end
end end
defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do
{:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments)
%{state: state}
end
defp wait_until(timeout, producer) do defp wait_until(timeout, producer) do
parent = self() parent = self()
ref = make_ref() ref = make_ref()

@ -0,0 +1,349 @@
defmodule Indexer.BlockFetcher.SupervisorTest do
# `async: false` due to use of named GenServer
use EthereumJSONRPC.Case, async: false
use Explorer.DataCase
import Mox
import EthereumJSONRPC, only: [integer_to_quantity: 1]
alias Explorer.Chain.Block
alias Indexer.{AddressBalanceFetcherCase, BlockFetcher, BoundInterval, InternalTransactionFetcherCase}
alias Indexer.BlockFetcher.Catchup
@moduletag capture_log: true
# MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to
# use expectations and stubs from test's pid.
setup :set_mox_global
setup :verify_on_exit!
describe "start_link/1" do
test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
case Keyword.fetch!(json_rpc_named_arguments, :variant) do
EthereumJSONRPC.Parity ->
block_number = 3_416_888
block_quantity = integer_to_quantity(block_number)
EthereumJSONRPC.Mox
|> stub(:json_rpc, fn
# latest block number to seed starting block number for genesis and realtime tasks
%{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok,
%{
"author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"difficulty" => "0xfffffffffffffffffffffffffffffffe",
"extraData" => "0xd583010a068650617269747986312e32362e32826c69",
"gasLimit" => "0x7a1200",
"gasUsed" => "0x0",
"hash" => "0x627baabf5a17c0cfc547b6903ac5e19eaa91f30d9141be1034e3768f6adbc94e",
"logsBloom" =>
"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"number" => block_quantity,
"parentHash" => "0x006edcaa1e6fde822908783bc4ef1ad3675532d542fce53537557391cfe34c3c",
"receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"sealFields" => [
"0x841240b30d",
"0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01"
],
"sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"signature" =>
"58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01",
"size" => "0x243",
"stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7",
"step" => "306230029",
"timestamp" => "0x5b437f41",
"totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb",
"transactions" => [],
"transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"uncles" => []
}}
[%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options ->
{:ok,
Enum.map(requests, fn %{id: id, params: [block_quantity, true]} ->
%{
id: id,
jsonrpc: "2.0",
result: %{
"author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"difficulty" => "0xfffffffffffffffffffffffffffffffe",
"extraData" => "0xd583010a068650617269747986312e32362e32826c69",
"gasLimit" => "0x7a1200",
"gasUsed" => "0x0",
"hash" =>
Explorer.Factory.block_hash()
|> to_string(),
"logsBloom" =>
"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"number" => block_quantity,
"parentHash" =>
Explorer.Factory.block_hash()
|> to_string(),
"receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"sealFields" => [
"0x841240b30d",
"0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01"
],
"sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"signature" =>
"58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01",
"size" => "0x243",
"stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7",
"step" => "306230029",
"timestamp" => "0x5b437f41",
"totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb",
"transactions" => [],
"transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"uncles" => []
}
}
end)}
[%{method: "eth_getBalance"} | _] = requests, _options ->
{:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)}
end)
EthereumJSONRPC.Geth ->
block_number = 5_950_901
block_quantity = integer_to_quantity(block_number)
EthereumJSONRPC.Mox
|> stub(:json_rpc, fn
%{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok,
%{
"difficulty" => "0xc2550dc5bfc5d",
"extraData" => "0x65746865726d696e652d657538",
"gasLimit" => "0x7a121d",
"gasUsed" => "0x6cc04b",
"hash" => "0x71f484056fec687fd469989426c94c469ff08a28eae9a1865359d64557bb99f6",
"logsBloom" =>
"0x900840000041000850020000002800020800840900200210041006005028810880231200c1a0800001003a00011813005102000020800207080210000020014c00888640001040300c180008000084001000010018010040001118181400a06000280428024010081100015008080814141000644404040a8021101010040001001022000000000880420004008000180004000a01002080890010000a0601001a0000410244421002c0000100920100020004000020c10402004080008000203001000200c4001a000002000c0000000100200410090bc52e080900108230000110010082120200000004e01002000500001009e14001002051000040830080",
"miner" => "0xea674fdde714fd979de3edf0f56aa9716b898ec8",
"mixHash" => "0x555275cd0ab4c3b2fe3936843ee25bb67da05ef7dcf17216bc0e382d21d139a0",
"nonce" => "0xa49e42a024600113",
"number" => block_quantity,
"parentHash" => "0xb4357733c59cc6f785542d072a205f4e195f7198f544ea5e01c1b90ef0f914a5",
"receiptsRoot" => "0x17baf8de366fecc1be494bff245be6357ac60a5fe786099dba89983778c8421e",
"sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"size" => "0x6c7b",
"stateRoot" => "0x79345c692a0bf363e95c37750336c534309b3f3fe8b59712ac1527118070f488",
"timestamp" => "0x5b475377",
"totalDifficulty" => "0x120258e22c69502fc88",
"transactions" => ["0xa4b58d1d1473f4891d9ff91f624dba73611bf1f6e9a60d3ca2dcfc75d2ab185c"],
"transactionsRoot" => "0x5972b7988f667d7e86679322641117e503ea2c1bc5a27822a8a8120fe53f2c8b",
"uncles" => []
}}
[%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options ->
{:ok,
Enum.map(requests, fn %{id: id, params: [block_quantity, true]} ->
%{
id: id,
jsonrpc: "2.0",
result: %{
"difficulty" => "0xc22479024e55f",
"extraData" => "0x73656f3130",
"gasLimit" => "0x7a121d",
"gasUsed" => "0x7a0527",
"hash" =>
Explorer.Factory.block_hash()
|> to_string(),
"logsBloom" =>
"0x006a044c050a6759208088200009808898246808402123144ac15801c09a2672990130000042500000cc6090b063f195352095a88018194112101a02640000a0109c03c40568440b853a800a60044408604bb49d1d604c802008000884520208496608a520992e0f4b41a94188088920c1995107db4696c03839a911500084001009884100605084c4542953b08101103080254c34c802a00042a62f811340400d22080d000c0e39927ca481800c8024048425462000150850500205a224810041904023a80c00dc01040203000086020111210403081096822008c12500a2060a54834800400851210122c481a04a24b5284e9900a08110c180011001c03100",
"miner" => "0xb2930b35844a230f00e51431acae96fe543a0347",
"mixHash" => "0x5e07a58028d2cee7ddbefe245e6d7b5232d997b66cc906b18ad9ad51535ced24",
"nonce" => "0x3d88ebe8031aadf6",
"number" => block_quantity,
"parentHash" =>
Explorer.Factory.block_hash()
|> to_string(),
"receiptsRoot" => "0x5294a8b56be40c0c198aa443664e801bb926d49878f96151849f3ddd0cb5e76d",
"sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"size" => "0x4796",
"stateRoot" => "0x3755d4b5c9ae3cd58d7a856a46fbe8fb69f0ba93d81e831cd68feb8b61bc3009",
"timestamp" => "0x5b475393",
"totalDifficulty" => "0x120259a450e2527e1e7",
"transactions" => [],
"transactionsRoot" => "0xa71969ed649cd1f21846ab7b4029e79662941cc34cd473aa4590e666920ad2f4",
"uncles" => []
}
}
end)}
[%{method: "eth_getBalance"} | _] = requests, _options ->
{:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)}
end)
variant_name ->
raise ArgumentError, "Unsupported variant name (#{variant_name})"
end
end
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
default_blocks_batch_size = BlockFetcher.default_blocks_batch_size()
assert latest_block_number > default_blocks_batch_size
assert Repo.aggregate(Block, :count, :hash) == 0
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
start_supervised!({BlockFetcher.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], []]})
first_catchup_block_number = latest_block_number - 1
wait_for_results(fn ->
Repo.one!(from(block in Block, where: block.number == ^first_catchup_block_number))
end)
assert Repo.aggregate(Block, :count, :hash) >= 1
previous_batch_block_number = first_catchup_block_number - default_blocks_batch_size
wait_for_results(fn ->
Repo.one!(from(block in Block, where: block.number == ^previous_batch_block_number))
end)
assert Repo.aggregate(Block, :count, :hash) >= default_blocks_batch_size
end
end
describe "handle_info(:catchup_index, state)" do
setup context do
# force use of `Mox`, so we can manipulate `latest_block_number`
put_in(context.json_rpc_named_arguments[:transport], EthereumJSONRPC.Mox)
end
setup :state
test "increases catchup_bound_interval if no blocks missing", %{
json_rpc_named_arguments: json_rpc_named_arguments,
state: state
} do
insert(:block, number: 0)
insert(:block, number: 1)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => "0x1"}}
end)
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
# from `setup :state`
assert_received :catchup_index
assert {:noreply,
%BlockFetcher.Supervisor{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = catchup_index_state} =
BlockFetcher.Supervisor.handle_info(:catchup_index, state)
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
assert message_state.catchup.bound_interval.current > catchup_index_state.catchup.bound_interval.current
end
test "decreases catchup_bound_interval if blocks missing", %{
json_rpc_named_arguments: json_rpc_named_arguments,
state: state
} do
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => "0x1"}}
end)
|> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByNumber", params: ["0x0", true]}], _options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: %{
"difficulty" => "0x0",
"gasLimit" => "0x0",
"gasUsed" => "0x0",
"hash" =>
Explorer.Factory.block_hash()
|> to_string(),
"miner" => "0xb2930b35844a230f00e51431acae96fe543a0347",
"number" => "0x0",
"parentHash" =>
Explorer.Factory.block_hash()
|> to_string(),
"size" => "0x0",
"timestamp" => "0x0",
"totalDifficulty" => "0x0",
"transactions" => []
}
}
]}
end)
|> stub(:json_rpc, fn [
%{
id: id,
method: "eth_getBalance",
params: ["0xb2930b35844a230f00e51431acae96fe543a0347", "0x0"]
}
],
_options ->
{:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]}
end)
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
# from `setup :state`
assert_received :catchup_index
assert {:noreply,
%BlockFetcher.Supervisor{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = catchup_index_state} =
BlockFetcher.Supervisor.handle_info(:catchup_index, state)
# 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for
# catchup_index
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 1}} = message
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
assert message_state.catchup.bound_interval.current == message_state.catchup.bound_interval.minimum
# When not at minimum it is decreased
above_minimum_state = update_in(catchup_index_state.catchup.bound_interval, &BoundInterval.increase/1)
assert above_minimum_state.catchup.bound_interval.current > message_state.catchup.bound_interval.minimum
assert {:noreply, above_minimum_message_state} = BlockFetcher.Supervisor.handle_info(message, above_minimum_state)
assert above_minimum_message_state.catchup.bound_interval.current <
above_minimum_state.catchup.bound_interval.current
end
end
defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do
{:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments)
%{state: state}
end
end