BlockFetcher.realtime_task -> BlockFetcher.Realtime.task

pull/489/head
Luke Imhoff 6 years ago
parent 2afa934225
commit 807f5e6d7c
  1. 55
      apps/indexer/lib/indexer/block_fetcher.ex
  2. 69
      apps/indexer/lib/indexer/block_fetcher/realtime.ex

@ -11,7 +11,7 @@ defmodule Indexer.BlockFetcher do
alias Explorer.Chain alias Explorer.Chain
alias Indexer.{BalanceFetcher, AddressExtraction, BoundInterval, InternalTransactionFetcher, Sequence} alias Indexer.{BalanceFetcher, AddressExtraction, BoundInterval, InternalTransactionFetcher, Sequence}
alias Indexer.BlockFetcher.Catchup alias Indexer.BlockFetcher.{Catchup, Realtime}
# dialyzer thinks that Logger.debug functions always have no_local_return # dialyzer thinks that Logger.debug functions always have no_local_return
@dialyzer {:nowarn_function, import_range: 4} @dialyzer {:nowarn_function, import_range: 4}
@ -123,52 +123,15 @@ defmodule Indexer.BlockFetcher do
end end
def handle_info(:realtime_index, %__MODULE__{} = state) do def handle_info(:realtime_index, %__MODULE__{} = state) do
%Task{ref: ref} = {:noreply, Realtime.put(state)}
realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> realtime_task(state) end)
new_state = put_in(state.realtime_task_by_ref[ref], realtime_task)
{:noreply, new_state}
end end
def handle_info({ref, :ok = result}, %__MODULE__{realtime_task_by_ref: realtime_task_by_ref} = state) do def handle_info({ref, :ok} = message, %__MODULE__{} = state) when is_reference(ref) do
{realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) {:noreply, Realtime.handle_success(message, state)}
case realtime_task do
nil ->
Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
" nor a realtime index Task ref returned result (#{inspect(result)})"
end)
_ ->
:ok
end end
Process.demonitor(ref, [:flush]) def handle_info({:DOWN, _, :process, _, _} = message, %__MODULE__{} = state) do
{:noreply, Realtime.handle_failure(message, state)}
{:noreply, %__MODULE__{state | realtime_task_by_ref: running_realtime_task_by_ref}}
end
def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{realtime_task_by_ref: realtime_task_by_ref} = state) do
{realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref)
case realtime_task do
nil ->
Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
" nor a realtime index Task ref reports unknown pid (#{pid}) DOWN due to reason (#{reason}})"
end)
_ ->
Logger.error(fn ->
"Realtime index stream exited with reason (#{inspect(reason)}). " <>
"The next realtime index task will fill the missing block " <>
"if the lastest block number has not advanced by then or the catch up index will fill the missing block."
end)
end
{:noreply, %__MODULE__{state | realtime_task_by_ref: running_realtime_task_by_ref}}
end end
defp cap_seq(seq, next, range) do defp cap_seq(seq, next, range) do
@ -286,12 +249,6 @@ defmodule Indexer.BlockFetcher do
|> InternalTransactionFetcher.async_fetch(10_000) |> InternalTransactionFetcher.async_fetch(10_000)
end end
defp realtime_task(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
{:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2)
stream_import(state, seq, :realtime_index, max_concurrency: 1)
end
# Run at state.blocks_concurrency max_concurrency when called by `stream_import/3` # Run at state.blocks_concurrency max_concurrency when called by `stream_import/3`
# Only public for testing # Only public for testing
@doc false @doc false

@ -0,0 +1,69 @@
defmodule Indexer.BlockFetcher.Realtime do
@moduledoc """
Fetches and indexes block ranges from latest block forward.
"""
require Logger
import Indexer.BlockFetcher, only: [stream_import: 4]
alias Indexer.{BlockFetcher, Sequence}
@doc """
Starts `task/1` and puts it in `t:Indexer.BlockFetcher.t/0` `realtime_task_by_ref`.
"""
def put(%BlockFetcher{} = state) do
%Task{ref: ref} = realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, __MODULE__, :task, [state])
put_in(state.realtime_task_by_ref[ref], realtime_task)
end
def task(%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
{:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2)
stream_import(state, seq, :realtime_index, max_concurrency: 1)
end
def handle_success({ref, :ok = result}, %BlockFetcher{realtime_task_by_ref: realtime_task_by_ref} = state) do
{realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref)
case realtime_task do
nil ->
Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
" nor a realtime index Task ref returned result (#{inspect(result)})"
end)
_ ->
:ok
end
Process.demonitor(ref, [:flush])
%BlockFetcher{state | realtime_task_by_ref: running_realtime_task_by_ref}
end
def handle_failure(
{:DOWN, ref, :process, pid, reason},
%BlockFetcher{realtime_task_by_ref: realtime_task_by_ref} = state
) do
{realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref)
case realtime_task do
nil ->
Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
" nor a realtime index Task ref reports unknown pid (#{pid}) DOWN due to reason (#{reason}})"
end)
_ ->
Logger.error(fn ->
"Realtime index stream exited with reason (#{inspect(reason)}). " <>
"The next realtime index task will fill the missing block " <>
"if the lastest block number has not advanced by then or the catch up index will fill the missing block."
end)
end
%BlockFetcher{state | realtime_task_by_ref: running_realtime_task_by_ref}
end
end
Loading…
Cancel
Save