Use monitored, supervised task for improved isolation

pull/162/head
Chris McCord 7 years ago committed by Luke Imhoff
parent 61b9e18f84
commit 31eb7ead89
  1. 131
      apps/explorer/lib/explorer/indexer/block_fetcher.ex
  2. 2
      apps/explorer/test/explorer/indexer/block_fetcher_test.exs

@ -9,14 +9,15 @@ defmodule Explorer.Indexer.BlockFetcher do
alias Explorer.{Chain, Indexer, JSONRPC}
alias Explorer.Indexer.{AddressFetcher, BlockFetcher, Sequence}
alias Explorer.Indexer.{AddressFetcher, Sequence}
alias Explorer.JSONRPC.Transactions
# dialyzer thinks that Logger.debug functions always have no_local_return
@dialyzer {:nowarn_function, import_range: 3}
# These are all the *default* values for options. DO NOT use them directly in the code. Get options from `state`.
# These are all the *default* values for options.
# DO NOT use them directly in the code. Get options from `state`.
@debug_logs false
@ -28,27 +29,10 @@ defmodule Explorer.Indexer.BlockFetcher do
# milliseconds
@block_rate 5_000
@realtime_interval_per_block_rate 2
@receipts_batch_size 250
@receipts_concurrency 20
defstruct debug_logs: @debug_logs,
blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
genesis_task: nil,
internal_transactions_batch_size: @internal_transactions_batch_size,
internal_transactions_concurrency: @internal_transactions_concurrency,
poll_timer: nil,
realtime_interval: @block_rate * @realtime_interval_per_block_rate,
realtime_task: nil,
receipts_batch_size: @receipts_batch_size,
receipts_concurrency: @receipts_concurrency
# This list gates `Keyword.take/2` in `init/1`; it must match the documented
# options for `start_link/1` exactly, or an option is silently dropped.
# Fix: `internal_transactions_concurrrency` (three r's) misspelled the atom,
# so a caller-supplied `:internal_transactions_concurrency` was discarded.
@allowed_option_names ~w(debug_logs blocks_batch_size blocks_concurrency block_rate internal_transactions_batch_size
                         internal_transactions_concurrency receipts_batch_size receipts_concurrency)a
@doc """
Starts the server.
@ -90,57 +74,61 @@ defmodule Explorer.Indexer.BlockFetcher do
@impl GenServer
def init(opts) do
fields =
:explorer
|> Application.fetch_env!(:indexer)
|> Keyword.merge(opts)
|> Keyword.take(@allowed_option_names)
|> put_block_rate()
state = struct!(%BlockFetcher{}, fields)
send(self(), :catchup_index)
opts = Keyword.merge(Application.fetch_env!(:explorer, :indexer), opts)
:timer.send_interval(15_000, self(), :debug_count)
{:ok, state}
end
defp put_block_rate(allowed_options) do
if Keyword.has_key?(allowed_options, :block_rate) do
{block_rate, passthrough_fields} = Keyword.pop(allowed_options, :block_rate)
Keyword.put(passthrough_fields, :realtime_interval, block_rate * @realtime_interval_per_block_rate)
else
allowed_options
end
state = %{
genesis_task: nil,
realtime_task: nil,
debug_logs: Keyword.get(opts, :debug_logs, @debug_logs),
realtime_interval: (opts[:block_rate] || @block_rate) * 2,
blocks_batch_size: Keyword.get(opts, :blocks_batch_size, @blocks_batch_size),
blocks_concurrency: Keyword.get(opts, :blocks_concurrency, @blocks_concurrency),
internal_transactions_batch_size:
Keyword.get(opts, :internal_transactions_batch_size, @internal_transactions_batch_size),
internal_transactions_concurrency:
Keyword.get(opts, :internal_transactions_concurrency, @internal_transactions_concurrency),
receipts_batch_size: Keyword.get(opts, :receipts_batch_size, @receipts_batch_size),
receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency)
}
{:ok, schedule_next_catchup_index(state)}
end
@impl GenServer
def handle_info(:catchup_index, %BlockFetcher{} = state) do
{:ok, genesis_task} = Task.start_link(fn -> genesis_task(state) end)
Process.monitor(genesis_task)
def handle_info(:catchup_index, %{} = state) do
{:ok, genesis_task, _ref} = monitor_task(fn -> genesis_task(state) end)
{:noreply, %BlockFetcher{state | genesis_task: genesis_task}}
{:noreply, %{state | genesis_task: genesis_task}}
end
def handle_info(:realtime_index, %BlockFetcher{} = state) do
{:ok, realtime_task} = Task.start_link(fn -> realtime_task(state) end)
def handle_info(:realtime_index, %{} = state) do
{:ok, realtime_task, _ref} = monitor_task(fn -> realtime_task(state) end)
Process.monitor(realtime_task)
{:noreply, %{state | realtime_task: realtime_task}}
end
{:noreply, %BlockFetcher{state | realtime_task: realtime_task}}
# Clean (`:normal`) exit of the realtime task. Binding `pid` in both the
# `:DOWN` tuple and `%{realtime_task: pid}` ensures this clause only fires
# for our own monitored realtime task; clear it and schedule the next poll.
def handle_info({:DOWN, _ref, :process, pid, :normal}, %{realtime_task: pid} = state) do
{:noreply, schedule_next_realtime_fetch(%{state | realtime_task: nil})}
end
def handle_info({:DOWN, _ref, :process, pid, :normal}, %BlockFetcher{realtime_task: pid} = state) do
{:noreply, schedule_next_realtime_fetch(%BlockFetcher{state | realtime_task: nil})}
# Abnormal exit (any non-`:normal` reason) of the realtime task: log the
# failure, clear the task slot, and schedule another realtime fetch to retry.
def handle_info({:DOWN, _ref, :process, pid, _reason}, %{realtime_task: pid} = state) do
Logger.error(fn -> "realtime index stream exited. Restarting" end)
{:noreply, schedule_next_realtime_fetch(%{state | realtime_task: nil})}
end
def handle_info({:DOWN, _ref, :process, pid, :normal}, %BlockFetcher{genesis_task: pid} = state) do
def handle_info({:DOWN, _ref, :process, pid, :normal}, %{genesis_task: pid} = state) do
Logger.info(fn -> "Finished index from genesis. Transitioning to realtime index." end)
{:noreply, schedule_next_realtime_fetch(%BlockFetcher{state | genesis_task: nil})}
{:noreply, schedule_next_realtime_fetch(%{state | genesis_task: nil})}
end
def handle_info(:debug_count, %BlockFetcher{} = state) do
# Abnormal exit (any non-`:normal` reason) of the genesis (catch-up) task:
# log the failure, clear the task slot, and restart the catch-up pass.
def handle_info({:DOWN, _ref, :process, pid, _reason}, %{genesis_task: pid} = state) do
  # Fix: message previously misspelled "genesis" as "gensis".
  Logger.error(fn -> "genesis index stream exited. Restarting" end)
  {:noreply, schedule_next_catchup_index(%{state | genesis_task: nil})}
end
def handle_info(:debug_count, %{} = state) do
debug(state, fn ->
"""
@ -162,14 +150,14 @@ defmodule Explorer.Indexer.BlockFetcher do
:ok = Sequence.cap(seq)
end
defp cap_seq(_seq, :more, {block_start, block_end}, %BlockFetcher{} = state) do
# `:more` means the fetch reports further blocks remain, so the sequence is
# left uncapped; just emit a debug trace for the completed range.
defp cap_seq(_seq, :more, {block_start, block_end}, %{} = state) do
debug(state, fn -> "got blocks #{block_start} - #{block_end}" end)
:ok
end
# No transaction hashes to fetch — short-circuit with an empty success.
defp fetch_internal_transactions(_state, []), do: {:ok, []}
defp fetch_internal_transactions(%BlockFetcher{} = state, hashes) do
defp fetch_internal_transactions(%{} = state, hashes) do
debug(state, fn -> "fetching internal transactions for #{length(hashes)} transactions" end)
stream_opts = [max_concurrency: state.internal_transactions_concurrency, timeout: :infinity]
@ -185,7 +173,7 @@ defmodule Explorer.Indexer.BlockFetcher do
# No transaction hashes to fetch — succeed with empty logs and receipts.
defp fetch_transaction_receipts(_state, []), do: {:ok, %{logs: [], receipts: []}}
defp fetch_transaction_receipts(%BlockFetcher{} = state, hashes) do
defp fetch_transaction_receipts(%{} = state, hashes) do
debug(state, fn -> "fetching #{length(hashes)} transaction receipts" end)
stream_opts = [max_concurrency: state.receipts_concurrency, timeout: :infinity]
@ -204,7 +192,7 @@ defmodule Explorer.Indexer.BlockFetcher do
end)
end
defp genesis_task(%BlockFetcher{} = state) do
defp genesis_task(%{} = state) do
{count, missing_ranges} = missing_block_numbers(state)
current_block = Indexer.next_block_number()
@ -214,7 +202,7 @@ defmodule Explorer.Indexer.BlockFetcher do
stream_import(state, seq, max_concurrency: state.blocks_concurrency)
end
defp insert(%BlockFetcher{} = state, seq, range, params) do
defp insert(%{} = state, seq, range, params) do
with {:ok, %{addresses: address_hashes}} = ok <- Chain.import_blocks(params) do
:ok = AddressFetcher.async_fetch_balances(address_hashes)
ok
@ -230,7 +218,7 @@ defmodule Explorer.Indexer.BlockFetcher do
end
end
defp missing_block_numbers(%BlockFetcher{blocks_batch_size: blocks_batch_size}) do
defp missing_block_numbers(%{blocks_batch_size: blocks_batch_size}) do
{count, missing_ranges} = Chain.missing_block_numbers()
chunked_ranges =
@ -254,7 +242,7 @@ defmodule Explorer.Indexer.BlockFetcher do
{count, chunked_ranges}
end
defp realtime_task(%BlockFetcher{} = state) do
# One realtime indexing pass: build a sequence starting at the next block
# number (step 2) and stream-import with max_concurrency: 1 — presumably to
# keep head-of-chain imports sequential; confirm against `stream_import/3`.
defp realtime_task(%{} = state) do
{:ok, seq} = Sequence.start_link([], Indexer.next_block_number(), 2)
stream_import(state, seq, max_concurrency: 1)
end
@ -269,7 +257,7 @@ defmodule Explorer.Indexer.BlockFetcher do
# Run at state.blocks_concurrency max_concurrency when called by `stream_import/3`
# Only public for testing
@doc false
def import_range({block_start, block_end} = range, %BlockFetcher{} = state, seq) do
def import_range({block_start, block_end} = range, %{} = state, seq) do
with {:blocks, {:ok, next, result}} <- {:blocks, JSONRPC.fetch_blocks_by_range(block_start, block_end)},
%{blocks: blocks, transactions: transactions} = result,
cap_seq(seq, next, range, state),
@ -297,11 +285,22 @@ defmodule Explorer.Indexer.BlockFetcher do
end
end
defp schedule_next_realtime_fetch(%BlockFetcher{} = state) do
timer = Process.send_after(self(), :realtime_index, state.realtime_interval)
%BlockFetcher{state | poll_timer: timer}
# Immediately queues a `:catchup_index` message to this process so the
# catch-up (from-genesis) pass starts on the next mailbox sweep.
# Returns the state unchanged.
defp schedule_next_catchup_index(state) do
  _ = send(self(), :catchup_index)
  state
end
# Arms a one-shot timer delivering `:realtime_index` to this process after
# `state.realtime_interval` milliseconds. The timer reference is deliberately
# discarded; the state passes through unchanged.
defp schedule_next_realtime_fetch(state) do
  %{realtime_interval: interval} = state
  _timer = Process.send_after(self(), :realtime_index, interval)
  state
end
# Starts `task_func` under the app's Task.Supervisor (so a crash is isolated
# from this GenServer rather than linked to it) and monitors the spawned
# process, returning `{:ok, pid, monitor_ref}`.
defp monitor_task(task_func) do
  {:ok, task_pid} = Task.Supervisor.start_child(Indexer.TaskSupervisor, task_func)
  monitor_ref = Process.monitor(task_pid)
  {:ok, task_pid, monitor_ref}
end
defp debug(%BlockFetcher{debug_logs: true}, func), do: Logger.debug(func)
defp debug(%BlockFetcher{debug_logs: false}, _func), do: :noop
# Gate debug output on the `debug_logs` flag in state. Passing `func` lazily
# to `Logger.debug/1` means the message is only built when logging is on;
# the `false` clause skips evaluating `func` entirely.
defp debug(%{debug_logs: true}, func), do: Logger.debug(func)
defp debug(%{debug_logs: false}, _func), do: :noop
end

@ -32,6 +32,8 @@ defmodule Explorer.Indexer.BlockFetcherTest do
test "starts fetching blocks from Genesis" do
assert Repo.aggregate(Block, :count, :hash) == 0
start_supervised!({JSONRPC, []})
start_supervised!({Task.Supervisor, name: Explorer.Indexer.TaskSupervisor})
start_supervised!(BlockFetcher)
wait(fn ->

Loading…
Cancel
Save