diff --git a/apps/indexer/lib/indexer/application.ex b/apps/indexer/lib/indexer/application.ex index 9d1e2badb5..657ddcd45e 100644 --- a/apps/indexer/lib/indexer/application.ex +++ b/apps/indexer/lib/indexer/application.ex @@ -11,13 +11,21 @@ defmodule Indexer.Application do def start(_type, _args) do json_rpc_named_arguments = Application.fetch_env!(:indexer, :json_rpc_named_arguments) + block_fetcher_supervisor_named_arguments = + :indexer + |> Application.get_all_env() + |> Keyword.take( + ~w(blocks_batch_size blocks_concurrency block_interval json_rpc_named_arguments receipts_batch_size + receipts_concurrency)a + ) + children = [ {Task.Supervisor, name: Indexer.TaskSupervisor}, {BalanceFetcher, name: BalanceFetcher, json_rpc_named_arguments: json_rpc_named_arguments}, {PendingTransactionFetcher, name: PendingTransactionFetcher, json_rpc_named_arguments: json_rpc_named_arguments}, {InternalTransactionFetcher, name: InternalTransactionFetcher, json_rpc_named_arguments: json_rpc_named_arguments}, - {BlockFetcher, []} + {BlockFetcher.Supervisor, [block_fetcher_supervisor_named_arguments, [name: BlockFetcher.Supervisor]]} ] opts = [strategy: :one_for_one, name: Indexer.Supervisor] diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 198bcc9643..30c5d755eb 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -3,15 +3,12 @@ defmodule Indexer.BlockFetcher do Fetches and indexes block ranges from gensis to realtime. """ - use GenServer - require Logger import Indexer, only: [debug: 1] alias Explorer.Chain - alias Indexer.{BalanceFetcher, AddressExtraction, BoundInterval, InternalTransactionFetcher, Sequence} - alias Indexer.BlockFetcher.{Catchup, Realtime} + alias Indexer.{AddressExtraction, BalanceFetcher, BoundInterval, InternalTransactionFetcher, Sequence} # dialyzer thinks that Logger.debug functions always have no_local_return @dialyzer {:nowarn_function, import_range: 4} @@ -42,11 +39,12 @@ defmodule Indexer.BlockFetcher do def default_blocks_batch_size, do: @blocks_batch_size @doc """ - Starts the server. + Required named arguments - ## Options + * `:json_rpc_named_arguments` - `t:EthereumJSONRPC.json_rpc_named_arguments/0` passed to + `EthereumJSONRPC.json_rpc/2`. - Default options are pulled from application config under the :indexer` keyspace. The follow options can be overridden: + The follow options can be overridden: * `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to `#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions @@ -66,8 +64,17 @@ defmodule Indexer.BlockFetcher do `#{@blocks_concurrency * @receipts_concurrency * @receipts_batch_size}`) receipts can be requested from the JSONRPC at once over all connections. *Each transaction only has one receipt.* """ - def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) + def new(named_arguments) when is_list(named_arguments) do + interval = div(named_arguments[:block_interval] || @block_interval, 2) + + state = struct!(__MODULE__, Keyword.delete(named_arguments, :block_interval)) + + %__MODULE__{ + state + | json_rpc_named_arguments: Keyword.fetch!(named_arguments, :json_rpc_named_arguments), + catchup_bound_interval: BoundInterval.within(interval..(interval * 10)), + realtime_interval: interval + } end def stream_import(%__MODULE__{} = state, seq, indexer_mode, task_opts) do @@ -80,59 +87,6 @@ defmodule Indexer.BlockFetcher do |> Stream.run() end - @impl GenServer - def init(opts) do - opts = - :indexer - |> Application.get_all_env() - |> Keyword.merge(opts) - - interval = div(opts[:block_interval] || @block_interval, 2) - - state = %__MODULE__{ - json_rpc_named_arguments: Keyword.fetch!(opts, :json_rpc_named_arguments), - catchup_bound_interval: BoundInterval.within(interval..(interval * 10)), - realtime_interval: interval, - blocks_batch_size: Keyword.get(opts, :blocks_batch_size, @blocks_batch_size), - blocks_concurrency: Keyword.get(opts, :blocks_concurrency, @blocks_concurrency), - receipts_batch_size: Keyword.get(opts, :receipts_batch_size, @receipts_batch_size), - receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency) - } - - send(self(), :catchup_index) - {:ok, _} = :timer.send_interval(state.realtime_interval, :realtime_index) - - {:ok, state} - end - - @impl GenServer - def handle_info(:catchup_index, %__MODULE__{} = state) do - {:noreply, Catchup.put(state)} - end - - def handle_info({ref, _} = message, %__MODULE__{catchup_task: %Task{ref: ref}} = state) do - {:noreply, Catchup.handle_success(message, state)} - end - - def handle_info( - {:DOWN, ref, :process, pid, _} = message, - %__MODULE__{catchup_task: %Task{pid: pid, ref: ref}} = state - ) do - {:noreply, Catchup.handle_failure(message, state)} - end - - def handle_info(:realtime_index, %__MODULE__{} = state) do - {:noreply, Realtime.put(state)} - end - - def handle_info({ref, :ok} = message, %__MODULE__{} = state) when is_reference(ref) do - {:noreply, Realtime.handle_success(message, state)} - end - - def handle_info({:DOWN, _, :process, _, _} = message, %__MODULE__{} = state) do - {:noreply, Realtime.handle_failure(message, state)} - end - defp cap_seq(seq, next, range) do case next do :more -> diff --git a/apps/indexer/lib/indexer/block_fetcher/supervisor.ex b/apps/indexer/lib/indexer/block_fetcher/supervisor.ex new file mode 100644 index 0000000000..0e27095a63 --- /dev/null +++ b/apps/indexer/lib/indexer/block_fetcher/supervisor.ex @@ -0,0 +1,67 @@ +defmodule Indexer.BlockFetcher.Supervisor do + @moduledoc """ + Supervises the `Indexer.BlockerFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`. + """ + + # NOT a `Supervisor` because of the `Task` restart strategies are custom. + use GenServer + + require Logger + + alias Indexer.BlockFetcher + alias Indexer.BlockFetcher.{Catchup, Realtime} + + def child_spec(arg) do + # The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker` + # instead of `:supervisor` and use the wrong shutdown timeout + Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}, []) + end + + @doc """ + Starts supervisor of `Indexer.BlockerFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`. + + For `named_arguments` see `Indexer.BlockFetcher.new/1`. For `t:GenServer.options/0` see `GenServer.start_link/3`. + """ + @spec start_link([named_arguments :: list() | GenServer.options()]) :: {:ok, pid} + def start_link([named_arguments, gen_server_options]) when is_list(named_arguments) and is_list(gen_server_options) do + GenServer.start_link(__MODULE__, named_arguments, gen_server_options) + end + + @impl GenServer + def init(named_arguments) do + state = BlockFetcher.new(named_arguments) + + send(self(), :catchup_index) + {:ok, _} = :timer.send_interval(state.realtime_interval, :realtime_index) + + {:ok, state} + end + + @impl GenServer + def handle_info(:catchup_index, %BlockFetcher{} = state) do + {:noreply, Catchup.put(state)} + end + + def handle_info({ref, _} = message, %BlockFetcher{catchup_task: %Task{ref: ref}} = state) do + {:noreply, Catchup.handle_success(message, state)} + end + + def handle_info( + {:DOWN, ref, :process, pid, _} = message, + %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = state + ) do + {:noreply, Catchup.handle_failure(message, state)} + end + + def handle_info(:realtime_index, %BlockFetcher{} = state) do + {:noreply, Realtime.put(state)} + end + + def handle_info({ref, :ok} = message, %BlockFetcher{} = state) when is_reference(ref) do + {:noreply, Realtime.handle_success(message, state)} + end + + def handle_info({:DOWN, _, :process, _, _} = message, %BlockFetcher{} = state) do + {:noreply, Realtime.handle_failure(message, state)} + end +end diff --git a/apps/indexer/test/indexer/block_fetcher_test.exs b/apps/indexer/test/indexer/block_fetcher/supervisor_test.exs similarity index 98% rename from apps/indexer/test/indexer/block_fetcher_test.exs rename to apps/indexer/test/indexer/block_fetcher/supervisor_test.exs index 6dbd640405..3a61c7d1c4 100644 --- a/apps/indexer/test/indexer/block_fetcher_test.exs +++ b/apps/indexer/test/indexer/block_fetcher/supervisor_test.exs @@ -1,4 +1,4 @@ -defmodule Indexer.BlockFetcherTest do +defmodule Indexer.BlockFetcher.SupervisorTest do # `async: false` due to use of named GenServer use EthereumJSONRPC.Case, async: false use Explorer.DataCase @@ -223,7 +223,7 @@ defmodule Indexer.BlockFetcherTest do start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - start_supervised!({BlockFetcher, json_rpc_named_arguments: json_rpc_named_arguments}) + start_supervised!({BlockFetcher.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], []]}) first_catchup_block_number = latest_block_number - 1 @@ -271,14 +271,14 @@ defmodule Indexer.BlockFetcherTest do assert_received :catchup_index assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} = - BlockFetcher.handle_info(:catchup_index, state) + BlockFetcher.Supervisor.handle_info(:catchup_index, state) assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message # DOWN is not flushed assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) - assert {:noreply, message_state} = BlockFetcher.handle_info(message, catchup_index_state) + assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state) # DOWN is flushed assert {:messages, []} = Process.info(self(), :messages) @@ -339,7 +339,7 @@ defmodule Indexer.BlockFetcherTest do assert_received :catchup_index assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} = - BlockFetcher.handle_info(:catchup_index, state) + BlockFetcher.Supervisor.handle_info(:catchup_index, state) # 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for # catchup_index @@ -348,7 +348,7 @@ defmodule Indexer.BlockFetcherTest do # DOWN is not flushed assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages) - assert {:noreply, message_state} = BlockFetcher.handle_info(message, catchup_index_state) + assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state) # DOWN is flushed assert {:messages, []} = Process.info(self(), :messages) @@ -360,7 +360,7 @@ defmodule Indexer.BlockFetcherTest do above_minimum_state = update_in(catchup_index_state.catchup_bound_interval, &BoundInterval.increase/1) assert above_minimum_state.catchup_bound_interval.current > message_state.catchup_bound_interval.minimum - assert {:noreply, above_minimum_message_state} = BlockFetcher.handle_info(message, above_minimum_state) + assert {:noreply, above_minimum_message_state} = BlockFetcher.Supervisor.handle_info(message, above_minimum_state) assert above_minimum_message_state.catchup_bound_interval.current < above_minimum_state.catchup_bound_interval.current @@ -374,7 +374,7 @@ defmodule Indexer.BlockFetcherTest do start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) - {:ok, state} = BlockFetcher.init(json_rpc_named_arguments: json_rpc_named_arguments) + {:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments) %{state: state} end @@ -906,7 +906,7 @@ defmodule Indexer.BlockFetcherTest do end defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do - {:ok, state} = BlockFetcher.init(json_rpc_named_arguments: json_rpc_named_arguments) + {:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments) %{state: state} end