diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 823eb01084..2fe51a6ef1 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -11,7 +11,7 @@ defmodule Indexer.BlockFetcher do alias EthereumJSONRPC alias Explorer.Chain - alias Indexer.{BalanceFetcher, AddressExtraction, InternalTransactionFetcher, Sequence} + alias Indexer.{BalanceFetcher, AddressExtraction, BoundInterval, InternalTransactionFetcher, Sequence} # dialyzer thinks that Logger.debug functions always have no_local_return @dialyzer {:nowarn_function, import_range: 3} @@ -60,6 +60,17 @@ defmodule Indexer.BlockFetcher do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end + defstruct json_rpc_named_arguments: [], + catchup_task: nil, + catchup_block_number: nil, + catchup_bound_interval: nil, + realtime_tasks: [], + realtime_interval: nil, + blocks_batch_size: @blocks_batch_size, + blocks_concurrency: @blocks_concurrency, + receipts_batch_size: @receipts_batch_size, + receipts_concurrency: @receipts_concurrency + @impl GenServer def init(opts) do opts = @@ -67,48 +78,81 @@ defmodule Indexer.BlockFetcher do |> Application.get_all_env() |> Keyword.merge(opts) - state = %{ + interval = div(opts[:block_interval] || @block_interval, 2) + + state = %__MODULE__{ json_rpc_named_arguments: Keyword.fetch!(opts, :json_rpc_named_arguments), - catchup_task: nil, - realtime_tasks: [], - realtime_interval: div(opts[:block_interval] || @block_interval, 2), + catchup_bound_interval: BoundInterval.within(interval..(interval * 10)), + realtime_interval: interval, blocks_batch_size: Keyword.get(opts, :blocks_batch_size, @blocks_batch_size), blocks_concurrency: Keyword.get(opts, :blocks_concurrency, @blocks_concurrency), receipts_batch_size: Keyword.get(opts, :receipts_batch_size, @receipts_batch_size), receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency) } + send(self(), :catchup_index) {:ok, _} = :timer.send_interval(state.realtime_interval, :realtime_index) - {:ok, schedule_next_catchup_index(state)} + {:ok, state} end @impl GenServer - def handle_info(:catchup_index, %{} = state) do + def handle_info(:catchup_index, %__MODULE__{} = state) do catchup_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> catchup_task(state) end) {:noreply, %{state | catchup_task: catchup_task}} end - def handle_info(:realtime_index, %{realtime_tasks: realtime_tasks} = state) when is_list(realtime_tasks) do + def handle_info(:realtime_index, %__MODULE__{realtime_tasks: realtime_tasks} = state) when is_list(realtime_tasks) do realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> realtime_task(state) end) {:noreply, %{state | realtime_tasks: [realtime_task | realtime_tasks]}} end - def handle_info({:DOWN, ref, :process, pid, :normal}, %{catchup_task: %Task{pid: pid, ref: ref}} = state) do - Logger.info(fn -> "Finished index down to genesis. Transitioning to only realtime index." end) + def handle_info( + {ref, missing_block_count}, + %__MODULE__{ + catchup_block_number: catchup_block_number, + catchup_bound_interval: catchup_bound_interval, + catchup_task: %Task{ref: ref} + } = state + ) + when is_integer(missing_block_count) do + new_catchup_bound_interval = + case missing_block_count do + 0 -> + Logger.info("Index already caught up in #{catchup_block_number}-0") + + BoundInterval.increase(catchup_bound_interval) + + _ -> + Logger.info("Index had to catch up #{missing_block_count} blocks in #{catchup_block_number}-0") + + BoundInterval.decrease(catchup_bound_interval) + end + + Process.demonitor(ref, [:flush]) + + interval = new_catchup_bound_interval.current + + Logger.info(fn -> + "Checking if index needs to catch up in #{interval}ms" + end) + + Process.send_after(self(), :catchup_index, interval) - {:noreply, %{state | catchup_task: nil}} + {:noreply, %{state | catchup_bound_interval: new_catchup_bound_interval, catchup_task: nil}} end - def handle_info({:DOWN, ref, :process, pid, reason}, %{catchup_task: %Task{pid: pid, ref: ref}} = state) do - Logger.error(fn -> "catchup index stream exited with reason (#{inspect(reason)}). Restarting" end) + def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{catchup_task: %Task{pid: pid, ref: ref}} = state) do + Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end) - {:noreply, schedule_next_catchup_index(%{state | catchup_task: nil})} + send(self(), :catchup_index) + + {:noreply, %__MODULE__{state | catchup_task: nil}} end - def handle_info({:DOWN, ref, :process, pid, reason}, %{realtime_tasks: realtime_tasks} = state) + def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{realtime_tasks: realtime_tasks} = state) when is_list(realtime_tasks) do {down_realtime_tasks, running_realtime_tasks} = Enum.split_with(realtime_tasks, fn @@ -127,7 +171,7 @@ defmodule Indexer.BlockFetcher do Logger.error(fn -> "Unexpected pid (#{inspect(pid)}) exited with reason (#{inspect(reason)})." end) end - {:noreply, %{state | realtime_tasks: running_realtime_tasks}} + {:noreply, %__MODULE__{state | realtime_tasks: running_realtime_tasks}} end defp cap_seq(seq, next, range) do @@ -145,9 +189,12 @@ defmodule Indexer.BlockFetcher do :ok end - defp fetch_transaction_receipts(_state, []), do: {:ok, %{logs: [], receipts: []}} + defp fetch_transaction_receipts(%__MODULE__{} = _state, []), do: {:ok, %{logs: [], receipts: []}} - defp fetch_transaction_receipts(%{json_rpc_named_arguments: json_rpc_named_arguments} = state, transaction_params) do + defp fetch_transaction_receipts( + %__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, + transaction_params + ) do debug(fn -> "fetching #{length(transaction_params)} transaction receipts" end) stream_opts = [max_concurrency: state.receipts_concurrency, timeout: :infinity] @@ -166,17 +213,42 @@ defmodule Indexer.BlockFetcher do end) end - defp catchup_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do + # Returns number of missing blocks that had to be caught up + defp catchup_task(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) - missing_ranges = Chain.missing_block_number_ranges(latest_block_number..0) - count = Enum.count(missing_ranges) - debug(fn -> "#{count} missed block ranges between #{latest_block_number} and genesis" end) + case latest_block_number do + # let realtime indexer get the genesis block + 0 -> + 0 + + _ -> + # realtime indexer gets the current latest block + first = latest_block_number - 1 + last = 0 + missing_ranges = Chain.missing_block_number_ranges(first..last) + range_count = Enum.count(missing_ranges) + + missing_block_count = + missing_ranges + |> Stream.map(&Enum.count/1) + |> Enum.sum() + + debug(fn -> "#{missing_block_count} missed blocks in #{range_count} ranges between #{first} and #{last}" end) - {:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size) - Sequence.cap(seq) + case missing_block_count do + 0 -> + :ok - stream_import(state, seq, max_concurrency: state.blocks_concurrency) + _ -> + {:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size) + Sequence.cap(seq) + + stream_import(state, seq, max_concurrency: state.blocks_concurrency) + end + + missing_block_count + end end defp insert(seq, range, options) when is_list(options) do @@ -253,13 +325,13 @@ defmodule Indexer.BlockFetcher do |> InternalTransactionFetcher.async_fetch(10_000) end - defp realtime_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do + defp realtime_task(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) {:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) stream_import(state, seq, max_concurrency: 1) end - defp stream_import(state, seq, task_opts) do + defp stream_import(%__MODULE__{} = state, seq, task_opts) do seq |> Sequence.build_stream() |> Task.async_stream( @@ -272,7 +344,7 @@ defmodule Indexer.BlockFetcher do # Run at state.blocks_concurrency max_concurrency when called by `stream_import/3` # Only public for testing @doc false - def import_range(range, %{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq) do + def import_range(range, %__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq) do with {:blocks, {:ok, next, result}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, %{blocks: blocks, transactions: transactions_without_receipts} = result, @@ -321,9 +393,4 @@ defmodule Indexer.BlockFetcher do Map.merge(transaction_params, Map.fetch!(transaction_hash_to_receipt_params, transaction_hash)) end) end - - defp schedule_next_catchup_index(state) do - send(self(), :catchup_index) - state - end end diff --git a/apps/indexer/lib/indexer/bound_interval.ex b/apps/indexer/lib/indexer/bound_interval.ex new file mode 100644 index 0000000000..2b09b5c097 --- /dev/null +++ b/apps/indexer/lib/indexer/bound_interval.ex @@ -0,0 +1,31 @@ +defmodule Indexer.BoundInterval do + @moduledoc """ + An interval for `Process.send_after` that is restricted to being between a `minimum` and `maximum` value + """ + + @enforce_keys ~w(maximum)a + defstruct minimum: 1, + current: 1, + maximum: nil + + def within(minimum..maximum) when is_integer(minimum) and is_integer(maximum) and minimum <= maximum do + %__MODULE__{minimum: minimum, current: minimum, maximum: maximum} + end + + def decrease(%__MODULE__{minimum: minimum, current: current} = bound_interval) + when is_integer(minimum) and is_integer(current) do + new_current = + current + |> div(2) + |> max(minimum) + + %__MODULE__{bound_interval | current: new_current} + end + + def increase(%__MODULE__{current: current, maximum: maximum} = bound_interval) + when is_integer(current) and is_integer(maximum) do + new_current = min(current * 2, maximum) + + %__MODULE__{bound_interval | current: new_current} + end +end diff --git a/apps/indexer/test/indexer/block_fetcher_test.exs b/apps/indexer/test/indexer/block_fetcher_test.exs index 79c9542eb3..d54ac986c3 100644 --- a/apps/indexer/test/indexer/block_fetcher_test.exs +++ b/apps/indexer/test/indexer/block_fetcher_test.exs @@ -13,6 +13,7 @@ defmodule Indexer.BlockFetcherTest do BalanceFetcher, AddressBalanceFetcherCase, BlockFetcher, + BoundInterval, BufferedTask, InternalTransactionFetcher, InternalTransactionFetcherCase, @@ -224,13 +225,15 @@ defmodule Indexer.BlockFetcherTest do InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) start_supervised!({BlockFetcher, json_rpc_named_arguments: json_rpc_named_arguments}) + first_catchup_block_number = latest_block_number - 1 + wait_for_results(fn -> - Repo.one!(from(block in Block, where: block.number == ^latest_block_number)) + Repo.one!(from(block in Block, where: block.number == ^first_catchup_block_number)) end) assert Repo.aggregate(Block, :count, :hash) >= 1 - previous_batch_block_number = latest_block_number - default_blocks_batch_size + previous_batch_block_number = first_catchup_block_number - default_blocks_batch_size wait_for_results(fn -> Repo.one!(from(block in Block, where: block.number == ^previous_batch_block_number)) @@ -240,6 +243,130 @@ defmodule Indexer.BlockFetcherTest do end end + describe "handle_info(:catchup_index, state)" do + setup context do + # force to use `Mox`, so we can manipulate `lastest_block_number` + put_in(context.json_rpc_named_arguments[:transport], EthereumJSONRPC.Mox) + end + + setup :state + + test "increases catchup_bound_interval if no blocks missing", %{ + json_rpc_named_arguments: json_rpc_named_arguments, + state: state + } do + insert(:block, number: 0) + insert(:block, number: 1) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, %{"number" => "0x1"}} + end) + + start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) + AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + # from `setup :state` + assert_received :catchup_index + + assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} = + BlockFetcher.handle_info(:catchup_index, state) + + assert_receive {^ref, 0} = message + + # DOWN is not flushed + assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) + + assert {:noreply, message_state} = BlockFetcher.handle_info(message, catchup_index_state) + + # DOWN is flushed + assert {:messages, []} = Process.info(self(), :messages) + + assert message_state.catchup_bound_interval.current > catchup_index_state.catchup_bound_interval.current + end + + test "decreases catchup_bound_interval if blocks missing", %{ + json_rpc_named_arguments: json_rpc_named_arguments, + state: state + } do + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options -> + {:ok, %{"number" => "0x1"}} + end) + |> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByNumber", params: ["0x0", true]}], _options -> + {:ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: %{ + "difficulty" => "0x0", + "gasLimit" => "0x0", + "gasUsed" => "0x0", + "hash" => + Explorer.Factory.block_hash() + |> to_string(), + "miner" => "0xb2930b35844a230f00e51431acae96fe543a0347", + "number" => "0x0", + "parentHash" => + Explorer.Factory.block_hash() + |> to_string(), + "size" => "0x0", + "timestamp" => "0x0", + "totalDifficulty" => "0x0", + "transactions" => [] + } + } + ]} + end) + |> stub(:json_rpc, fn [ + %{ + id: id, + method: "eth_getBalance", + params: ["0xb2930b35844a230f00e51431acae96fe543a0347", "0x0"] + } + ], + _options -> + {:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]} + end) + + start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) + AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + # from `setup :state` + assert_received :catchup_index + + assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} = + BlockFetcher.handle_info(:catchup_index, state) + + # 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for + # catchup_index + assert_receive {^ref, 1} = message + + # DOWN is not flushed + assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) + + assert {:noreply, message_state} = BlockFetcher.handle_info(message, catchup_index_state) + + # DOWN is flushed + assert {:messages, []} = Process.info(self(), :messages) + + assert message_state.catchup_bound_interval.current == message_state.catchup_bound_interval.minimum + + # When not at minimum it is decreased + + above_minimum_state = update_in(catchup_index_state.catchup_bound_interval, &BoundInterval.increase/1) + + assert above_minimum_state.catchup_bound_interval.current > message_state.catchup_bound_interval.minimum + assert {:noreply, above_minimum_message_state} = BlockFetcher.handle_info(message, above_minimum_state) + + assert above_minimum_message_state.catchup_bound_interval.current < + above_minimum_state.catchup_bound_interval.current + end + end + describe "import_range/3" do setup :state