Use struct to prevent regression in poll_timer not existing in state

Prevent regression in bug where transition to realtime does not work
because poll_timer does not exist in state map so `%{state | poll_timer:
...}` fails at end of genesis task like:

```
[error] GenServer Explorer.Indexer.BlockFetcher terminating
** (KeyError) key :poll_timer not found
    (explorer) lib/explorer/indexer/block_fetcher.ex:283:
Explorer.Indexer.BlockFetcher.schedule_next_realtime_fetch/1
    (explorer) lib/explorer/indexer/block_fetcher.ex:121:
Explorer.Indexer.BlockFetcher.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:DOWN, #Reference<0.2595141227.225443843.205604>,
:process, #PID<0.10048.17>, :normal}
State: %{blocks_batch_size: 10, blocks_concurrency: 10, debug_logs:
true, genesis_task: #PID<0.10048.17>, internal_transactions_batch_size:
50, internal_transactions_concurrency: 8, realtime_interval: 10000,
receipts_batch_size: 250, receipts_concurrency: 20}
```
pull/162/head
Luke Imhoff 7 years ago
parent 8f35e93c0d
commit 098985faf4
  1. 95
      apps/explorer/lib/explorer/indexer/block_fetcher.ex

@ -9,7 +9,7 @@ defmodule Explorer.Indexer.BlockFetcher do
alias Explorer.{Chain, Indexer, JSONRPC}
alias Explorer.Indexer.{AddressFetcher, Sequence}
alias Explorer.Indexer.{AddressFetcher, BlockFetcher, Sequence}
alias Explorer.JSONRPC.Transactions
@ -28,10 +28,27 @@ defmodule Explorer.Indexer.BlockFetcher do
# milliseconds
@block_rate 5_000
@realtime_interval_per_block_rate 2
@receipts_batch_size 250
@receipts_concurrency 20
defstruct debug_logs: @debug_logs,
blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
genesis_task: nil,
internal_transactions_batch_size: @internal_transactions_batch_size,
internal_transactions_concurrency: @internal_transactions_concurrency,
poll_timer: nil,
realtime_interval: @block_rate * @realtime_interval_per_block_rate,
realtime_task: nil,
receipts_batch_size: @receipts_batch_size,
receipts_concurrency: @receipts_concurrency
# This needs to match the documented options below for `start_link/1`
@allowed_option_names ~w(debug_logs blocks_batch_size blocks_concurrency block_rate internal_transactions_batch_size
internal_transactions_concurrrency receipts_batch_size receipts_concurrency)a
@doc """
Starts the server.
@ -40,7 +57,7 @@ defmodule Explorer.Indexer.BlockFetcher do
Default options are pulled from application config under the
`:explorer, :indexer` keyspace. The follow options can be overridden:
* `:debug_logs` - When `true` logs verbose index progress. Defaults `false`.
* `:debug_logs` - When `true` logs verbose index progress. Defaults `#{@debug_logs}`.
* `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to
`#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions
are not paginated.*
@ -73,55 +90,57 @@ defmodule Explorer.Indexer.BlockFetcher do
@impl GenServer
def init(opts) do
opts = Keyword.merge(Application.fetch_env!(:explorer, :indexer), opts)
fields =
:explorer
|> Application.fetch_env!(:indexer)
|> Keyword.merge(opts)
|> Keyword.take(@allowed_option_names)
|> put_block_rate()
state = struct!(%BlockFetcher{}, fields)
send(self(), :catchup_index)
:timer.send_interval(15_000, self(), :debug_count)
state = %{
genesis_task: nil,
debug_logs: Keyword.get(opts, :debug_logs, @debug_logs),
realtime_interval: (opts[:block_rate] || @block_rate) * 2,
blocks_batch_size: Keyword.get(opts, :blocks_batch_size, @blocks_batch_size),
blocks_concurrency: Keyword.get(opts, :blocks_concurrency, @blocks_concurrency),
internal_transactions_batch_size:
Keyword.get(opts, :internal_transactions_batch_size, @internal_transactions_batch_size),
internal_transactions_concurrency:
Keyword.get(opts, :internal_transactions_concurrency, @internal_transactions_concurrency),
receipts_batch_size: Keyword.get(opts, :receipts_batch_size, @receipts_batch_size),
receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency)
}
{:ok, state}
end
defp put_block_rate(allowed_options) do
if Keyword.has_key?(allowed_options, :block_rate) do
{block_rate, passthrough_fields} = Keyword.pop(allowed_options, :block_rate)
Keyword.put(passthrough_fields, :realtime_interval, block_rate * @realtime_interval_per_block_rate)
else
allowed_options
end
end
@impl GenServer
def handle_info(:catchup_index, state) do
def handle_info(:catchup_index, %BlockFetcher{} = state) do
{:ok, genesis_task} = Task.start_link(fn -> genesis_task(state) end)
Process.monitor(genesis_task)
{:noreply, %{state | genesis_task: genesis_task}}
{:noreply, %BlockFetcher{state | genesis_task: genesis_task}}
end
def handle_info(:realtime_index, state) do
def handle_info(:realtime_index, %BlockFetcher{} = state) do
{:ok, realtime_task} = Task.start_link(fn -> realtime_task(state) end)
Process.monitor(realtime_task)
{:noreply, %{state | realtime_task: realtime_task}}
{:noreply, %BlockFetcher{state | realtime_task: realtime_task}}
end
def handle_info({:DOWN, _ref, :process, pid, :normal}, %{realtime_task: pid} = state) do
{:noreply, schedule_next_realtime_fetch(%{state | realtime_task: nil})}
def handle_info({:DOWN, _ref, :process, pid, :normal}, %BlockFetcher{realtime_task: pid} = state) do
{:noreply, schedule_next_realtime_fetch(%BlockFetcher{state | realtime_task: nil})}
end
def handle_info({:DOWN, _ref, :process, pid, :normal}, %{genesis_task: pid} = state) do
def handle_info({:DOWN, _ref, :process, pid, :normal}, %BlockFetcher{genesis_task: pid} = state) do
Logger.info(fn -> "Finished index from genesis. Transitioning to realtime index." end)
{:noreply, schedule_next_realtime_fetch(%{state | genesis_task: nil})}
{:noreply, schedule_next_realtime_fetch(%BlockFetcher{state | genesis_task: nil})}
end
def handle_info(:debug_count, state) do
def handle_info(:debug_count, %BlockFetcher{} = state) do
debug(state, fn ->
"""
@ -143,14 +162,14 @@ defmodule Explorer.Indexer.BlockFetcher do
:ok = Sequence.cap(seq)
end
defp cap_seq(_seq, :more, {block_start, block_end}, state) do
defp cap_seq(_seq, :more, {block_start, block_end}, %BlockFetcher{} = state) do
debug(state, fn -> "got blocks #{block_start} - #{block_end}" end)
:ok
end
defp fetch_internal_transactions(_state, []), do: {:ok, []}
defp fetch_internal_transactions(state, hashes) do
defp fetch_internal_transactions(%BlockFetcher{} = state, hashes) do
debug(state, fn -> "fetching internal transactions for #{length(hashes)} transactions" end)
stream_opts = [max_concurrency: state.internal_transactions_concurrency, timeout: :infinity]
@ -166,7 +185,7 @@ defmodule Explorer.Indexer.BlockFetcher do
defp fetch_transaction_receipts(_state, []), do: {:ok, %{logs: [], receipts: []}}
defp fetch_transaction_receipts(state, hashes) do
defp fetch_transaction_receipts(%BlockFetcher{} = state, hashes) do
debug(state, fn -> "fetching #{length(hashes)} transaction receipts" end)
stream_opts = [max_concurrency: state.receipts_concurrency, timeout: :infinity]
@ -185,7 +204,7 @@ defmodule Explorer.Indexer.BlockFetcher do
end)
end
defp genesis_task(state) do
defp genesis_task(%BlockFetcher{} = state) do
{count, missing_ranges} = missing_block_numbers(state)
current_block = Indexer.next_block_number()
@ -195,7 +214,7 @@ defmodule Explorer.Indexer.BlockFetcher do
stream_import(state, seq, max_concurrency: state.blocks_concurrency)
end
defp insert(state, seq, range, params) do
defp insert(%BlockFetcher{} = state, seq, range, params) do
with {:ok, %{addresses: address_hashes}} = ok <- Chain.import_blocks(params) do
:ok = AddressFetcher.async_fetch_balances(address_hashes)
ok
@ -211,7 +230,7 @@ defmodule Explorer.Indexer.BlockFetcher do
end
end
defp missing_block_numbers(%{blocks_batch_size: blocks_batch_size}) do
defp missing_block_numbers(%BlockFetcher{blocks_batch_size: blocks_batch_size}) do
{count, missing_ranges} = Chain.missing_block_numbers()
chunked_ranges =
@ -235,7 +254,7 @@ defmodule Explorer.Indexer.BlockFetcher do
{count, chunked_ranges}
end
defp realtime_task(state) do
defp realtime_task(%BlockFetcher{} = state) do
{:ok, seq} = Sequence.start_link([], Indexer.next_block_number(), 2)
stream_import(state, seq, max_concurrency: 1)
end
@ -250,7 +269,7 @@ defmodule Explorer.Indexer.BlockFetcher do
# Run at state.blocks_concurrency max_concurrency when called by `stream_import/3`
# Only public for testing
@doc false
def import_range({block_start, block_end} = range, state, seq) do
def import_range({block_start, block_end} = range, %BlockFetcher{} = state, seq) do
with {:blocks, {:ok, next, result}} <- {:blocks, JSONRPC.fetch_blocks_by_range(block_start, block_end)},
%{blocks: blocks, transactions: transactions} = result,
cap_seq(seq, next, range, state),
@ -278,11 +297,11 @@ defmodule Explorer.Indexer.BlockFetcher do
end
end
defp schedule_next_realtime_fetch(state) do
defp schedule_next_realtime_fetch(%BlockFetcher{} = state) do
timer = Process.send_after(self(), :realtime_index, state.realtime_interval)
%{state | poll_timer: timer}
%BlockFetcher{state | poll_timer: timer}
end
defp debug(%{debug_logs: true}, func), do: Logger.debug(func)
defp debug(%{debug_logs: false}, _func), do: :noop
defp debug(%BlockFetcher{debug_logs: true}, func), do: Logger.debug(func)
defp debug(%BlockFetcher{debug_logs: false}, _func), do: :noop
end

Loading…
Cancel
Save