From 098985faf4d47deac2d3c617a7cfe833123776d3 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Fri, 11 May 2018 04:55:06 -0500 Subject: [PATCH] Use struct to prevent regression in poll_timer not existing in state Prevent regression in bug where transition to realtime does not work because poll_timer does not exist in state map so `%{state | poll_timer: ...}` fails at end of genesis task like: ``` [error] GenServer Explorer.Indexer.BlockFetcher terminating ** (KeyError) key :poll_timer not found (explorer) lib/explorer/indexer/block_fetcher.ex:283: Explorer.Indexer.BlockFetcher.schedule_next_realtime_fetch/1 (explorer) lib/explorer/indexer/block_fetcher.ex:121: Explorer.Indexer.BlockFetcher.handle_info/2 (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4 (stdlib) gen_server.erl:686: :gen_server.handle_msg/6 (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3 Last message: {:DOWN, #Reference<0.2595141227.225443843.205604>, :process, #PID<0.10048.17>, :normal} State: %{blocks_batch_size: 10, blocks_concurrency: 10, debug_logs: true, genesis_task: #PID<0.10048.17>, internal_transactions_batch_size: 50, internal_transactions_concurrency: 8, realtime_interval: 10000, receipts_batch_size: 250, receipts_concurrency: 20} ``` --- .../lib/explorer/indexer/block_fetcher.ex | 95 +++++++++++-------- 1 file changed, 57 insertions(+), 38 deletions(-) diff --git a/apps/explorer/lib/explorer/indexer/block_fetcher.ex b/apps/explorer/lib/explorer/indexer/block_fetcher.ex index f500038c7b..7f58053c40 100644 --- a/apps/explorer/lib/explorer/indexer/block_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/block_fetcher.ex @@ -9,7 +9,7 @@ defmodule Explorer.Indexer.BlockFetcher do alias Explorer.{Chain, Indexer, JSONRPC} - alias Explorer.Indexer.{AddressFetcher, Sequence} + alias Explorer.Indexer.{AddressFetcher, BlockFetcher, Sequence} alias Explorer.JSONRPC.Transactions @@ -28,10 +28,27 @@ defmodule Explorer.Indexer.BlockFetcher do # milliseconds @block_rate 5_000 + @realtime_interval_per_block_rate 2 @receipts_batch_size 250 @receipts_concurrency 20 + defstruct debug_logs: @debug_logs, + blocks_batch_size: @blocks_batch_size, + blocks_concurrency: @blocks_concurrency, + genesis_task: nil, + internal_transactions_batch_size: @internal_transactions_batch_size, + internal_transactions_concurrency: @internal_transactions_concurrency, + poll_timer: nil, + realtime_interval: @block_rate * @realtime_interval_per_block_rate, + realtime_task: nil, + receipts_batch_size: @receipts_batch_size, + receipts_concurrency: @receipts_concurrency + + # This needs to match the documented options below for `start_link/1` + @allowed_option_names ~w(debug_logs blocks_batch_size blocks_concurrency block_rate internal_transactions_batch_size + internal_transactions_concurrrency receipts_batch_size receipts_concurrency)a + @doc """ Starts the server. @@ -40,7 +57,7 @@ defmodule Explorer.Indexer.BlockFetcher do Default options are pulled from application config under the `:explorer, :indexer` keyspace. The follow options can be overridden: - * `:debug_logs` - When `true` logs verbose index progress. Defaults `false`. + * `:debug_logs` - When `true` logs verbose index progress. Defaults `#{@debug_logs}`. * `:blocks_batch_size` - The number of blocks to request in one call to the JSONRPC. Defaults to `#{@blocks_batch_size}`. Block requests also include the transactions for those blocks. *These transactions are not paginated.* @@ -73,55 +90,57 @@ defmodule Explorer.Indexer.BlockFetcher do @impl GenServer def init(opts) do - opts = Keyword.merge(Application.fetch_env!(:explorer, :indexer), opts) + fields = + :explorer + |> Application.fetch_env!(:indexer) + |> Keyword.merge(opts) + |> Keyword.take(@allowed_option_names) + |> put_block_rate() + + state = struct!(%BlockFetcher{}, fields) send(self(), :catchup_index) :timer.send_interval(15_000, self(), :debug_count) - state = %{ - genesis_task: nil, - debug_logs: Keyword.get(opts, :debug_logs, @debug_logs), - realtime_interval: (opts[:block_rate] || @block_rate) * 2, - blocks_batch_size: Keyword.get(opts, :blocks_batch_size, @blocks_batch_size), - blocks_concurrency: Keyword.get(opts, :blocks_concurrency, @blocks_concurrency), - internal_transactions_batch_size: - Keyword.get(opts, :internal_transactions_batch_size, @internal_transactions_batch_size), - internal_transactions_concurrency: - Keyword.get(opts, :internal_transactions_concurrency, @internal_transactions_concurrency), - receipts_batch_size: Keyword.get(opts, :receipts_batch_size, @receipts_batch_size), - receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency) - } - {:ok, state} end + defp put_block_rate(allowed_options) do + if Keyword.has_key?(allowed_options, :block_rate) do + {block_rate, passthrough_fields} = Keyword.pop(allowed_options, :block_rate) + Keyword.put(passthrough_fields, :realtime_interval, block_rate * @realtime_interval_per_block_rate) + else + allowed_options + end + end + @impl GenServer - def handle_info(:catchup_index, state) do + def handle_info(:catchup_index, %BlockFetcher{} = state) do {:ok, genesis_task} = Task.start_link(fn -> genesis_task(state) end) Process.monitor(genesis_task) - {:noreply, %{state | genesis_task: genesis_task}} + {:noreply, %BlockFetcher{state | genesis_task: genesis_task}} end - def handle_info(:realtime_index, state) do + def handle_info(:realtime_index, %BlockFetcher{} = state) do {:ok, realtime_task} = Task.start_link(fn -> realtime_task(state) end) Process.monitor(realtime_task) - {:noreply, %{state | realtime_task: realtime_task}} + {:noreply, %BlockFetcher{state | realtime_task: realtime_task}} end - def handle_info({:DOWN, _ref, :process, pid, :normal}, %{realtime_task: pid} = state) do - {:noreply, schedule_next_realtime_fetch(%{state | realtime_task: nil})} + def handle_info({:DOWN, _ref, :process, pid, :normal}, %BlockFetcher{realtime_task: pid} = state) do + {:noreply, schedule_next_realtime_fetch(%BlockFetcher{state | realtime_task: nil})} end - def handle_info({:DOWN, _ref, :process, pid, :normal}, %{genesis_task: pid} = state) do + def handle_info({:DOWN, _ref, :process, pid, :normal}, %BlockFetcher{genesis_task: pid} = state) do Logger.info(fn -> "Finished index from genesis. Transitioning to realtime index." end) - {:noreply, schedule_next_realtime_fetch(%{state | genesis_task: nil})} + {:noreply, schedule_next_realtime_fetch(%BlockFetcher{state | genesis_task: nil})} end - def handle_info(:debug_count, state) do + def handle_info(:debug_count, %BlockFetcher{} = state) do debug(state, fn -> """ @@ -143,14 +162,14 @@ defmodule Explorer.Indexer.BlockFetcher do :ok = Sequence.cap(seq) end - defp cap_seq(_seq, :more, {block_start, block_end}, state) do + defp cap_seq(_seq, :more, {block_start, block_end}, %BlockFetcher{} = state) do debug(state, fn -> "got blocks #{block_start} - #{block_end}" end) :ok end defp fetch_internal_transactions(_state, []), do: {:ok, []} - defp fetch_internal_transactions(state, hashes) do + defp fetch_internal_transactions(%BlockFetcher{} = state, hashes) do debug(state, fn -> "fetching internal transactions for #{length(hashes)} transactions" end) stream_opts = [max_concurrency: state.internal_transactions_concurrency, timeout: :infinity] @@ -166,7 +185,7 @@ defmodule Explorer.Indexer.BlockFetcher do defp fetch_transaction_receipts(_state, []), do: {:ok, %{logs: [], receipts: []}} - defp fetch_transaction_receipts(state, hashes) do + defp fetch_transaction_receipts(%BlockFetcher{} = state, hashes) do debug(state, fn -> "fetching #{length(hashes)} transaction receipts" end) stream_opts = [max_concurrency: state.receipts_concurrency, timeout: :infinity] @@ -185,7 +204,7 @@ defmodule Explorer.Indexer.BlockFetcher do end) end - defp genesis_task(state) do + defp genesis_task(%BlockFetcher{} = state) do {count, missing_ranges} = missing_block_numbers(state) current_block = Indexer.next_block_number() @@ -195,7 +214,7 @@ defmodule Explorer.Indexer.BlockFetcher do stream_import(state, seq, max_concurrency: state.blocks_concurrency) end - defp insert(state, seq, range, params) do + defp insert(%BlockFetcher{} = state, seq, range, params) do with {:ok, %{addresses: address_hashes}} = ok <- Chain.import_blocks(params) do :ok = AddressFetcher.async_fetch_balances(address_hashes) ok @@ -211,7 +230,7 @@ defmodule Explorer.Indexer.BlockFetcher do end end - defp missing_block_numbers(%{blocks_batch_size: blocks_batch_size}) do + defp missing_block_numbers(%BlockFetcher{blocks_batch_size: blocks_batch_size}) do {count, missing_ranges} = Chain.missing_block_numbers() chunked_ranges = @@ -235,7 +254,7 @@ defmodule Explorer.Indexer.BlockFetcher do {count, chunked_ranges} end - defp realtime_task(state) do + defp realtime_task(%BlockFetcher{} = state) do {:ok, seq} = Sequence.start_link([], Indexer.next_block_number(), 2) stream_import(state, seq, max_concurrency: 1) end @@ -250,7 +269,7 @@ defmodule Explorer.Indexer.BlockFetcher do # Run at state.blocks_concurrency max_concurrency when called by `stream_import/3` # Only public for testing @doc false - def import_range({block_start, block_end} = range, state, seq) do + def import_range({block_start, block_end} = range, %BlockFetcher{} = state, seq) do with {:blocks, {:ok, next, result}} <- {:blocks, JSONRPC.fetch_blocks_by_range(block_start, block_end)}, %{blocks: blocks, transactions: transactions} = result, cap_seq(seq, next, range, state), @@ -278,11 +297,11 @@ defmodule Explorer.Indexer.BlockFetcher do end end - defp schedule_next_realtime_fetch(state) do + defp schedule_next_realtime_fetch(%BlockFetcher{} = state) do timer = Process.send_after(self(), :realtime_index, state.realtime_interval) - %{state | poll_timer: timer} + %BlockFetcher{state | poll_timer: timer} end - defp debug(%{debug_logs: true}, func), do: Logger.debug(func) - defp debug(%{debug_logs: false}, _func), do: :noop + defp debug(%BlockFetcher{debug_logs: true}, func), do: Logger.debug(func) + defp debug(%BlockFetcher{debug_logs: false}, _func), do: :noop end