From 11735b1289eba675dc40f141403d6c2f79910a23 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 30 Jul 2018 12:00:26 -0500 Subject: [PATCH] Separate Catch, Realtime, Supervisor, and BlockFetcher states BlockFetcher.Catchup and BlockFetcher.Realtime get their own structs, which use BlockFetcher struct. This better emphasizes that Catchup and Realtime are both block fetchers. The BlockFetcher.Catchup and BlockFetcher.Realtime structs move into Supervisor's struct. --- apps/indexer/lib/indexer/block_fetcher.ex | 58 ++- .../lib/indexer/block_fetcher/catchup.ex | 68 ++-- .../lib/indexer/block_fetcher/realtime.ex | 47 ++- .../lib/indexer/block_fetcher/supervisor.ex | 33 +- ...rvisor_test.exs => block_fetcher_test.exs} | 351 +----------------- apps/indexer/test/indexer/supervisor_test.exs | 349 +++++++++++++++++ 6 files changed, 486 insertions(+), 420 deletions(-) rename apps/indexer/test/indexer/{block_fetcher/supervisor_test.exs => block_fetcher_test.exs} (60%) create mode 100644 apps/indexer/test/indexer/supervisor_test.exs diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 30c5d755eb..6c9a6fe2af 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -8,10 +8,10 @@ defmodule Indexer.BlockFetcher do import Indexer, only: [debug: 1] alias Explorer.Chain - alias Indexer.{AddressExtraction, BalanceFetcher, BoundInterval, InternalTransactionFetcher, Sequence} + alias Indexer.{AddressExtraction, BalanceFetcher, InternalTransactionFetcher, Sequence} # dialyzer thinks that Logger.debug functions always have no_local_return - @dialyzer {:nowarn_function, import_range: 4} + @dialyzer {:nowarn_function, import_range: 2} # These are all the *default* values for options. # DO NOT use them directly in the code. Get options from `state`. @@ -19,21 +19,17 @@ defmodule Indexer.BlockFetcher do @blocks_batch_size 10 @blocks_concurrency 10 - # milliseconds - @block_interval 5_000 - @receipts_batch_size 250 @receipts_concurrency 10 - defstruct json_rpc_named_arguments: [], - catchup_task: nil, - catchup_bound_interval: nil, - realtime_task_by_ref: %{}, - realtime_interval: nil, + @enforce_keys ~w(json_rpc_named_arguments)a + defstruct json_rpc_named_arguments: nil, blocks_batch_size: @blocks_batch_size, blocks_concurrency: @blocks_concurrency, + broadcast: nil, receipts_batch_size: @receipts_batch_size, - receipts_concurrency: @receipts_concurrency + receipts_concurrency: @receipts_concurrency, + sequence: nil @doc false def default_blocks_batch_size, do: @blocks_batch_size @@ -53,8 +49,6 @@ defmodule Indexer.BlockFetcher do Defaults to #{@blocks_concurrency}. So upto `blocks_concurrency * block_batch_size` (defaults to `#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all connections. - * `:block_interval` - The number of milliseconds between new blocks being published. Defaults to - `#{@block_interval}` milliseconds. * `:receipts_batch_size` - The number of receipts to request in one call to the JSONRPC. Defaults to `#{@receipts_batch_size}`. Receipt requests also include the logs for when the transaction was collated into the block. *These logs are not paginated.* @@ -65,24 +59,17 @@ defmodule Indexer.BlockFetcher do JSONRPC at once over all connections. *Each transaction only has one receipt.* """ def new(named_arguments) when is_list(named_arguments) do - interval = div(named_arguments[:block_interval] || @block_interval, 2) - - state = struct!(__MODULE__, Keyword.delete(named_arguments, :block_interval)) - - %__MODULE__{ - state - | json_rpc_named_arguments: Keyword.fetch!(named_arguments, :json_rpc_named_arguments), - catchup_bound_interval: BoundInterval.within(interval..(interval * 10)), - realtime_interval: interval - } + struct!(__MODULE__, named_arguments) end - def stream_import(%__MODULE__{} = state, seq, indexer_mode, task_opts) do - seq + def stream_import(%__MODULE__{blocks_concurrency: blocks_concurrency, sequence: sequence} = state) + when is_pid(sequence) do + sequence |> Sequence.build_stream() |> Task.async_stream( - &import_range(&1, state, seq, indexer_mode), - Keyword.merge(task_opts, timeout: :infinity) + &import_range(state, &1), + max_concurrency: blocks_concurrency, + timeout: :infinity ) |> Stream.run() end @@ -126,13 +113,13 @@ defmodule Indexer.BlockFetcher do end) end - defp insert(seq, range, indexer_mode, options) when is_list(options) do + defp insert(%__MODULE__{broadcast: broadcast, sequence: sequence}, options) when is_list(options) do {address_hash_to_fetched_balance_block_number, import_options} = pop_address_hash_to_fetched_balance_block_number(options) transaction_hash_to_block_number = get_transaction_hash_to_block_number(import_options) - options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index) + options_with_broadcast = Keyword.merge(import_options, broadcast: broadcast) with {:ok, results} <- Chain.import(options_with_broadcast) do async_import_remaining_block_data( @@ -144,11 +131,13 @@ defmodule Indexer.BlockFetcher do {:ok, results} else {:error, step, failed_value, _changes_so_far} = error -> + range = Keyword.fetch!(options, :range) + debug(fn -> "failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying" end) - :ok = Sequence.queue(seq, range) + :ok = Sequence.queue(sequence, range) error end @@ -202,10 +191,10 @@ defmodule Indexer.BlockFetcher do |> InternalTransactionFetcher.async_fetch(10_000) end - # Run at state.blocks_concurrency max_concurrency when called by `stream_import/3` + # Run at state.blocks_concurrency max_concurrency when called by `stream_import/1` # Only public for testing @doc false - def import_range(range, %__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq, indexer_mode) do + def import_range(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments, sequence: seq} = state, range) do with {:blocks, {:ok, next, result}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, %{blocks: blocks, transactions: transactions_without_receipts} = result, @@ -222,9 +211,8 @@ defmodule Indexer.BlockFetcher do }) insert( - seq, - range, - indexer_mode, + state, + range: range, addresses: [params: addresses], blocks: [params: blocks], logs: [params: logs], diff --git a/apps/indexer/lib/indexer/block_fetcher/catchup.ex b/apps/indexer/lib/indexer/block_fetcher/catchup.ex index 906d06b3b5..6e9c9eca41 100644 --- a/apps/indexer/lib/indexer/block_fetcher/catchup.ex +++ b/apps/indexer/lib/indexer/block_fetcher/catchup.ex @@ -6,22 +6,44 @@ defmodule Indexer.BlockFetcher.Catchup do require Logger import Indexer, only: [debug: 1] - import Indexer.BlockFetcher, only: [stream_import: 4] + import Indexer.BlockFetcher, only: [stream_import: 1] alias Explorer.Chain alias Indexer.{BlockFetcher, BoundInterval, Sequence} + @enforce_keys ~w(block_fetcher bound_interval)a + defstruct ~w(block_fetcher bound_interval task)a + + def new(%{block_fetcher: %BlockFetcher{} = common_block_fetcher, block_interval: block_interval}) do + block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: true} + minimum_interval = div(block_interval, 2) + + %__MODULE__{ + block_fetcher: block_fetcher, + bound_interval: BoundInterval.within(minimum_interval..(minimum_interval * 10)) + } + end + @doc """ Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0` """ - @spec put(%BlockFetcher{catchup_task: nil}) :: %BlockFetcher{catchup_task: Task.t()} - def put(%BlockFetcher{catchup_task: nil} = state) do - catchup_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) - - %BlockFetcher{state | catchup_task: catchup_task} + @spec put(%BlockFetcher.Supervisor{catchup: %__MODULE__{task: nil}}) :: %BlockFetcher.Supervisor{ + catchup: %__MODULE__{task: Task.t()} + } + def put(%BlockFetcher.Supervisor{catchup: %__MODULUE__{task: nil} = state} = supervisor_state) do + put_in( + supervisor_state.catchup.task, + Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) + ) end - def task(%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state) do + def task( + %__MODULE__{ + block_fetcher: + %BlockFetcher{blocks_batch_size: blocks_batch_size, json_rpc_named_arguments: json_rpc_named_arguments} = + block_fetcher + } = state + ) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) case latest_block_number do @@ -48,10 +70,10 @@ defmodule Indexer.BlockFetcher.Catchup do :ok _ -> - {:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size) - Sequence.cap(seq) + {:ok, sequence} = Sequence.start_link(ranges: missing_ranges, step: -1 * blocks_batch_size) + Sequence.cap(sequence) - stream_import(state, seq, :catchup_index, max_concurrency: state.blocks_concurrency) + stream_import(%BlockFetcher{block_fetcher | sequence: sequence}) end %{first_block_number: first, missing_block_count: missing_block_count} @@ -60,28 +82,30 @@ defmodule Indexer.BlockFetcher.Catchup do def handle_success( {ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}}, - %BlockFetcher{ - catchup_bound_interval: catchup_bound_interval, - catchup_task: %Task{ref: ref} - } = state + %BlockFetcher.Supervisor{ + catchup: %__MODULE__{ + bound_interval: bound_interval, + task: %Task{ref: ref} + } + } = supervisor_state ) when is_integer(missing_block_count) do - new_catchup_bound_interval = + new_bound_interval = case missing_block_count do 0 -> Logger.info("Index already caught up in #{first_block_number}-0") - BoundInterval.increase(catchup_bound_interval) + BoundInterval.increase(bound_interval) _ -> Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0") - BoundInterval.decrease(catchup_bound_interval) + BoundInterval.decrease(bound_interval) end Process.demonitor(ref, [:flush]) - interval = new_catchup_bound_interval.current + interval = new_bound_interval.current Logger.info(fn -> "Checking if index needs to catch up in #{interval}ms" @@ -89,17 +113,19 @@ defmodule Indexer.BlockFetcher.Catchup do Process.send_after(self(), :catchup_index, interval) - %BlockFetcher{state | catchup_bound_interval: new_catchup_bound_interval, catchup_task: nil} + update_in(supervisor_state.catchup, fn state -> + %__MODULE__{state | bound_interval: new_bound_interval, task: nil} + end) end def handle_failure( {:DOWN, ref, :process, pid, reason}, - %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = state + %BlockFetcher.Supervisor{catchup: %__MODULE__{task: %Task{pid: pid, ref: ref}}} = supervisor_state ) do Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end) send(self(), :catchup_index) - %BlockFetcher{state | catchup_task: nil} + put_in(supervisor_state.catchup.task, nil) end end diff --git a/apps/indexer/lib/indexer/block_fetcher/realtime.ex b/apps/indexer/lib/indexer/block_fetcher/realtime.ex index 6756f1eda4..83c9411269 100644 --- a/apps/indexer/lib/indexer/block_fetcher/realtime.ex +++ b/apps/indexer/lib/indexer/block_fetcher/realtime.ex @@ -5,29 +5,46 @@ defmodule Indexer.BlockFetcher.Realtime do require Logger - import Indexer.BlockFetcher, only: [stream_import: 4] + import Indexer.BlockFetcher, only: [stream_import: 1] alias Indexer.{BlockFetcher, Sequence} + @enforce_keys ~w(block_fetcher interval)a + defstruct block_fetcher: nil, + interval: nil, + task_by_ref: %{} + + def new(%{block_fetcher: %BlockFetcher{} = common_block_fetcher, block_interval: block_interval}) do + block_fetcher = %BlockFetcher{ + block_fetcher | blocks_concurrency: 1, broadcast: true} + + interval = div(block_interval, 2) + + %__MODULE__{block_fetcher: block_fetcher, interval: interval} + end + @doc """ Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0` `realtime_task_by_ref`. """ - def put(%BlockFetcher{} = state) do - %Task{ref: ref} = realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) + def put(%BlockFetcher.Supervisor{realtime: %__MODULE__{} = state} = supervisor_state) do + %Task{ref: ref} = task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) - put_in(state.realtime_task_by_ref[ref], realtime_task) + put_in(supervisor_state.realtime.task_by_ref[ref], task) end - def task(%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state) do + def task(%__MODULE__{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher}) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) - {:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) - stream_import(state, seq, :realtime_index, max_concurrency: 1) + {:ok, sequence} = Sequence.start_link(first: latest_block_number, step: 2) + stream_import(%BlockFetcher{block_fetcher | sequence: sequence}) end - def handle_success({ref, :ok = result}, %BlockFetcher{realtime_task_by_ref: realtime_task_by_ref} = state) do - {realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) + def handle_success( + {ref, :ok = result}, + %BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state + ) do + {task, running_task_by_ref} = Map.pop(task_by_ref, ref) - case realtime_task do + case task do nil -> Logger.error(fn -> "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> @@ -40,16 +57,16 @@ defmodule Indexer.BlockFetcher.Realtime do Process.demonitor(ref, [:flush]) - %BlockFetcher{state | realtime_task_by_ref: running_realtime_task_by_ref} + put_in(supervisor_state.realtime.task_by_ref, running_task_by_ref) end def handle_failure( {:DOWN, ref, :process, pid, reason}, - %BlockFetcher{realtime_task_by_ref: realtime_task_by_ref} = state + %BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state ) do - {realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) + {task, running_task_by_ref} = Map.pop(task_by_ref, ref) - case realtime_task do + case task do nil -> Logger.error(fn -> "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> @@ -64,6 +81,6 @@ defmodule Indexer.BlockFetcher.Realtime do end) end - %BlockFetcher{state | realtime_task_by_ref: running_realtime_task_by_ref} + put_in(supervisor_state.realtime.task_by_ref, running_task_by_ref) end end diff --git a/apps/indexer/lib/indexer/block_fetcher/supervisor.ex b/apps/indexer/lib/indexer/block_fetcher/supervisor.ex index 0e27095a63..50d2c6fbad 100644 --- a/apps/indexer/lib/indexer/block_fetcher/supervisor.ex +++ b/apps/indexer/lib/indexer/block_fetcher/supervisor.ex @@ -11,6 +11,12 @@ defmodule Indexer.BlockFetcher.Supervisor do alias Indexer.BlockFetcher alias Indexer.BlockFetcher.{Catchup, Realtime} + # milliseconds + @block_interval 5_000 + + @enforce_keys ~w(catchup realtime)a + defstruct ~w(catchup realtime)a + def child_spec(arg) do # The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker` # instead of `:supervisor` and use the wrong shutdown timeout @@ -29,39 +35,50 @@ defmodule Indexer.BlockFetcher.Supervisor do @impl GenServer def init(named_arguments) do - state = BlockFetcher.new(named_arguments) + state = new(named_arguments) send(self(), :catchup_index) - {:ok, _} = :timer.send_interval(state.realtime_interval, :realtime_index) + {:ok, _} = :timer.send_interval(state.realtime.interval, :realtime_index) {:ok, state} end + defp new(named_arguments) do + {given_block_interval, block_fetcher_named_arguments} = Keyword.pop(named_arguments, :block_interval) + block_fetcher = struct!(BlockFetcher, block_fetcher_named_arguments) + block_interval = given_block_interval || @block_interval + + %__MODULE__{ + catchup: Catchup.new(%{block_fetcher: block_fetcher, block_interval: block_interval}), + realtime: Realtime.new(%{block_fetcher: block_fetcher, block_interval: block_interval}) + } + end + @impl GenServer - def handle_info(:catchup_index, %BlockFetcher{} = state) do + def handle_info(:catchup_index, %__MODULE__{} = state) do {:noreply, Catchup.put(state)} end - def handle_info({ref, _} = message, %BlockFetcher{catchup_task: %Task{ref: ref}} = state) do + def handle_info({ref, _} = message, %__MODULE__{catchup: %Catchup{task: %Task{ref: ref}}} = state) do {:noreply, Catchup.handle_success(message, state)} end def handle_info( {:DOWN, ref, :process, pid, _} = message, - %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = state + %__MODULE__{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = state ) do {:noreply, Catchup.handle_failure(message, state)} end - def handle_info(:realtime_index, %BlockFetcher{} = state) do + def handle_info(:realtime_index, %__MODULE__{} = state) do {:noreply, Realtime.put(state)} end - def handle_info({ref, :ok} = message, %BlockFetcher{} = state) when is_reference(ref) do + def handle_info({ref, :ok} = message, %__MODULE__{} = state) when is_reference(ref) do {:noreply, Realtime.handle_success(message, state)} end - def handle_info({:DOWN, _, :process, _, _} = message, %BlockFetcher{} = state) do + def handle_info({:DOWN, _, :process, _, _} = message, %__MODULE__{} = state) do {:noreply, Realtime.handle_failure(message, state)} end end diff --git a/apps/indexer/test/indexer/block_fetcher/supervisor_test.exs b/apps/indexer/test/indexer/block_fetcher_test.exs similarity index 60% rename from apps/indexer/test/indexer/block_fetcher/supervisor_test.exs rename to apps/indexer/test/indexer/block_fetcher_test.exs index 3a61c7d1c4..07c4250284 100644 --- a/apps/indexer/test/indexer/block_fetcher/supervisor_test.exs +++ b/apps/indexer/test/indexer/block_fetcher_test.exs @@ -1,4 +1,4 @@ -defmodule Indexer.BlockFetcher.SupervisorTest do +defmodule Indexer.BlockFetcherTest do # `async: false` due to use of named GenServer use EthereumJSONRPC.Case, async: false use Explorer.DataCase @@ -13,7 +13,6 @@ defmodule Indexer.BlockFetcher.SupervisorTest do BalanceFetcher, AddressBalanceFetcherCase, BlockFetcher, - BoundInterval, BufferedTask, InternalTransactionFetcher, InternalTransactionFetcherCase, @@ -46,342 +45,17 @@ defmodule Indexer.BlockFetcher.SupervisorTest do # ON blocks.hash = transactions.block_hash) as blocks @first_full_block_number 37 - describe "start_link/1" do - test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do - if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do - case Keyword.fetch!(json_rpc_named_arguments, :variant) do - EthereumJSONRPC.Parity -> - block_number = 3_416_888 - block_quantity = integer_to_quantity(block_number) - - EthereumJSONRPC.Mox - |> stub(:json_rpc, fn - # latest block number to seed starting block number for genesis and realtime tasks - %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> - {:ok, - %{ - "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", - "difficulty" => "0xfffffffffffffffffffffffffffffffe", - "extraData" => "0xd583010a068650617269747986312e32362e32826c69", - "gasLimit" => "0x7a1200", - "gasUsed" => "0x0", - "hash" => "0x627baabf5a17c0cfc547b6903ac5e19eaa91f30d9141be1034e3768f6adbc94e", - "logsBloom" => - "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", - "miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", - "number" => block_quantity, - "parentHash" => "0x006edcaa1e6fde822908783bc4ef1ad3675532d542fce53537557391cfe34c3c", - "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", - "sealFields" => [ - "0x841240b30d", - "0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01" - ], - "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", - "signature" => - "58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01", - "size" => "0x243", - "stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7", - "step" => "306230029", - "timestamp" => "0x5b437f41", - "totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb", - "transactions" => [], - "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", - "uncles" => [] - }} - - [%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options -> - {:ok, - Enum.map(requests, fn %{id: id, params: [block_quantity, true]} -> - %{ - id: id, - jsonrpc: "2.0", - result: %{ - "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", - "difficulty" => "0xfffffffffffffffffffffffffffffffe", - "extraData" => "0xd583010a068650617269747986312e32362e32826c69", - "gasLimit" => "0x7a1200", - "gasUsed" => "0x0", - "hash" => - Explorer.Factory.block_hash() - |> to_string(), - "logsBloom" => - "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", - "miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", - "number" => block_quantity, - "parentHash" => - Explorer.Factory.block_hash() - |> to_string(), - "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", - "sealFields" => [ - "0x841240b30d", - "0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01" - ], - "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", - "signature" => - "58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01", - "size" => "0x243", - "stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7", - "step" => "306230029", - "timestamp" => "0x5b437f41", - "totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb", - "transactions" => [], - "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", - "uncles" => [] - } - } - end)} - - [%{method: "eth_getBalance"} | _] = requests, _options -> - {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)} - end) - - EthereumJSONRPC.Geth -> - block_number = 5_950_901 - block_quantity = integer_to_quantity(block_number) - - EthereumJSONRPC.Mox - |> stub(:json_rpc, fn - %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> - {:ok, - %{ - "difficulty" => "0xc2550dc5bfc5d", - "extraData" => "0x65746865726d696e652d657538", - "gasLimit" => "0x7a121d", - "gasUsed" => "0x6cc04b", - "hash" => "0x71f484056fec687fd469989426c94c469ff08a28eae9a1865359d64557bb99f6", - "logsBloom" => - "0x900840000041000850020000002800020800840900200210041006005028810880231200c1a0800001003a00011813005102000020800207080210000020014c00888640001040300c180008000084001000010018010040001118181400a06000280428024010081100015008080814141000644404040a8021101010040001001022000000000880420004008000180004000a01002080890010000a0601001a0000410244421002c0000100920100020004000020c10402004080008000203001000200c4001a000002000c0000000100200410090bc52e080900108230000110010082120200000004e01002000500001009e14001002051000040830080", - "miner" => "0xea674fdde714fd979de3edf0f56aa9716b898ec8", - "mixHash" => "0x555275cd0ab4c3b2fe3936843ee25bb67da05ef7dcf17216bc0e382d21d139a0", - "nonce" => "0xa49e42a024600113", - "number" => block_quantity, - "parentHash" => "0xb4357733c59cc6f785542d072a205f4e195f7198f544ea5e01c1b90ef0f914a5", - "receiptsRoot" => "0x17baf8de366fecc1be494bff245be6357ac60a5fe786099dba89983778c8421e", - "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", - "size" => "0x6c7b", - "stateRoot" => "0x79345c692a0bf363e95c37750336c534309b3f3fe8b59712ac1527118070f488", - "timestamp" => "0x5b475377", - "totalDifficulty" => "0x120258e22c69502fc88", - "transactions" => ["0xa4b58d1d1473f4891d9ff91f624dba73611bf1f6e9a60d3ca2dcfc75d2ab185c"], - "transactionsRoot" => "0x5972b7988f667d7e86679322641117e503ea2c1bc5a27822a8a8120fe53f2c8b", - "uncles" => [] - }} - - [%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options -> - {:ok, - Enum.map(requests, fn %{id: id, params: [block_quantity, true]} -> - %{ - id: id, - jsonrpc: "2.0", - result: %{ - "difficulty" => "0xc22479024e55f", - "extraData" => "0x73656f3130", - "gasLimit" => "0x7a121d", - "gasUsed" => "0x7a0527", - "hash" => - Explorer.Factory.block_hash() - |> to_string(), - "logsBloom" => - "0x006a044c050a6759208088200009808898246808402123144ac15801c09a2672990130000042500000cc6090b063f195352095a88018194112101a02640000a0109c03c40568440b853a800a60044408604bb49d1d604c802008000884520208496608a520992e0f4b41a94188088920c1995107db4696c03839a911500084001009884100605084c4542953b08101103080254c34c802a00042a62f811340400d22080d000c0e39927ca481800c8024048425462000150850500205a224810041904023a80c00dc01040203000086020111210403081096822008c12500a2060a54834800400851210122c481a04a24b5284e9900a08110c180011001c03100", - "miner" => "0xb2930b35844a230f00e51431acae96fe543a0347", - "mixHash" => "0x5e07a58028d2cee7ddbefe245e6d7b5232d997b66cc906b18ad9ad51535ced24", - "nonce" => "0x3d88ebe8031aadf6", - "number" => block_quantity, - "parentHash" => - Explorer.Factory.block_hash() - |> to_string(), - "receiptsRoot" => "0x5294a8b56be40c0c198aa443664e801bb926d49878f96151849f3ddd0cb5e76d", - "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", - "size" => "0x4796", - "stateRoot" => "0x3755d4b5c9ae3cd58d7a856a46fbe8fb69f0ba93d81e831cd68feb8b61bc3009", - "timestamp" => "0x5b475393", - "totalDifficulty" => "0x120259a450e2527e1e7", - "transactions" => [], - "transactionsRoot" => "0xa71969ed649cd1f21846ab7b4029e79662941cc34cd473aa4590e666920ad2f4", - "uncles" => [] - } - } - end)} - - [%{method: "eth_getBalance"} | _] = requests, _options -> - {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)} - end) - - variant_name -> - raise ArgumentError, "Unsupported variant name (#{variant_name})" - end - end - - {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) - - default_blocks_batch_size = BlockFetcher.default_blocks_batch_size() - - assert latest_block_number > default_blocks_batch_size - - assert Repo.aggregate(Block, :count, :hash) == 0 - - start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) - AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - start_supervised!({BlockFetcher.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], []]}) - - first_catchup_block_number = latest_block_number - 1 - - wait_for_results(fn -> - Repo.one!(from(block in Block, where: block.number == ^first_catchup_block_number)) - end) - - assert Repo.aggregate(Block, :count, :hash) >= 1 - - previous_batch_block_number = first_catchup_block_number - default_blocks_batch_size - - wait_for_results(fn -> - Repo.one!(from(block in Block, where: block.number == ^previous_batch_block_number)) - end) - - assert Repo.aggregate(Block, :count, :hash) >= default_blocks_batch_size - end - end - - describe "handle_info(:catchup_index, state)" do - setup context do - # force to use `Mox`, so we can manipulate `lastest_block_number` - put_in(context.json_rpc_named_arguments[:transport], EthereumJSONRPC.Mox) - end - - setup :state - - test "increases catchup_bound_interval if no blocks missing", %{ - json_rpc_named_arguments: json_rpc_named_arguments, - state: state - } do - insert(:block, number: 0) - insert(:block, number: 1) - - EthereumJSONRPC.Mox - |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> - {:ok, %{"number" => "0x1"}} - end) - - start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) - AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - - # from `setup :state` - assert_received :catchup_index - - assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} = - BlockFetcher.Supervisor.handle_info(:catchup_index, state) - - assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message - - # DOWN is not flushed - assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) - - assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state) - - # DOWN is flushed - assert {:messages, []} = Process.info(self(), :messages) - - assert message_state.catchup_bound_interval.current > catchup_index_state.catchup_bound_interval.current - end - - test "decreases catchup_bound_interval if blocks missing", %{ - json_rpc_named_arguments: json_rpc_named_arguments, - state: state - } do - EthereumJSONRPC.Mox - |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> - {:ok, %{"number" => "0x1"}} - end) - |> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByNumber", params: ["0x0", true]}], _options -> - {:ok, - [ - %{ - id: id, - jsonrpc: "2.0", - result: %{ - "difficulty" => "0x0", - "gasLimit" => "0x0", - "gasUsed" => "0x0", - "hash" => - Explorer.Factory.block_hash() - |> to_string(), - "miner" => "0xb2930b35844a230f00e51431acae96fe543a0347", - "number" => "0x0", - "parentHash" => - Explorer.Factory.block_hash() - |> to_string(), - "size" => "0x0", - "timestamp" => "0x0", - "totalDifficulty" => "0x0", - "transactions" => [] - } - } - ]} - end) - |> stub(:json_rpc, fn [ - %{ - id: id, - method: "eth_getBalance", - params: ["0xb2930b35844a230f00e51431acae96fe543a0347", "0x0"] - } - ], - _options -> - {:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]} - end) - - start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) - AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - - # from `setup :state` - assert_received :catchup_index - - assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} = - BlockFetcher.Supervisor.handle_info(:catchup_index, state) - - # 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for - # catchup_index - assert_receive {^ref, %{first_block_number: 0, missing_block_count: 1}} = message - - # DOWN is not flushed - assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) - - assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state) - - # DOWN is flushed - assert {:messages, []} = Process.info(self(), :messages) - - assert message_state.catchup_bound_interval.current == message_state.catchup_bound_interval.minimum - - # When not at minimum it is decreased - - above_minimum_state = update_in(catchup_index_state.catchup_bound_interval, &BoundInterval.increase/1) - - assert above_minimum_state.catchup_bound_interval.current > message_state.catchup_bound_interval.minimum - assert {:noreply, above_minimum_message_state} = BlockFetcher.Supervisor.handle_info(message, above_minimum_state) - - assert above_minimum_message_state.catchup_bound_interval.current < - above_minimum_state.catchup_bound_interval.current - end - end - - describe "import_range/4" do - setup :state - + describe "import_range/2" do setup %{json_rpc_named_arguments: json_rpc_named_arguments} do start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - {:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments) - %{state: state} + %{block_fetcher: BlockFetcher.new(broadcast: false, json_rpc_named_arguments: json_rpc_named_arguments)} end test "with single element range that is valid imports one block", %{ - json_rpc_named_arguments: json_rpc_named_arguments, - state: state + block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher } do block_number = 0 @@ -496,6 +170,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do end {:ok, sequence} = Sequence.start_link(first: 0, step: 1) + sequenced_block_fetcher = %BlockFetcher{block_fetcher | sequence: sequence} %{address_hash: address_hash, block_hash: block_hash} = case Keyword.fetch!(json_rpc_named_arguments, :variant) do @@ -532,7 +207,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do end log_bad_gateway( - fn -> BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) end, + fn -> BlockFetcher.import_range(sequenced_block_fetcher, block_number..block_number) end, fn result -> assert {:ok, %{ @@ -560,8 +235,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do # Implement when a full block is found for Ethereum Mainnet and remove :no_geth tag @tag :no_geth test "can import range with all synchronous imported schemas", %{ - json_rpc_named_arguments: json_rpc_named_arguments, - state: state + block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher } do block_number = @first_full_block_number @@ -733,6 +407,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do end {:ok, sequence} = Sequence.start_link(first: 0, step: 1) + sequenced_block_fetcher = %BlockFetcher{block_fetcher | sequence: sequence} case Keyword.fetch!(json_rpc_named_arguments, :variant) do EthereumJSONRPC.Geth -> @@ -792,7 +467,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do 154, 143, 4, 28, 171, 95, 190, 255, 254, 174, 75, 182>> } ] - }} = BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) + }} = BlockFetcher.import_range(sequenced_block_fetcher, block_number..block_number) wait_for_tasks(InternalTransactionFetcher) wait_for_tasks(BalanceFetcher) @@ -879,7 +554,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> } ] - }} = BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) + }} = BlockFetcher.import_range(block_fetcher, block_number..block_number) wait_for_tasks(InternalTransactionFetcher) wait_for_tasks(BalanceFetcher) @@ -905,12 +580,6 @@ defmodule Indexer.BlockFetcher.SupervisorTest do end end - defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do - {:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments) - - %{state: state} - end - defp wait_until(timeout, producer) do parent = self() ref = make_ref() diff --git a/apps/indexer/test/indexer/supervisor_test.exs b/apps/indexer/test/indexer/supervisor_test.exs new file mode 100644 index 0000000000..6bfa45257a --- /dev/null +++ b/apps/indexer/test/indexer/supervisor_test.exs @@ -0,0 +1,349 @@ +defmodule Indexer.BlockFetcher.SupervisorTest do + # `async: false` due to use of named GenServer + use EthereumJSONRPC.Case, async: false + use Explorer.DataCase + + import Mox + import EthereumJSONRPC, only: [integer_to_quantity: 1] + + alias Explorer.Chain.Block + alias Indexer.{AddressBalanceFetcherCase, BlockFetcher, BoundInterval, InternalTransactionFetcherCase} + alias Indexer.BlockFetcher.Catchup + + @moduletag capture_log: true + + # MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to + # use expectations and stubs from test's pid. + setup :set_mox_global + + setup :verify_on_exit! + + describe "start_link/1" do + test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do + if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do + case Keyword.fetch!(json_rpc_named_arguments, :variant) do + EthereumJSONRPC.Parity -> + block_number = 3_416_888 + block_quantity = integer_to_quantity(block_number) + + EthereumJSONRPC.Mox + |> stub(:json_rpc, fn + # latest block number to seed starting block number for genesis and realtime tasks + %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, + %{ + "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", + "difficulty" => "0xfffffffffffffffffffffffffffffffe", + "extraData" => "0xd583010a068650617269747986312e32362e32826c69", + "gasLimit" => "0x7a1200", + "gasUsed" => "0x0", + "hash" => "0x627baabf5a17c0cfc547b6903ac5e19eaa91f30d9141be1034e3768f6adbc94e", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", + "number" => block_quantity, + "parentHash" => "0x006edcaa1e6fde822908783bc4ef1ad3675532d542fce53537557391cfe34c3c", + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x841240b30d", + "0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01", + "size" => "0x243", + "stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7", + "step" => "306230029", + "timestamp" => "0x5b437f41", + "totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + }} + + [%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options -> + {:ok, + Enum.map(requests, fn %{id: id, params: [block_quantity, true]} -> + %{ + id: id, + jsonrpc: "2.0", + result: %{ + "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", + "difficulty" => "0xfffffffffffffffffffffffffffffffe", + "extraData" => "0xd583010a068650617269747986312e32362e32826c69", + "gasLimit" => "0x7a1200", + "gasUsed" => "0x0", + "hash" => + Explorer.Factory.block_hash() + |> to_string(), + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", + "number" => block_quantity, + "parentHash" => + Explorer.Factory.block_hash() + |> to_string(), + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x841240b30d", + "0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01", + "size" => "0x243", + "stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7", + "step" => "306230029", + "timestamp" => "0x5b437f41", + "totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + } + } + end)} + + [%{method: "eth_getBalance"} | _] = requests, _options -> + {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)} + end) + + EthereumJSONRPC.Geth -> + block_number = 5_950_901 + block_quantity = integer_to_quantity(block_number) + + EthereumJSONRPC.Mox + |> stub(:json_rpc, fn + %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, + %{ + "difficulty" => "0xc2550dc5bfc5d", + "extraData" => "0x65746865726d696e652d657538", + "gasLimit" => "0x7a121d", + "gasUsed" => "0x6cc04b", + "hash" => "0x71f484056fec687fd469989426c94c469ff08a28eae9a1865359d64557bb99f6", + "logsBloom" => + "0x900840000041000850020000002800020800840900200210041006005028810880231200c1a0800001003a00011813005102000020800207080210000020014c00888640001040300c180008000084001000010018010040001118181400a06000280428024010081100015008080814141000644404040a8021101010040001001022000000000880420004008000180004000a01002080890010000a0601001a0000410244421002c0000100920100020004000020c10402004080008000203001000200c4001a000002000c0000000100200410090bc52e080900108230000110010082120200000004e01002000500001009e14001002051000040830080", + "miner" => "0xea674fdde714fd979de3edf0f56aa9716b898ec8", + "mixHash" => "0x555275cd0ab4c3b2fe3936843ee25bb67da05ef7dcf17216bc0e382d21d139a0", + "nonce" => "0xa49e42a024600113", + "number" => block_quantity, + "parentHash" => "0xb4357733c59cc6f785542d072a205f4e195f7198f544ea5e01c1b90ef0f914a5", + "receiptsRoot" => "0x17baf8de366fecc1be494bff245be6357ac60a5fe786099dba89983778c8421e", + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size" => "0x6c7b", + "stateRoot" => "0x79345c692a0bf363e95c37750336c534309b3f3fe8b59712ac1527118070f488", + "timestamp" => "0x5b475377", + "totalDifficulty" => "0x120258e22c69502fc88", + "transactions" => ["0xa4b58d1d1473f4891d9ff91f624dba73611bf1f6e9a60d3ca2dcfc75d2ab185c"], + "transactionsRoot" => "0x5972b7988f667d7e86679322641117e503ea2c1bc5a27822a8a8120fe53f2c8b", + "uncles" => [] + }} + + [%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options -> + {:ok, + Enum.map(requests, fn %{id: id, params: [block_quantity, true]} -> + %{ + id: id, + jsonrpc: "2.0", + result: %{ + "difficulty" => "0xc22479024e55f", + "extraData" => "0x73656f3130", + "gasLimit" => "0x7a121d", + "gasUsed" => "0x7a0527", + "hash" => + Explorer.Factory.block_hash() + |> to_string(), + "logsBloom" => + "0x006a044c050a6759208088200009808898246808402123144ac15801c09a2672990130000042500000cc6090b063f195352095a88018194112101a02640000a0109c03c40568440b853a800a60044408604bb49d1d604c802008000884520208496608a520992e0f4b41a94188088920c1995107db4696c03839a911500084001009884100605084c4542953b08101103080254c34c802a00042a62f811340400d22080d000c0e39927ca481800c8024048425462000150850500205a224810041904023a80c00dc01040203000086020111210403081096822008c12500a2060a54834800400851210122c481a04a24b5284e9900a08110c180011001c03100", + "miner" => "0xb2930b35844a230f00e51431acae96fe543a0347", + "mixHash" => "0x5e07a58028d2cee7ddbefe245e6d7b5232d997b66cc906b18ad9ad51535ced24", + "nonce" => "0x3d88ebe8031aadf6", + "number" => block_quantity, + "parentHash" => + Explorer.Factory.block_hash() + |> to_string(), + "receiptsRoot" => "0x5294a8b56be40c0c198aa443664e801bb926d49878f96151849f3ddd0cb5e76d", + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size" => "0x4796", + "stateRoot" => "0x3755d4b5c9ae3cd58d7a856a46fbe8fb69f0ba93d81e831cd68feb8b61bc3009", + "timestamp" => "0x5b475393", + "totalDifficulty" => "0x120259a450e2527e1e7", + "transactions" => [], + "transactionsRoot" => "0xa71969ed649cd1f21846ab7b4029e79662941cc34cd473aa4590e666920ad2f4", + "uncles" => [] + } + } + end)} + + [%{method: "eth_getBalance"} | _] = requests, _options -> + {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)} + end) + + variant_name -> + raise ArgumentError, "Unsupported variant name (#{variant_name})" + end + end + + {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) + + default_blocks_batch_size = BlockFetcher.default_blocks_batch_size() + + assert latest_block_number > default_blocks_batch_size + + assert Repo.aggregate(Block, :count, :hash) == 0 + + start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) + AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + start_supervised!({BlockFetcher.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], []]}) + + first_catchup_block_number = latest_block_number - 1 + + wait_for_results(fn -> + Repo.one!(from(block in Block, where: block.number == ^first_catchup_block_number)) + end) + + assert Repo.aggregate(Block, :count, :hash) >= 1 + + previous_batch_block_number = first_catchup_block_number - default_blocks_batch_size + + wait_for_results(fn -> + Repo.one!(from(block in Block, where: block.number == ^previous_batch_block_number)) + end) + + assert Repo.aggregate(Block, :count, :hash) >= default_blocks_batch_size + end + end + + describe "handle_info(:catchup_index, state)" do + setup context do + # force to use `Mox`, so we can manipulate `lastest_block_number` + put_in(context.json_rpc_named_arguments[:transport], EthereumJSONRPC.Mox) + end + + setup :state + + test "increases catchup_bound_interval if no blocks missing", %{ + json_rpc_named_arguments: json_rpc_named_arguments, + state: state + } do + insert(:block, number: 0) + insert(:block, number: 1) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, %{"number" => "0x1"}} + end) + + start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) + AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + # from `setup :state` + assert_received :catchup_index + + assert {:noreply, + %BlockFetcher.Supervisor{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = catchup_index_state} = + BlockFetcher.Supervisor.handle_info(:catchup_index, state) + + assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message + + # DOWN is not flushed + assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) + + assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state) + + # DOWN is flushed + assert {:messages, []} = Process.info(self(), :messages) + + assert message_state.catchup.bound_interval.current > catchup_index_state.catchup.bound_interval.current + end + + test "decreases catchup_bound_interval if blocks missing", %{ + json_rpc_named_arguments: json_rpc_named_arguments, + state: state + } do + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, %{"number" => "0x1"}} + end) + |> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByNumber", params: ["0x0", true]}], _options -> + {:ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: %{ + "difficulty" => "0x0", + "gasLimit" => "0x0", + "gasUsed" => "0x0", + "hash" => + Explorer.Factory.block_hash() + |> to_string(), + "miner" => "0xb2930b35844a230f00e51431acae96fe543a0347", + "number" => "0x0", + "parentHash" => + Explorer.Factory.block_hash() + |> to_string(), + "size" => "0x0", + "timestamp" => "0x0", + "totalDifficulty" => "0x0", + "transactions" => [] + } + } + ]} + end) + |> stub(:json_rpc, fn [ + %{ + id: id, + method: "eth_getBalance", + params: ["0xb2930b35844a230f00e51431acae96fe543a0347", "0x0"] + } + ], + _options -> + {:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]} + end) + + start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) + AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + # from `setup :state` + assert_received :catchup_index + + assert {:noreply, + %BlockFetcher.Supervisor{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = catchup_index_state} = + BlockFetcher.Supervisor.handle_info(:catchup_index, state) + + # 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for + # catchup_index + assert_receive {^ref, %{first_block_number: 0, missing_block_count: 1}} = message + + # DOWN is not flushed + assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) + + assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state) + + # DOWN is flushed + assert {:messages, []} = Process.info(self(), :messages) + + assert message_state.catchup.bound_interval.current == message_state.catchup.bound_interval.minimum + + # When not at minimum it is decreased + + above_minimum_state = update_in(catchup_index_state.catchup.bound_interval, &BoundInterval.increase/1) + + assert above_minimum_state.catchup.bound_interval.current > message_state.catchup.bound_interval.minimum + assert {:noreply, above_minimum_message_state} = BlockFetcher.Supervisor.handle_info(message, above_minimum_state) + + assert above_minimum_message_state.catchup.bound_interval.current < + above_minimum_state.catchup.bound_interval.current + end + end + + defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do + {:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments) + + %{state: state} + end +end