diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 30c5d755eb..6c9a6fe2af 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -8,10 +8,10 @@ defmodule Indexer.BlockFetcher do import Indexer, only: [debug: 1] alias Explorer.Chain - alias Indexer.{AddressExtraction, BalanceFetcher, BoundInterval, InternalTransactionFetcher, Sequence} + alias Indexer.{AddressExtraction, BalanceFetcher, InternalTransactionFetcher, Sequence} # dialyzer thinks that Logger.debug functions always have no_local_return - @dialyzer {:nowarn_function, import_range: 4} + @dialyzer {:nowarn_function, import_range: 2} # These are all the *default* values for options. # DO NOT use them directly in the code. Get options from `state`. @@ -19,21 +19,17 @@ defmodule Indexer.BlockFetcher do @blocks_batch_size 10 @blocks_concurrency 10 - # milliseconds - @block_interval 5_000 - @receipts_batch_size 250 @receipts_concurrency 10 - defstruct json_rpc_named_arguments: [], - catchup_task: nil, - catchup_bound_interval: nil, - realtime_task_by_ref: %{}, - realtime_interval: nil, + @enforce_keys ~w(json_rpc_named_arguments)a + defstruct json_rpc_named_arguments: nil, blocks_batch_size: @blocks_batch_size, blocks_concurrency: @blocks_concurrency, + broadcast: nil, receipts_batch_size: @receipts_batch_size, - receipts_concurrency: @receipts_concurrency + receipts_concurrency: @receipts_concurrency, + sequence: nil @doc false def default_blocks_batch_size, do: @blocks_batch_size @@ -53,8 +49,6 @@ defmodule Indexer.BlockFetcher do Defaults to #{@blocks_concurrency}. So upto `blocks_concurrency * block_batch_size` (defaults to `#{@blocks_concurrency * @blocks_batch_size}`) blocks can be requested from the JSONRPC at once over all connections. - * `:block_interval` - The number of milliseconds between new blocks being published. Defaults to - `#{@block_interval}` milliseconds. * `:receipts_batch_size` - The number of receipts to request in one call to the JSONRPC. Defaults to `#{@receipts_batch_size}`. Receipt requests also include the logs for when the transaction was collated into the block. *These logs are not paginated.* @@ -65,24 +59,17 @@ defmodule Indexer.BlockFetcher do JSONRPC at once over all connections. *Each transaction only has one receipt.* """ def new(named_arguments) when is_list(named_arguments) do - interval = div(named_arguments[:block_interval] || @block_interval, 2) - - state = struct!(__MODULE__, Keyword.delete(named_arguments, :block_interval)) - - %__MODULE__{ - state - | json_rpc_named_arguments: Keyword.fetch!(named_arguments, :json_rpc_named_arguments), - catchup_bound_interval: BoundInterval.within(interval..(interval * 10)), - realtime_interval: interval - } + struct!(__MODULE__, named_arguments) end - def stream_import(%__MODULE__{} = state, seq, indexer_mode, task_opts) do - seq + def stream_import(%__MODULE__{blocks_concurrency: blocks_concurrency, sequence: sequence} = state) + when is_pid(sequence) do + sequence |> Sequence.build_stream() |> Task.async_stream( - &import_range(&1, state, seq, indexer_mode), - Keyword.merge(task_opts, timeout: :infinity) + &import_range(state, &1), + max_concurrency: blocks_concurrency, + timeout: :infinity ) |> Stream.run() end @@ -126,13 +113,13 @@ defmodule Indexer.BlockFetcher do end) end - defp insert(seq, range, indexer_mode, options) when is_list(options) do + defp insert(%__MODULE__{broadcast: broadcast, sequence: sequence}, options) when is_list(options) do {address_hash_to_fetched_balance_block_number, import_options} = pop_address_hash_to_fetched_balance_block_number(options) transaction_hash_to_block_number = get_transaction_hash_to_block_number(import_options) - options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index) + options_with_broadcast = Keyword.merge(import_options, broadcast: broadcast) with {:ok, results} <- Chain.import(options_with_broadcast) do async_import_remaining_block_data( @@ -144,11 +131,13 @@ defmodule Indexer.BlockFetcher do {:ok, results} else {:error, step, failed_value, _changes_so_far} = error -> + range = Keyword.fetch!(options, :range) + debug(fn -> "failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying" end) - :ok = Sequence.queue(seq, range) + :ok = Sequence.queue(sequence, range) error end @@ -202,10 +191,10 @@ defmodule Indexer.BlockFetcher do |> InternalTransactionFetcher.async_fetch(10_000) end - # Run at state.blocks_concurrency max_concurrency when called by `stream_import/3` + # Run at state.blocks_concurrency max_concurrency when called by `stream_import/1` # Only public for testing @doc false - def import_range(range, %__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq, indexer_mode) do + def import_range(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments, sequence: seq} = state, range) do with {:blocks, {:ok, next, result}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, %{blocks: blocks, transactions: transactions_without_receipts} = result, @@ -222,9 +211,8 @@ defmodule Indexer.BlockFetcher do }) insert( - seq, - range, - indexer_mode, + state, + range: range, addresses: [params: addresses], blocks: [params: blocks], logs: [params: logs], diff --git a/apps/indexer/lib/indexer/block_fetcher/catchup.ex b/apps/indexer/lib/indexer/block_fetcher/catchup.ex index 906d06b3b5..6e9c9eca41 100644 --- a/apps/indexer/lib/indexer/block_fetcher/catchup.ex +++ b/apps/indexer/lib/indexer/block_fetcher/catchup.ex @@ -6,22 +6,44 @@ defmodule Indexer.BlockFetcher.Catchup do require Logger import Indexer, only: [debug: 1] - import Indexer.BlockFetcher, only: [stream_import: 4] + import Indexer.BlockFetcher, only: [stream_import: 1] alias Explorer.Chain alias Indexer.{BlockFetcher, BoundInterval, Sequence} + @enforce_keys ~w(block_fetcher bound_interval)a + defstruct ~w(block_fetcher bound_interval task)a + + def new(%{block_fetcher: %BlockFetcher{} = common_block_fetcher, block_interval: block_interval}) do + block_fetcher = %BlockFetcher{common_block_fetcher | broadcast: true} + minimum_interval = div(block_interval, 2) + + %__MODULE__{ + block_fetcher: block_fetcher, + bound_interval: BoundInterval.within(minimum_interval..(minimum_interval * 10)) + } + end + @doc """ Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0` """ - @spec put(%BlockFetcher{catchup_task: nil}) :: %BlockFetcher{catchup_task: Task.t()} - def put(%BlockFetcher{catchup_task: nil} = state) do - catchup_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) - - %BlockFetcher{state | catchup_task: catchup_task} + @spec put(%BlockFetcher.Supervisor{catchup: %__MODULE__{task: nil}}) :: %BlockFetcher.Supervisor{ + catchup: %__MODULE__{task: Task.t()} + } + def put(%BlockFetcher.Supervisor{catchup: %__MODULUE__{task: nil} = state} = supervisor_state) do + put_in( + supervisor_state.catchup.task, + Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) + ) end - def task(%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state) do + def task( + %__MODULE__{ + block_fetcher: + %BlockFetcher{blocks_batch_size: blocks_batch_size, json_rpc_named_arguments: json_rpc_named_arguments} = + block_fetcher + } = state + ) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) case latest_block_number do @@ -48,10 +70,10 @@ defmodule Indexer.BlockFetcher.Catchup do :ok _ -> - {:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size) - Sequence.cap(seq) + {:ok, sequence} = Sequence.start_link(ranges: missing_ranges, step: -1 * blocks_batch_size) + Sequence.cap(sequence) - stream_import(state, seq, :catchup_index, max_concurrency: state.blocks_concurrency) + stream_import(%BlockFetcher{block_fetcher | sequence: sequence}) end %{first_block_number: first, missing_block_count: missing_block_count} @@ -60,28 +82,30 @@ defmodule Indexer.BlockFetcher.Catchup do def handle_success( {ref, %{first_block_number: first_block_number, missing_block_count: missing_block_count}}, - %BlockFetcher{ - catchup_bound_interval: catchup_bound_interval, - catchup_task: %Task{ref: ref} - } = state + %BlockFetcher.Supervisor{ + catchup: %__MODULE__{ + bound_interval: bound_interval, + task: %Task{ref: ref} + } + } = supervisor_state ) when is_integer(missing_block_count) do - new_catchup_bound_interval = + new_bound_interval = case missing_block_count do 0 -> Logger.info("Index already caught up in #{first_block_number}-0") - BoundInterval.increase(catchup_bound_interval) + BoundInterval.increase(bound_interval) _ -> Logger.info("Index had to catch up #{missing_block_count} blocks in #{first_block_number}-0") - BoundInterval.decrease(catchup_bound_interval) + BoundInterval.decrease(bound_interval) end Process.demonitor(ref, [:flush]) - interval = new_catchup_bound_interval.current + interval = new_bound_interval.current Logger.info(fn -> "Checking if index needs to catch up in #{interval}ms" @@ -89,17 +113,19 @@ defmodule Indexer.BlockFetcher.Catchup do Process.send_after(self(), :catchup_index, interval) - %BlockFetcher{state | catchup_bound_interval: new_catchup_bound_interval, catchup_task: nil} + update_in(supervisor_state.catchup, fn state -> + %__MODULE__{state | bound_interval: new_bound_interval, task: nil} + end) end def handle_failure( {:DOWN, ref, :process, pid, reason}, - %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = state + %BlockFetcher.Supervisor{catchup: %__MODULE__{task: %Task{pid: pid, ref: ref}}} = supervisor_state ) do Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end) send(self(), :catchup_index) - %BlockFetcher{state | catchup_task: nil} + put_in(supervisor_state.catchup.task, nil) end end diff --git a/apps/indexer/lib/indexer/block_fetcher/realtime.ex b/apps/indexer/lib/indexer/block_fetcher/realtime.ex index 6756f1eda4..83c9411269 100644 --- a/apps/indexer/lib/indexer/block_fetcher/realtime.ex +++ b/apps/indexer/lib/indexer/block_fetcher/realtime.ex @@ -5,29 +5,46 @@ defmodule Indexer.BlockFetcher.Realtime do require Logger - import Indexer.BlockFetcher, only: [stream_import: 4] + import Indexer.BlockFetcher, only: [stream_import: 1] alias Indexer.{BlockFetcher, Sequence} + @enforce_keys ~w(block_fetcher interval)a + defstruct block_fetcher: nil, + interval: nil, + task_by_ref: %{} + + def new(%{block_fetcher: %BlockFetcher{} = common_block_fetcher, block_interval: block_interval}) do + block_fetcher = %BlockFetcher{ + block_fetcher | blocks_concurrency: 1, broadcast: true} + + interval = div(block_interval, 2) + + %__MODULE__{block_fetcher: block_fetcher, interval: interval} + end + @doc """ Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0` `realtime_task_by_ref`. """ - def put(%BlockFetcher{} = state) do - %Task{ref: ref} = realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) + def put(%BlockFetcher.Supervisor{realtime: %__MODULE__{} = state} = supervisor_state) do + %Task{ref: ref} = task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) - put_in(state.realtime_task_by_ref[ref], realtime_task) + put_in(supervisor_state.realtime.task_by_ref[ref], task) end - def task(%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state) do + def task(%__MODULE__{block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher}) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) - {:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) - stream_import(state, seq, :realtime_index, max_concurrency: 1) + {:ok, sequence} = Sequence.start_link(first: latest_block_number, step: 2) + stream_import(%BlockFetcher{block_fetcher | sequence: sequence}) end - def handle_success({ref, :ok = result}, %BlockFetcher{realtime_task_by_ref: realtime_task_by_ref} = state) do - {realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) + def handle_success( + {ref, :ok = result}, + %BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state + ) do + {task, running_task_by_ref} = Map.pop(task_by_ref, ref) - case realtime_task do + case task do nil -> Logger.error(fn -> "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> @@ -40,16 +57,16 @@ defmodule Indexer.BlockFetcher.Realtime do Process.demonitor(ref, [:flush]) - %BlockFetcher{state | realtime_task_by_ref: running_realtime_task_by_ref} + put_in(supervisor_state.realtime.task_by_ref, running_task_by_ref) end def handle_failure( {:DOWN, ref, :process, pid, reason}, - %BlockFetcher{realtime_task_by_ref: realtime_task_by_ref} = state + %BlockFetcher.Supervisor{realtime: %__MODULE__{task_by_ref: task_by_ref}} = supervisor_state ) do - {realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) + {task, running_task_by_ref} = Map.pop(task_by_ref, ref) - case realtime_task do + case task do nil -> Logger.error(fn -> "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> @@ -64,6 +81,6 @@ defmodule Indexer.BlockFetcher.Realtime do end) end - %BlockFetcher{state | realtime_task_by_ref: running_realtime_task_by_ref} + put_in(supervisor_state.realtime.task_by_ref, running_task_by_ref) end end diff --git a/apps/indexer/lib/indexer/block_fetcher/supervisor.ex b/apps/indexer/lib/indexer/block_fetcher/supervisor.ex index 0e27095a63..50d2c6fbad 100644 --- a/apps/indexer/lib/indexer/block_fetcher/supervisor.ex +++ b/apps/indexer/lib/indexer/block_fetcher/supervisor.ex @@ -11,6 +11,12 @@ defmodule Indexer.BlockFetcher.Supervisor do alias Indexer.BlockFetcher alias Indexer.BlockFetcher.{Catchup, Realtime} + # milliseconds + @block_interval 5_000 + + @enforce_keys ~w(catchup realtime)a + defstruct ~w(catchup realtime)a + def child_spec(arg) do # The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker` # instead of `:supervisor` and use the wrong shutdown timeout @@ -29,39 +35,50 @@ defmodule Indexer.BlockFetcher.Supervisor do @impl GenServer def init(named_arguments) do - state = BlockFetcher.new(named_arguments) + state = new(named_arguments) send(self(), :catchup_index) - {:ok, _} = :timer.send_interval(state.realtime_interval, :realtime_index) + {:ok, _} = :timer.send_interval(state.realtime.interval, :realtime_index) {:ok, state} end + defp new(named_arguments) do + {given_block_interval, block_fetcher_named_arguments} = Keyword.pop(named_arguments, :block_interval) + block_fetcher = struct!(BlockFetcher, block_fetcher_named_arguments) + block_interval = given_block_interval || @block_interval + + %__MODULE__{ + catchup: Catchup.new(%{block_fetcher: block_fetcher, block_interval: block_interval}), + realtime: Realtime.new(%{block_fetcher: block_fetcher, block_interval: block_interval}) + } + end + @impl GenServer - def handle_info(:catchup_index, %BlockFetcher{} = state) do + def handle_info(:catchup_index, %__MODULE__{} = state) do {:noreply, Catchup.put(state)} end - def handle_info({ref, _} = message, %BlockFetcher{catchup_task: %Task{ref: ref}} = state) do + def handle_info({ref, _} = message, %__MODULE__{catchup: %Catchup{task: %Task{ref: ref}}} = state) do {:noreply, Catchup.handle_success(message, state)} end def handle_info( {:DOWN, ref, :process, pid, _} = message, - %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = state + %__MODULE__{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = state ) do {:noreply, Catchup.handle_failure(message, state)} end - def handle_info(:realtime_index, %BlockFetcher{} = state) do + def handle_info(:realtime_index, %__MODULE__{} = state) do {:noreply, Realtime.put(state)} end - def handle_info({ref, :ok} = message, %BlockFetcher{} = state) when is_reference(ref) do + def handle_info({ref, :ok} = message, %__MODULE__{} = state) when is_reference(ref) do {:noreply, Realtime.handle_success(message, state)} end - def handle_info({:DOWN, _, :process, _, _} = message, %BlockFetcher{} = state) do + def handle_info({:DOWN, _, :process, _, _} = message, %__MODULE__{} = state) do {:noreply, Realtime.handle_failure(message, state)} end end diff --git a/apps/indexer/test/indexer/block_fetcher/supervisor_test.exs b/apps/indexer/test/indexer/block_fetcher_test.exs similarity index 60% rename from apps/indexer/test/indexer/block_fetcher/supervisor_test.exs rename to apps/indexer/test/indexer/block_fetcher_test.exs index 3a61c7d1c4..07c4250284 100644 --- a/apps/indexer/test/indexer/block_fetcher/supervisor_test.exs +++ b/apps/indexer/test/indexer/block_fetcher_test.exs @@ -1,4 +1,4 @@ -defmodule Indexer.BlockFetcher.SupervisorTest do +defmodule Indexer.BlockFetcherTest do # `async: false` due to use of named GenServer use EthereumJSONRPC.Case, async: false use Explorer.DataCase @@ -13,7 +13,6 @@ defmodule Indexer.BlockFetcher.SupervisorTest do BalanceFetcher, AddressBalanceFetcherCase, BlockFetcher, - BoundInterval, BufferedTask, InternalTransactionFetcher, InternalTransactionFetcherCase, @@ -46,342 +45,17 @@ defmodule Indexer.BlockFetcher.SupervisorTest do # ON blocks.hash = transactions.block_hash) as blocks @first_full_block_number 37 - describe "start_link/1" do - test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do - if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do - case Keyword.fetch!(json_rpc_named_arguments, :variant) do - EthereumJSONRPC.Parity -> - block_number = 3_416_888 - block_quantity = integer_to_quantity(block_number) - - EthereumJSONRPC.Mox - |> stub(:json_rpc, fn - # latest block number to seed starting block number for genesis and realtime tasks - %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> - {:ok, - %{ - "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", - "difficulty" => "0xfffffffffffffffffffffffffffffffe", - "extraData" => "0xd583010a068650617269747986312e32362e32826c69", - "gasLimit" => "0x7a1200", - "gasUsed" => "0x0", - "hash" => "0x627baabf5a17c0cfc547b6903ac5e19eaa91f30d9141be1034e3768f6adbc94e", - "logsBloom" => - "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", - "miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", - "number" => block_quantity, - "parentHash" => "0x006edcaa1e6fde822908783bc4ef1ad3675532d542fce53537557391cfe34c3c", - "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", - "sealFields" => [ - "0x841240b30d", - "0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01" - ], - "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", - "signature" => - "58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01", - "size" => "0x243", - "stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7", - "step" => "306230029", - "timestamp" => "0x5b437f41", - "totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb", - "transactions" => [], - "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", - "uncles" => [] - }} - - [%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options -> - {:ok, - Enum.map(requests, fn %{id: id, params: [block_quantity, true]} -> - %{ - id: id, - jsonrpc: "2.0", - result: %{ - "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", - "difficulty" => "0xfffffffffffffffffffffffffffffffe", - "extraData" => "0xd583010a068650617269747986312e32362e32826c69", - "gasLimit" => "0x7a1200", - "gasUsed" => "0x0", - "hash" => - Explorer.Factory.block_hash() - |> to_string(), - "logsBloom" => - "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", - "miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", - "number" => block_quantity, - "parentHash" => - Explorer.Factory.block_hash() - |> to_string(), - "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", - "sealFields" => [ - "0x841240b30d", - "0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01" - ], - "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", - "signature" => - "58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01", - "size" => "0x243", - "stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7", - "step" => "306230029", - "timestamp" => "0x5b437f41", - "totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb", - "transactions" => [], - "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", - "uncles" => [] - } - } - end)} - - [%{method: "eth_getBalance"} | _] = requests, _options -> - {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)} - end) - - EthereumJSONRPC.Geth -> - block_number = 5_950_901 - block_quantity = integer_to_quantity(block_number) - - EthereumJSONRPC.Mox - |> stub(:json_rpc, fn - %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> - {:ok, - %{ - "difficulty" => "0xc2550dc5bfc5d", - "extraData" => "0x65746865726d696e652d657538", - "gasLimit" => "0x7a121d", - "gasUsed" => "0x6cc04b", - "hash" => "0x71f484056fec687fd469989426c94c469ff08a28eae9a1865359d64557bb99f6", - "logsBloom" => - "0x900840000041000850020000002800020800840900200210041006005028810880231200c1a0800001003a00011813005102000020800207080210000020014c00888640001040300c180008000084001000010018010040001118181400a06000280428024010081100015008080814141000644404040a8021101010040001001022000000000880420004008000180004000a01002080890010000a0601001a0000410244421002c0000100920100020004000020c10402004080008000203001000200c4001a000002000c0000000100200410090bc52e080900108230000110010082120200000004e01002000500001009e14001002051000040830080", - "miner" => "0xea674fdde714fd979de3edf0f56aa9716b898ec8", - "mixHash" => "0x555275cd0ab4c3b2fe3936843ee25bb67da05ef7dcf17216bc0e382d21d139a0", - "nonce" => "0xa49e42a024600113", - "number" => block_quantity, - "parentHash" => "0xb4357733c59cc6f785542d072a205f4e195f7198f544ea5e01c1b90ef0f914a5", - "receiptsRoot" => "0x17baf8de366fecc1be494bff245be6357ac60a5fe786099dba89983778c8421e", - "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", - "size" => "0x6c7b", - "stateRoot" => "0x79345c692a0bf363e95c37750336c534309b3f3fe8b59712ac1527118070f488", - "timestamp" => "0x5b475377", - "totalDifficulty" => "0x120258e22c69502fc88", - "transactions" => ["0xa4b58d1d1473f4891d9ff91f624dba73611bf1f6e9a60d3ca2dcfc75d2ab185c"], - "transactionsRoot" => "0x5972b7988f667d7e86679322641117e503ea2c1bc5a27822a8a8120fe53f2c8b", - "uncles" => [] - }} - - [%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options -> - {:ok, - Enum.map(requests, fn %{id: id, params: [block_quantity, true]} -> - %{ - id: id, - jsonrpc: "2.0", - result: %{ - "difficulty" => "0xc22479024e55f", - "extraData" => "0x73656f3130", - "gasLimit" => "0x7a121d", - "gasUsed" => "0x7a0527", - "hash" => - Explorer.Factory.block_hash() - |> to_string(), - "logsBloom" => - "0x006a044c050a6759208088200009808898246808402123144ac15801c09a2672990130000042500000cc6090b063f195352095a88018194112101a02640000a0109c03c40568440b853a800a60044408604bb49d1d604c802008000884520208496608a520992e0f4b41a94188088920c1995107db4696c03839a911500084001009884100605084c4542953b08101103080254c34c802a00042a62f811340400d22080d000c0e39927ca481800c8024048425462000150850500205a224810041904023a80c00dc01040203000086020111210403081096822008c12500a2060a54834800400851210122c481a04a24b5284e9900a08110c180011001c03100", - "miner" => "0xb2930b35844a230f00e51431acae96fe543a0347", - "mixHash" => "0x5e07a58028d2cee7ddbefe245e6d7b5232d997b66cc906b18ad9ad51535ced24", - "nonce" => "0x3d88ebe8031aadf6", - "number" => block_quantity, - "parentHash" => - Explorer.Factory.block_hash() - |> to_string(), - "receiptsRoot" => "0x5294a8b56be40c0c198aa443664e801bb926d49878f96151849f3ddd0cb5e76d", - "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", - "size" => "0x4796", - "stateRoot" => "0x3755d4b5c9ae3cd58d7a856a46fbe8fb69f0ba93d81e831cd68feb8b61bc3009", - "timestamp" => "0x5b475393", - "totalDifficulty" => "0x120259a450e2527e1e7", - "transactions" => [], - "transactionsRoot" => "0xa71969ed649cd1f21846ab7b4029e79662941cc34cd473aa4590e666920ad2f4", - "uncles" => [] - } - } - end)} - - [%{method: "eth_getBalance"} | _] = requests, _options -> - {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)} - end) - - variant_name -> - raise ArgumentError, "Unsupported variant name (#{variant_name})" - end - end - - {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) - - default_blocks_batch_size = BlockFetcher.default_blocks_batch_size() - - assert latest_block_number > default_blocks_batch_size - - assert Repo.aggregate(Block, :count, :hash) == 0 - - start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) - AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - start_supervised!({BlockFetcher.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], []]}) - - first_catchup_block_number = latest_block_number - 1 - - wait_for_results(fn -> - Repo.one!(from(block in Block, where: block.number == ^first_catchup_block_number)) - end) - - assert Repo.aggregate(Block, :count, :hash) >= 1 - - previous_batch_block_number = first_catchup_block_number - default_blocks_batch_size - - wait_for_results(fn -> - Repo.one!(from(block in Block, where: block.number == ^previous_batch_block_number)) - end) - - assert Repo.aggregate(Block, :count, :hash) >= default_blocks_batch_size - end - end - - describe "handle_info(:catchup_index, state)" do - setup context do - # force to use `Mox`, so we can manipulate `lastest_block_number` - put_in(context.json_rpc_named_arguments[:transport], EthereumJSONRPC.Mox) - end - - setup :state - - test "increases catchup_bound_interval if no blocks missing", %{ - json_rpc_named_arguments: json_rpc_named_arguments, - state: state - } do - insert(:block, number: 0) - insert(:block, number: 1) - - EthereumJSONRPC.Mox - |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> - {:ok, %{"number" => "0x1"}} - end) - - start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) - AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - - # from `setup :state` - assert_received :catchup_index - - assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} = - BlockFetcher.Supervisor.handle_info(:catchup_index, state) - - assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message - - # DOWN is not flushed - assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) - - assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state) - - # DOWN is flushed - assert {:messages, []} = Process.info(self(), :messages) - - assert message_state.catchup_bound_interval.current > catchup_index_state.catchup_bound_interval.current - end - - test "decreases catchup_bound_interval if blocks missing", %{ - json_rpc_named_arguments: json_rpc_named_arguments, - state: state - } do - EthereumJSONRPC.Mox - |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> - {:ok, %{"number" => "0x1"}} - end) - |> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByNumber", params: ["0x0", true]}], _options -> - {:ok, - [ - %{ - id: id, - jsonrpc: "2.0", - result: %{ - "difficulty" => "0x0", - "gasLimit" => "0x0", - "gasUsed" => "0x0", - "hash" => - Explorer.Factory.block_hash() - |> to_string(), - "miner" => "0xb2930b35844a230f00e51431acae96fe543a0347", - "number" => "0x0", - "parentHash" => - Explorer.Factory.block_hash() - |> to_string(), - "size" => "0x0", - "timestamp" => "0x0", - "totalDifficulty" => "0x0", - "transactions" => [] - } - } - ]} - end) - |> stub(:json_rpc, fn [ - %{ - id: id, - method: "eth_getBalance", - params: ["0xb2930b35844a230f00e51431acae96fe543a0347", "0x0"] - } - ], - _options -> - {:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]} - end) - - start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) - AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - - # from `setup :state` - assert_received :catchup_index - - assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} = - BlockFetcher.Supervisor.handle_info(:catchup_index, state) - - # 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for - # catchup_index - assert_receive {^ref, %{first_block_number: 0, missing_block_count: 1}} = message - - # DOWN is not flushed - assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) - - assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state) - - # DOWN is flushed - assert {:messages, []} = Process.info(self(), :messages) - - assert message_state.catchup_bound_interval.current == message_state.catchup_bound_interval.minimum - - # When not at minimum it is decreased - - above_minimum_state = update_in(catchup_index_state.catchup_bound_interval, &BoundInterval.increase/1) - - assert above_minimum_state.catchup_bound_interval.current > message_state.catchup_bound_interval.minimum - assert {:noreply, above_minimum_message_state} = BlockFetcher.Supervisor.handle_info(message, above_minimum_state) - - assert above_minimum_message_state.catchup_bound_interval.current < - above_minimum_state.catchup_bound_interval.current - end - end - - describe "import_range/4" do - setup :state - + describe "import_range/2" do setup %{json_rpc_named_arguments: json_rpc_named_arguments} do start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - {:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments) - %{state: state} + %{block_fetcher: BlockFetcher.new(broadcast: false, json_rpc_named_arguments: json_rpc_named_arguments)} end test "with single element range that is valid imports one block", %{ - json_rpc_named_arguments: json_rpc_named_arguments, - state: state + block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher } do block_number = 0 @@ -496,6 +170,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do end {:ok, sequence} = Sequence.start_link(first: 0, step: 1) + sequenced_block_fetcher = %BlockFetcher{block_fetcher | sequence: sequence} %{address_hash: address_hash, block_hash: block_hash} = case Keyword.fetch!(json_rpc_named_arguments, :variant) do @@ -532,7 +207,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do end log_bad_gateway( - fn -> BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) end, + fn -> BlockFetcher.import_range(sequenced_block_fetcher, block_number..block_number) end, fn result -> assert {:ok, %{ @@ -560,8 +235,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do # Implement when a full block is found for Ethereum Mainnet and remove :no_geth tag @tag :no_geth test "can import range with all synchronous imported schemas", %{ - json_rpc_named_arguments: json_rpc_named_arguments, - state: state + block_fetcher: %BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher } do block_number = @first_full_block_number @@ -733,6 +407,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do end {:ok, sequence} = Sequence.start_link(first: 0, step: 1) + sequenced_block_fetcher = %BlockFetcher{block_fetcher | sequence: sequence} case Keyword.fetch!(json_rpc_named_arguments, :variant) do EthereumJSONRPC.Geth -> @@ -792,7 +467,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do 154, 143, 4, 28, 171, 95, 190, 255, 254, 174, 75, 182>> } ] - }} = BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) + }} = BlockFetcher.import_range(sequenced_block_fetcher, block_number..block_number) wait_for_tasks(InternalTransactionFetcher) wait_for_tasks(BalanceFetcher) @@ -879,7 +554,7 @@ defmodule Indexer.BlockFetcher.SupervisorTest do 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> } ] - }} = BlockFetcher.import_range(block_number..block_number, state, sequence, :realtime_index) + }} = BlockFetcher.import_range(block_fetcher, block_number..block_number) wait_for_tasks(InternalTransactionFetcher) wait_for_tasks(BalanceFetcher) @@ -905,12 +580,6 @@ defmodule Indexer.BlockFetcher.SupervisorTest do end end - defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do - {:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments) - - %{state: state} - end - defp wait_until(timeout, producer) do parent = self() ref = make_ref() diff --git a/apps/indexer/test/indexer/supervisor_test.exs b/apps/indexer/test/indexer/supervisor_test.exs new file mode 100644 index 0000000000..6bfa45257a --- /dev/null +++ b/apps/indexer/test/indexer/supervisor_test.exs @@ -0,0 +1,349 @@ +defmodule Indexer.BlockFetcher.SupervisorTest do + # `async: false` due to use of named GenServer + use EthereumJSONRPC.Case, async: false + use Explorer.DataCase + + import Mox + import EthereumJSONRPC, only: [integer_to_quantity: 1] + + alias Explorer.Chain.Block + alias Indexer.{AddressBalanceFetcherCase, BlockFetcher, BoundInterval, InternalTransactionFetcherCase} + alias Indexer.BlockFetcher.Catchup + + @moduletag capture_log: true + + # MUST use global mode because we aren't guaranteed to get `start_supervised`'s pid back fast enough to `allow` it to + # use expectations and stubs from test's pid. + setup :set_mox_global + + setup :verify_on_exit! + + describe "start_link/1" do + test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do + if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do + case Keyword.fetch!(json_rpc_named_arguments, :variant) do + EthereumJSONRPC.Parity -> + block_number = 3_416_888 + block_quantity = integer_to_quantity(block_number) + + EthereumJSONRPC.Mox + |> stub(:json_rpc, fn + # latest block number to seed starting block number for genesis and realtime tasks + %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, + %{ + "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", + "difficulty" => "0xfffffffffffffffffffffffffffffffe", + "extraData" => "0xd583010a068650617269747986312e32362e32826c69", + "gasLimit" => "0x7a1200", + "gasUsed" => "0x0", + "hash" => "0x627baabf5a17c0cfc547b6903ac5e19eaa91f30d9141be1034e3768f6adbc94e", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", + "number" => block_quantity, + "parentHash" => "0x006edcaa1e6fde822908783bc4ef1ad3675532d542fce53537557391cfe34c3c", + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x841240b30d", + "0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01", + "size" => "0x243", + "stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7", + "step" => "306230029", + "timestamp" => "0x5b437f41", + "totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + }} + + [%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options -> + {:ok, + Enum.map(requests, fn %{id: id, params: [block_quantity, true]} -> + %{ + id: id, + jsonrpc: "2.0", + result: %{ + "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", + "difficulty" => "0xfffffffffffffffffffffffffffffffe", + "extraData" => "0xd583010a068650617269747986312e32362e32826c69", + "gasLimit" => "0x7a1200", + "gasUsed" => "0x0", + "hash" => + Explorer.Factory.block_hash() + |> to_string(), + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", + "number" => block_quantity, + "parentHash" => + Explorer.Factory.block_hash() + |> to_string(), + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x841240b30d", + "0xb84158bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "58bc4fa5891934bc94c5dca0301867ce4f35925ef46ea187496162668210bba61b4cda09d7e0dca2f1dd041fad498ced6697aeef72656927f52c55b630f2591c01", + "size" => "0x243", + "stateRoot" => "0x9a8111062667f7b162851a1cbbe8aece5ff12e761b3dcee93b787fcc12548cf7", + "step" => "306230029", + "timestamp" => "0x5b437f41", + "totalDifficulty" => "0x342337ffffffffffffffffffffffffed8d29bb", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + } + } + end)} + + [%{method: "eth_getBalance"} | _] = requests, _options -> + {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)} + end) + + EthereumJSONRPC.Geth -> + block_number = 5_950_901 + block_quantity = integer_to_quantity(block_number) + + EthereumJSONRPC.Mox + |> stub(:json_rpc, fn + %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, + %{ + "difficulty" => "0xc2550dc5bfc5d", + "extraData" => "0x65746865726d696e652d657538", + "gasLimit" => "0x7a121d", + "gasUsed" => "0x6cc04b", + "hash" => "0x71f484056fec687fd469989426c94c469ff08a28eae9a1865359d64557bb99f6", + "logsBloom" => + "0x900840000041000850020000002800020800840900200210041006005028810880231200c1a0800001003a00011813005102000020800207080210000020014c00888640001040300c180008000084001000010018010040001118181400a06000280428024010081100015008080814141000644404040a8021101010040001001022000000000880420004008000180004000a01002080890010000a0601001a0000410244421002c0000100920100020004000020c10402004080008000203001000200c4001a000002000c0000000100200410090bc52e080900108230000110010082120200000004e01002000500001009e14001002051000040830080", + "miner" => "0xea674fdde714fd979de3edf0f56aa9716b898ec8", + "mixHash" => "0x555275cd0ab4c3b2fe3936843ee25bb67da05ef7dcf17216bc0e382d21d139a0", + "nonce" => "0xa49e42a024600113", + "number" => block_quantity, + "parentHash" => "0xb4357733c59cc6f785542d072a205f4e195f7198f544ea5e01c1b90ef0f914a5", + "receiptsRoot" => "0x17baf8de366fecc1be494bff245be6357ac60a5fe786099dba89983778c8421e", + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size" => "0x6c7b", + "stateRoot" => "0x79345c692a0bf363e95c37750336c534309b3f3fe8b59712ac1527118070f488", + "timestamp" => "0x5b475377", + "totalDifficulty" => "0x120258e22c69502fc88", + "transactions" => ["0xa4b58d1d1473f4891d9ff91f624dba73611bf1f6e9a60d3ca2dcfc75d2ab185c"], + "transactionsRoot" => "0x5972b7988f667d7e86679322641117e503ea2c1bc5a27822a8a8120fe53f2c8b", + "uncles" => [] + }} + + [%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options -> + {:ok, + Enum.map(requests, fn %{id: id, params: [block_quantity, true]} -> + %{ + id: id, + jsonrpc: "2.0", + result: %{ + "difficulty" => "0xc22479024e55f", + "extraData" => "0x73656f3130", + "gasLimit" => "0x7a121d", + "gasUsed" => "0x7a0527", + "hash" => + Explorer.Factory.block_hash() + |> to_string(), + "logsBloom" => + "0x006a044c050a6759208088200009808898246808402123144ac15801c09a2672990130000042500000cc6090b063f195352095a88018194112101a02640000a0109c03c40568440b853a800a60044408604bb49d1d604c802008000884520208496608a520992e0f4b41a94188088920c1995107db4696c03839a911500084001009884100605084c4542953b08101103080254c34c802a00042a62f811340400d22080d000c0e39927ca481800c8024048425462000150850500205a224810041904023a80c00dc01040203000086020111210403081096822008c12500a2060a54834800400851210122c481a04a24b5284e9900a08110c180011001c03100", + "miner" => "0xb2930b35844a230f00e51431acae96fe543a0347", + "mixHash" => "0x5e07a58028d2cee7ddbefe245e6d7b5232d997b66cc906b18ad9ad51535ced24", + "nonce" => "0x3d88ebe8031aadf6", + "number" => block_quantity, + "parentHash" => + Explorer.Factory.block_hash() + |> to_string(), + "receiptsRoot" => "0x5294a8b56be40c0c198aa443664e801bb926d49878f96151849f3ddd0cb5e76d", + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size" => "0x4796", + "stateRoot" => "0x3755d4b5c9ae3cd58d7a856a46fbe8fb69f0ba93d81e831cd68feb8b61bc3009", + "timestamp" => "0x5b475393", + "totalDifficulty" => "0x120259a450e2527e1e7", + "transactions" => [], + "transactionsRoot" => "0xa71969ed649cd1f21846ab7b4029e79662941cc34cd473aa4590e666920ad2f4", + "uncles" => [] + } + } + end)} + + [%{method: "eth_getBalance"} | _] = requests, _options -> + {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, jsonrpc: "2.0", result: "0x0"} end)} + end) + + variant_name -> + raise ArgumentError, "Unsupported variant name (#{variant_name})" + end + end + + {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) + + default_blocks_batch_size = BlockFetcher.default_blocks_batch_size() + + assert latest_block_number > default_blocks_batch_size + + assert Repo.aggregate(Block, :count, :hash) == 0 + + start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) + AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + start_supervised!({BlockFetcher.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], []]}) + + first_catchup_block_number = latest_block_number - 1 + + wait_for_results(fn -> + Repo.one!(from(block in Block, where: block.number == ^first_catchup_block_number)) + end) + + assert Repo.aggregate(Block, :count, :hash) >= 1 + + previous_batch_block_number = first_catchup_block_number - default_blocks_batch_size + + wait_for_results(fn -> + Repo.one!(from(block in Block, where: block.number == ^previous_batch_block_number)) + end) + + assert Repo.aggregate(Block, :count, :hash) >= default_blocks_batch_size + end + end + + describe "handle_info(:catchup_index, state)" do + setup context do + # force to use `Mox`, so we can manipulate `lastest_block_number` + put_in(context.json_rpc_named_arguments[:transport], EthereumJSONRPC.Mox) + end + + setup :state + + test "increases catchup_bound_interval if no blocks missing", %{ + json_rpc_named_arguments: json_rpc_named_arguments, + state: state + } do + insert(:block, number: 0) + insert(:block, number: 1) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, %{"number" => "0x1"}} + end) + + start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) + AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + # from `setup :state` + assert_received :catchup_index + + assert {:noreply, + %BlockFetcher.Supervisor{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = catchup_index_state} = + BlockFetcher.Supervisor.handle_info(:catchup_index, state) + + assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message + + # DOWN is not flushed + assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) + + assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state) + + # DOWN is flushed + assert {:messages, []} = Process.info(self(), :messages) + + assert message_state.catchup.bound_interval.current > catchup_index_state.catchup.bound_interval.current + end + + test "decreases catchup_bound_interval if blocks missing", %{ + json_rpc_named_arguments: json_rpc_named_arguments, + state: state + } do + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, %{"number" => "0x1"}} + end) + |> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByNumber", params: ["0x0", true]}], _options -> + {:ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: %{ + "difficulty" => "0x0", + "gasLimit" => "0x0", + "gasUsed" => "0x0", + "hash" => + Explorer.Factory.block_hash() + |> to_string(), + "miner" => "0xb2930b35844a230f00e51431acae96fe543a0347", + "number" => "0x0", + "parentHash" => + Explorer.Factory.block_hash() + |> to_string(), + "size" => "0x0", + "timestamp" => "0x0", + "totalDifficulty" => "0x0", + "transactions" => [] + } + } + ]} + end) + |> stub(:json_rpc, fn [ + %{ + id: id, + method: "eth_getBalance", + params: ["0xb2930b35844a230f00e51431acae96fe543a0347", "0x0"] + } + ], + _options -> + {:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]} + end) + + start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) + AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + # from `setup :state` + assert_received :catchup_index + + assert {:noreply, + %BlockFetcher.Supervisor{catchup: %Catchup{task: %Task{pid: pid, ref: ref}}} = catchup_index_state} = + BlockFetcher.Supervisor.handle_info(:catchup_index, state) + + # 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for + # catchup_index + assert_receive {^ref, %{first_block_number: 0, missing_block_count: 1}} = message + + # DOWN is not flushed + assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) + + assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state) + + # DOWN is flushed + assert {:messages, []} = Process.info(self(), :messages) + + assert message_state.catchup.bound_interval.current == message_state.catchup.bound_interval.minimum + + # When not at minimum it is decreased + + above_minimum_state = update_in(catchup_index_state.catchup.bound_interval, &BoundInterval.increase/1) + + assert above_minimum_state.catchup.bound_interval.current > message_state.catchup.bound_interval.minimum + assert {:noreply, above_minimum_message_state} = BlockFetcher.Supervisor.handle_info(message, above_minimum_state) + + assert above_minimum_message_state.catchup.bound_interval.current < + above_minimum_state.catchup.bound_interval.current + end + end + + defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do + {:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments) + + %{state: state} + end +end