Separate supervision and fetching parts of BlockFetcher

Move supervision aspect to BlockFetcher.Supervisor and leave only the
actual fetching code in BlockFetcher.
pull/489/head
Luke Imhoff 6 years ago
parent e0fed23fad
commit 2333cc6cf5
  1. 10
      apps/indexer/lib/indexer/application.ex
  2. 78
      apps/indexer/lib/indexer/block_fetcher.ex
  3. 67
      apps/indexer/lib/indexer/block_fetcher/supervisor.ex
  4. 18
      apps/indexer/test/indexer/block_fetcher/supervisor_test.exs

@ -11,13 +11,21 @@ defmodule Indexer.Application do
def start(_type, _args) do
json_rpc_named_arguments = Application.fetch_env!(:indexer, :json_rpc_named_arguments)
block_fetcher_supervisor_named_arguments =
:indexer
|> Application.get_all_env()
|> Keyword.take(
~w(blocks_batch_size blocks_concurrency block_interval json_rpc_named_arguments receipts_batch_size
receipts_concurrency)a
)
children = [
{Task.Supervisor, name: Indexer.TaskSupervisor},
{BalanceFetcher, name: BalanceFetcher, json_rpc_named_arguments: json_rpc_named_arguments},
{PendingTransactionFetcher, name: PendingTransactionFetcher, json_rpc_named_arguments: json_rpc_named_arguments},
{InternalTransactionFetcher,
name: InternalTransactionFetcher, json_rpc_named_arguments: json_rpc_named_arguments},
{BlockFetcher, []}
{BlockFetcher.Supervisor, [block_fetcher_supervisor_named_arguments, [name: BlockFetcher.Supervisor]]}
]
opts = [strategy: :one_for_one, name: Indexer.Supervisor]

@ -3,15 +3,12 @@ defmodule Indexer.BlockFetcher do
Fetches and indexes block ranges from gensis to realtime.
"""
use GenServer
require Logger
import Indexer, only: [debug: 1]
alias Explorer.Chain
alias Indexer.{BalanceFetcher, AddressExtraction, BoundInterval, InternalTransactionFetcher, Sequence}
alias Indexer.BlockFetcher.{Catchup, Realtime}
alias Indexer.{AddressExtraction, BalanceFetcher, BoundInterval, InternalTransactionFetcher, Sequence}
# dialyzer thinks that Logger.debug functions always have no_local_return
@dialyzer {:nowarn_function, import_range: 4}
@ -42,11 +39,12 @@ defmodule Indexer.BlockFetcher do
def default_blocks_batch_size, do: @blocks_batch_size
@doc """
Starts the server.
Required named arguments
## Options
* `:json_rpc_named_arguments` - `t:EthereumJSONRPC.json_rpc_named_arguments/0` passed to
`EthereumJSONRPC.json_rpc/2`.
Default options are pulled from application config under the :indexer` keyspace. The follow options can be overridden:
The follow options can be overridden:
* `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to
`#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions
@ -66,8 +64,17 @@ defmodule Indexer.BlockFetcher do
`#{@blocks_concurrency * @receipts_concurrency * @receipts_batch_size}`) receipts can be requested from the
JSONRPC at once over all connections. *Each transaction only has one receipt.*
"""
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
def new(named_arguments) when is_list(named_arguments) do
interval = div(named_arguments[:block_interval] || @block_interval, 2)
state = struct!(__MODULE__, Keyword.delete(named_arguments, :block_interval))
%__MODULE__{
state
| json_rpc_named_arguments: Keyword.fetch!(named_arguments, :json_rpc_named_arguments),
catchup_bound_interval: BoundInterval.within(interval..(interval * 10)),
realtime_interval: interval
}
end
def stream_import(%__MODULE__{} = state, seq, indexer_mode, task_opts) do
@ -80,59 +87,6 @@ defmodule Indexer.BlockFetcher do
|> Stream.run()
end
@impl GenServer
def init(opts) do
opts =
:indexer
|> Application.get_all_env()
|> Keyword.merge(opts)
interval = div(opts[:block_interval] || @block_interval, 2)
state = %__MODULE__{
json_rpc_named_arguments: Keyword.fetch!(opts, :json_rpc_named_arguments),
catchup_bound_interval: BoundInterval.within(interval..(interval * 10)),
realtime_interval: interval,
blocks_batch_size: Keyword.get(opts, :blocks_batch_size, @blocks_batch_size),
blocks_concurrency: Keyword.get(opts, :blocks_concurrency, @blocks_concurrency),
receipts_batch_size: Keyword.get(opts, :receipts_batch_size, @receipts_batch_size),
receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency)
}
send(self(), :catchup_index)
{:ok, _} = :timer.send_interval(state.realtime_interval, :realtime_index)
{:ok, state}
end
@impl GenServer
def handle_info(:catchup_index, %__MODULE__{} = state) do
{:noreply, Catchup.put(state)}
end
def handle_info({ref, _} = message, %__MODULE__{catchup_task: %Task{ref: ref}} = state) do
{:noreply, Catchup.handle_success(message, state)}
end
def handle_info(
{:DOWN, ref, :process, pid, _} = message,
%__MODULE__{catchup_task: %Task{pid: pid, ref: ref}} = state
) do
{:noreply, Catchup.handle_failure(message, state)}
end
def handle_info(:realtime_index, %__MODULE__{} = state) do
{:noreply, Realtime.put(state)}
end
def handle_info({ref, :ok} = message, %__MODULE__{} = state) when is_reference(ref) do
{:noreply, Realtime.handle_success(message, state)}
end
def handle_info({:DOWN, _, :process, _, _} = message, %__MODULE__{} = state) do
{:noreply, Realtime.handle_failure(message, state)}
end
defp cap_seq(seq, next, range) do
case next do
:more ->

@ -0,0 +1,67 @@
defmodule Indexer.BlockFetcher.Supervisor do
@moduledoc """
Supervises the `Indexer.BlockerFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`.
"""
# NOT a `Supervisor` because of the `Task` restart strategies are custom.
use GenServer
require Logger
alias Indexer.BlockFetcher
alias Indexer.BlockFetcher.{Catchup, Realtime}
def child_spec(arg) do
# The `child_spec` from `use Supervisor` because the one from `use GenServer` will set the `type` to `:worker`
# instead of `:supervisor` and use the wrong shutdown timeout
Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}, [])
end
@doc """
Starts supervisor of `Indexer.BlockerFetcher.Catchup` and `Indexer.BlockFetcher.Realtime`.
For `named_arguments` see `Indexer.BlockFetcher.new/1`. For `t:GenServer.options/0` see `GenServer.start_link/3`.
"""
@spec start_link([named_arguments :: list() | GenServer.options()]) :: {:ok, pid}
def start_link([named_arguments, gen_server_options]) when is_list(named_arguments) and is_list(gen_server_options) do
GenServer.start_link(__MODULE__, named_arguments, gen_server_options)
end
@impl GenServer
def init(named_arguments) do
state = BlockFetcher.new(named_arguments)
send(self(), :catchup_index)
{:ok, _} = :timer.send_interval(state.realtime_interval, :realtime_index)
{:ok, state}
end
@impl GenServer
def handle_info(:catchup_index, %BlockFetcher{} = state) do
{:noreply, Catchup.put(state)}
end
def handle_info({ref, _} = message, %BlockFetcher{catchup_task: %Task{ref: ref}} = state) do
{:noreply, Catchup.handle_success(message, state)}
end
def handle_info(
{:DOWN, ref, :process, pid, _} = message,
%BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = state
) do
{:noreply, Catchup.handle_failure(message, state)}
end
def handle_info(:realtime_index, %BlockFetcher{} = state) do
{:noreply, Realtime.put(state)}
end
def handle_info({ref, :ok} = message, %BlockFetcher{} = state) when is_reference(ref) do
{:noreply, Realtime.handle_success(message, state)}
end
def handle_info({:DOWN, _, :process, _, _} = message, %BlockFetcher{} = state) do
{:noreply, Realtime.handle_failure(message, state)}
end
end

@ -1,4 +1,4 @@
defmodule Indexer.BlockFetcherTest do
defmodule Indexer.BlockFetcher.SupervisorTest do
# `async: false` due to use of named GenServer
use EthereumJSONRPC.Case, async: false
use Explorer.DataCase
@ -223,7 +223,7 @@ defmodule Indexer.BlockFetcherTest do
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
start_supervised!({BlockFetcher, json_rpc_named_arguments: json_rpc_named_arguments})
start_supervised!({BlockFetcher.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], []]})
first_catchup_block_number = latest_block_number - 1
@ -271,14 +271,14 @@ defmodule Indexer.BlockFetcherTest do
assert_received :catchup_index
assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
BlockFetcher.handle_info(:catchup_index, state)
BlockFetcher.Supervisor.handle_info(:catchup_index, state)
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.handle_info(message, catchup_index_state)
assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
@ -339,7 +339,7 @@ defmodule Indexer.BlockFetcherTest do
assert_received :catchup_index
assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
BlockFetcher.handle_info(:catchup_index, state)
BlockFetcher.Supervisor.handle_info(:catchup_index, state)
# 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for
# catchup_index
@ -348,7 +348,7 @@ defmodule Indexer.BlockFetcherTest do
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.handle_info(message, catchup_index_state)
assert {:noreply, message_state} = BlockFetcher.Supervisor.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
@ -360,7 +360,7 @@ defmodule Indexer.BlockFetcherTest do
above_minimum_state = update_in(catchup_index_state.catchup_bound_interval, &BoundInterval.increase/1)
assert above_minimum_state.catchup_bound_interval.current > message_state.catchup_bound_interval.minimum
assert {:noreply, above_minimum_message_state} = BlockFetcher.handle_info(message, above_minimum_state)
assert {:noreply, above_minimum_message_state} = BlockFetcher.Supervisor.handle_info(message, above_minimum_state)
assert above_minimum_message_state.catchup_bound_interval.current <
above_minimum_state.catchup_bound_interval.current
@ -374,7 +374,7 @@ defmodule Indexer.BlockFetcherTest do
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
{:ok, state} = BlockFetcher.init(json_rpc_named_arguments: json_rpc_named_arguments)
{:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments)
%{state: state}
end
@ -906,7 +906,7 @@ defmodule Indexer.BlockFetcherTest do
end
defp state(%{json_rpc_named_arguments: json_rpc_named_arguments}) do
{:ok, state} = BlockFetcher.init(json_rpc_named_arguments: json_rpc_named_arguments)
{:ok, state} = BlockFetcher.Supervisor.init(json_rpc_named_arguments: json_rpc_named_arguments)
%{state: state}
end
Loading…
Cancel
Save