From 807f5e6d7cc0c2a8fa40a3e2e9a07a60ce821717 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 26 Jul 2018 10:17:32 -0500 Subject: [PATCH] BlockFetcher.realtime_task -> BlockFetcher.Realtime.task --- apps/indexer/lib/indexer/block_fetcher.ex | 55 ++------------- .../lib/indexer/block_fetcher/realtime.ex | 69 +++++++++++++++++++ 2 files changed, 75 insertions(+), 49 deletions(-) create mode 100644 apps/indexer/lib/indexer/block_fetcher/realtime.ex diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index cbf9a41025..93a4dc85cd 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -11,7 +11,7 @@ defmodule Indexer.BlockFetcher do alias Explorer.Chain alias Indexer.{BalanceFetcher, AddressExtraction, BoundInterval, InternalTransactionFetcher, Sequence} - alias Indexer.BlockFetcher.Catchup + alias Indexer.BlockFetcher.{Catchup, Realtime} # dialyzer thinks that Logger.debug functions always have no_local_return @dialyzer {:nowarn_function, import_range: 4} @@ -123,52 +123,15 @@ defmodule Indexer.BlockFetcher do end def handle_info(:realtime_index, %__MODULE__{} = state) do - %Task{ref: ref} = - realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> realtime_task(state) end) - - new_state = put_in(state.realtime_task_by_ref[ref], realtime_task) - - {:noreply, new_state} + {:noreply, Realtime.put(state)} end - def handle_info({ref, :ok = result}, %__MODULE__{realtime_task_by_ref: realtime_task_by_ref} = state) do - {realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) - - case realtime_task do - nil -> - Logger.error(fn -> - "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> - " nor a realtime index Task ref returned result (#{inspect(result)})" - end) - - _ -> - :ok - end - - Process.demonitor(ref, [:flush]) - - {:noreply, %__MODULE__{state | realtime_task_by_ref: running_realtime_task_by_ref}} + def handle_info({ref, :ok} = message, %__MODULE__{} = state) when is_reference(ref) do + {:noreply, Realtime.handle_success(message, state)} end - def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{realtime_task_by_ref: realtime_task_by_ref} = state) do - {realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) - - case realtime_task do - nil -> - Logger.error(fn -> - "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> - " nor a realtime index Task ref reports unknown pid (#{pid}) DOWN due to reason (#{reason}})" - end) - - _ -> - Logger.error(fn -> - "Realtime index stream exited with reason (#{inspect(reason)}). " <> - "The next realtime index task will fill the missing block " <> - "if the lastest block number has not advanced by then or the catch up index will fill the missing block." - end) - end - - {:noreply, %__MODULE__{state | realtime_task_by_ref: running_realtime_task_by_ref}} + def handle_info({:DOWN, _, :process, _, _} = message, %__MODULE__{} = state) do + {:noreply, Realtime.handle_failure(message, state)} end defp cap_seq(seq, next, range) do @@ -286,12 +249,6 @@ defmodule Indexer.BlockFetcher do |> InternalTransactionFetcher.async_fetch(10_000) end - defp realtime_task(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state) do - {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) - {:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) - stream_import(state, seq, :realtime_index, max_concurrency: 1) - end - # Run at state.blocks_concurrency max_concurrency when called by `stream_import/3` # Only public for testing @doc false diff --git a/apps/indexer/lib/indexer/block_fetcher/realtime.ex b/apps/indexer/lib/indexer/block_fetcher/realtime.ex new file mode 100644 index 0000000000..6756f1eda4 --- /dev/null +++ b/apps/indexer/lib/indexer/block_fetcher/realtime.ex @@ -0,0 +1,69 @@ +defmodule Indexer.BlockFetcher.Realtime do + @moduledoc """ + Fetches and indexes block ranges from latest block forward. + """ + + require Logger + + import Indexer.BlockFetcher, only: [stream_import: 4] + + alias Indexer.{BlockFetcher, Sequence} + + @doc """ + Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0` `realtime_task_by_ref`. + """ + def put(%BlockFetcher{} = state) do + %Task{ref: ref} = realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state]) + + put_in(state.realtime_task_by_ref[ref], realtime_task) + end + + def task(%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state) do + {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) + {:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) + stream_import(state, seq, :realtime_index, max_concurrency: 1) + end + + def handle_success({ref, :ok = result}, %BlockFetcher{realtime_task_by_ref: realtime_task_by_ref} = state) do + {realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) + + case realtime_task do + nil -> + Logger.error(fn -> + "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> + " nor a realtime index Task ref returned result (#{inspect(result)})" + end) + + _ -> + :ok + end + + Process.demonitor(ref, [:flush]) + + %BlockFetcher{state | realtime_task_by_ref: running_realtime_task_by_ref} + end + + def handle_failure( + {:DOWN, ref, :process, pid, reason}, + %BlockFetcher{realtime_task_by_ref: realtime_task_by_ref} = state + ) do + {realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) + + case realtime_task do + nil -> + Logger.error(fn -> + "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> + " nor a realtime index Task ref reports unknown pid (#{pid}) DOWN due to reason (#{reason}})" + end) + + _ -> + Logger.error(fn -> + "Realtime index stream exited with reason (#{inspect(reason)}). " <> + "The next realtime index task will fill the missing block " <> + "if the lastest block number has not advanced by then or the catch up index will fill the missing block." + end) + end + + %BlockFetcher{state | realtime_task_by_ref: running_realtime_task_by_ref} + end +end