From 31eb7ead8972eb3798515b866d2803e8eded6eec Mon Sep 17 00:00:00 2001 From: Chris McCord Date: Fri, 11 May 2018 12:36:50 -0400 Subject: [PATCH] Use monitored, supervised task for improved isolation --- .../lib/explorer/indexer/block_fetcher.ex | 131 +++++++++--------- .../explorer/indexer/block_fetcher_test.exs | 2 + 2 files changed, 67 insertions(+), 66 deletions(-) diff --git a/apps/explorer/lib/explorer/indexer/block_fetcher.ex b/apps/explorer/lib/explorer/indexer/block_fetcher.ex index 7f58053c40..4a9bf92ea9 100644 --- a/apps/explorer/lib/explorer/indexer/block_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/block_fetcher.ex @@ -9,14 +9,15 @@ defmodule Explorer.Indexer.BlockFetcher do alias Explorer.{Chain, Indexer, JSONRPC} - alias Explorer.Indexer.{AddressFetcher, BlockFetcher, Sequence} + alias Explorer.Indexer.{AddressFetcher, Sequence} alias Explorer.JSONRPC.Transactions # dialyzer thinks that Logger.debug functions always have no_local_return @dialyzer {:nowarn_function, import_range: 3} - # These are all the *default* values for options. DO NOT use them directly in the code. Get options from `state`. + # These are all the *default* values for options. + # DO NOT use them directly in the code. Get options from `state`. @debug_logs false @@ -28,27 +29,10 @@ defmodule Explorer.Indexer.BlockFetcher do # milliseconds @block_rate 5_000 - @realtime_interval_per_block_rate 2 @receipts_batch_size 250 @receipts_concurrency 20 - defstruct debug_logs: @debug_logs, - blocks_batch_size: @blocks_batch_size, - blocks_concurrency: @blocks_concurrency, - genesis_task: nil, - internal_transactions_batch_size: @internal_transactions_batch_size, - internal_transactions_concurrency: @internal_transactions_concurrency, - poll_timer: nil, - realtime_interval: @block_rate * @realtime_interval_per_block_rate, - realtime_task: nil, - receipts_batch_size: @receipts_batch_size, - receipts_concurrency: @receipts_concurrency - - # This needs to match the documented options below for `start_link/1` - @allowed_option_names ~w(debug_logs blocks_batch_size blocks_concurrency block_rate internal_transactions_batch_size - internal_transactions_concurrrency receipts_batch_size receipts_concurrency)a - @doc """ Starts the server. @@ -90,57 +74,61 @@ defmodule Explorer.Indexer.BlockFetcher do @impl GenServer def init(opts) do - fields = - :explorer - |> Application.fetch_env!(:indexer) - |> Keyword.merge(opts) - |> Keyword.take(@allowed_option_names) - |> put_block_rate() - - state = struct!(%BlockFetcher{}, fields) - - send(self(), :catchup_index) + opts = Keyword.merge(Application.fetch_env!(:explorer, :indexer), opts) :timer.send_interval(15_000, self(), :debug_count) - {:ok, state} - end - - defp put_block_rate(allowed_options) do - if Keyword.has_key?(allowed_options, :block_rate) do - {block_rate, passthrough_fields} = Keyword.pop(allowed_options, :block_rate) - Keyword.put(passthrough_fields, :realtime_interval, block_rate * @realtime_interval_per_block_rate) - else - allowed_options - end + state = %{ + genesis_task: nil, + realtime_task: nil, + debug_logs: Keyword.get(opts, :debug_logs, @debug_logs), + realtime_interval: (opts[:block_rate] || @block_rate) * 2, + blocks_batch_size: Keyword.get(opts, :blocks_batch_size, @blocks_batch_size), + blocks_concurrency: Keyword.get(opts, :blocks_concurrency, @blocks_concurrency), + internal_transactions_batch_size: + Keyword.get(opts, :internal_transactions_batch_size, @internal_transactions_batch_size), + internal_transactions_concurrency: + Keyword.get(opts, :internal_transactions_concurrency, @internal_transactions_concurrency), + receipts_batch_size: Keyword.get(opts, :receipts_batch_size, @receipts_batch_size), + receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency) + } + + {:ok, schedule_next_catchup_index(state)} end @impl GenServer - def handle_info(:catchup_index, %BlockFetcher{} = state) do - {:ok, genesis_task} = Task.start_link(fn -> genesis_task(state) end) - - Process.monitor(genesis_task) + def handle_info(:catchup_index, %{} = state) do + {:ok, genesis_task, _ref} = monitor_task(fn -> genesis_task(state) end) - {:noreply, %BlockFetcher{state | genesis_task: genesis_task}} + {:noreply, %{state | genesis_task: genesis_task}} end - def handle_info(:realtime_index, %BlockFetcher{} = state) do - {:ok, realtime_task} = Task.start_link(fn -> realtime_task(state) end) + def handle_info(:realtime_index, %{} = state) do + {:ok, realtime_task, _ref} = monitor_task(fn -> realtime_task(state) end) - Process.monitor(realtime_task) + {:noreply, %{state | realtime_task: realtime_task}} + end - {:noreply, %BlockFetcher{state | realtime_task: realtime_task}} + def handle_info({:DOWN, _ref, :process, pid, :normal}, %{realtime_task: pid} = state) do + {:noreply, schedule_next_realtime_fetch(%{state | realtime_task: nil})} end - def handle_info({:DOWN, _ref, :process, pid, :normal}, %BlockFetcher{realtime_task: pid} = state) do - {:noreply, schedule_next_realtime_fetch(%BlockFetcher{state | realtime_task: nil})} + def handle_info({:DOWN, _ref, :process, pid, _reason}, %{realtime_task: pid} = state) do + Logger.error(fn -> "realtime index stream exited. Restarting" end) + {:noreply, schedule_next_realtime_fetch(%{state | realtime_task: nil})} end - def handle_info({:DOWN, _ref, :process, pid, :normal}, %BlockFetcher{genesis_task: pid} = state) do + def handle_info({:DOWN, _ref, :process, pid, :normal}, %{genesis_task: pid} = state) do Logger.info(fn -> "Finished index from genesis. Transitioning to realtime index." end) - {:noreply, schedule_next_realtime_fetch(%BlockFetcher{state | genesis_task: nil})} + {:noreply, schedule_next_realtime_fetch(%{state | genesis_task: nil})} end - def handle_info(:debug_count, %BlockFetcher{} = state) do + def handle_info({:DOWN, _ref, :process, pid, _reason}, %{genesis_task: pid} = state) do + Logger.error(fn -> "gensis index stream exited. Restarting" end) + + {:noreply, schedule_next_catchup_index(%{state | genesis_task: nil})} + end + + def handle_info(:debug_count, %{} = state) do debug(state, fn -> """ @@ -162,14 +150,14 @@ defmodule Explorer.Indexer.BlockFetcher do :ok = Sequence.cap(seq) end - defp cap_seq(_seq, :more, {block_start, block_end}, %BlockFetcher{} = state) do + defp cap_seq(_seq, :more, {block_start, block_end}, %{} = state) do debug(state, fn -> "got blocks #{block_start} - #{block_end}" end) :ok end defp fetch_internal_transactions(_state, []), do: {:ok, []} - defp fetch_internal_transactions(%BlockFetcher{} = state, hashes) do + defp fetch_internal_transactions(%{} = state, hashes) do debug(state, fn -> "fetching internal transactions for #{length(hashes)} transactions" end) stream_opts = [max_concurrency: state.internal_transactions_concurrency, timeout: :infinity] @@ -185,7 +173,7 @@ defmodule Explorer.Indexer.BlockFetcher do defp fetch_transaction_receipts(_state, []), do: {:ok, %{logs: [], receipts: []}} - defp fetch_transaction_receipts(%BlockFetcher{} = state, hashes) do + defp fetch_transaction_receipts(%{} = state, hashes) do debug(state, fn -> "fetching #{length(hashes)} transaction receipts" end) stream_opts = [max_concurrency: state.receipts_concurrency, timeout: :infinity] @@ -204,7 +192,7 @@ defmodule Explorer.Indexer.BlockFetcher do end) end - defp genesis_task(%BlockFetcher{} = state) do + defp genesis_task(%{} = state) do {count, missing_ranges} = missing_block_numbers(state) current_block = Indexer.next_block_number() @@ -214,7 +202,7 @@ defmodule Explorer.Indexer.BlockFetcher do stream_import(state, seq, max_concurrency: state.blocks_concurrency) end - defp insert(%BlockFetcher{} = state, seq, range, params) do + defp insert(%{} = state, seq, range, params) do with {:ok, %{addresses: address_hashes}} = ok <- Chain.import_blocks(params) do :ok = AddressFetcher.async_fetch_balances(address_hashes) ok @@ -230,7 +218,7 @@ defmodule Explorer.Indexer.BlockFetcher do end end - defp missing_block_numbers(%BlockFetcher{blocks_batch_size: blocks_batch_size}) do + defp missing_block_numbers(%{blocks_batch_size: blocks_batch_size}) do {count, missing_ranges} = Chain.missing_block_numbers() chunked_ranges = @@ -254,7 +242,7 @@ defmodule Explorer.Indexer.BlockFetcher do {count, chunked_ranges} end - defp realtime_task(%BlockFetcher{} = state) do + defp realtime_task(%{} = state) do {:ok, seq} = Sequence.start_link([], Indexer.next_block_number(), 2) stream_import(state, seq, max_concurrency: 1) end @@ -269,7 +257,7 @@ defmodule Explorer.Indexer.BlockFetcher do # Run at state.blocks_concurrency max_concurrency when called by `stream_import/3` # Only public for testing @doc false - def import_range({block_start, block_end} = range, %BlockFetcher{} = state, seq) do + def import_range({block_start, block_end} = range, %{} = state, seq) do with {:blocks, {:ok, next, result}} <- {:blocks, JSONRPC.fetch_blocks_by_range(block_start, block_end)}, %{blocks: blocks, transactions: transactions} = result, cap_seq(seq, next, range, state), @@ -297,11 +285,22 @@ defmodule Explorer.Indexer.BlockFetcher do end end - defp schedule_next_realtime_fetch(%BlockFetcher{} = state) do - timer = Process.send_after(self(), :realtime_index, state.realtime_interval) - %BlockFetcher{state | poll_timer: timer} + defp schedule_next_catchup_index(state) do + send(self(), :catchup_index) + state + end + + defp schedule_next_realtime_fetch(state) do + Process.send_after(self(), :realtime_index, state.realtime_interval) + state + end + + defp monitor_task(task_func) do + {:ok, pid} = Task.Supervisor.start_child(Indexer.TaskSupervisor, task_func) + ref = Process.monitor(pid) + {:ok, pid, ref} end - defp debug(%BlockFetcher{debug_logs: true}, func), do: Logger.debug(func) - defp debug(%BlockFetcher{debug_logs: false}, _func), do: :noop + defp debug(%{debug_logs: true}, func), do: Logger.debug(func) + defp debug(%{debug_logs: false}, _func), do: :noop end diff --git a/apps/explorer/test/explorer/indexer/block_fetcher_test.exs b/apps/explorer/test/explorer/indexer/block_fetcher_test.exs index 21187c7c57..04c140e6c0 100644 --- a/apps/explorer/test/explorer/indexer/block_fetcher_test.exs +++ b/apps/explorer/test/explorer/indexer/block_fetcher_test.exs @@ -32,6 +32,8 @@ defmodule Explorer.Indexer.BlockFetcherTest do test "starts fetching blocks from Genesis" do assert Repo.aggregate(Block, :count, :hash) == 0 + start_supervised!({JSONRPC, []}) + start_supervised!({Task.Supervisor, name: Explorer.Indexer.TaskSupervisor}) start_supervised!(BlockFetcher) wait(fn ->