Handle result from realtime tasks

Previously, BlockFetcher only had handle_info for the DOWN message from
realtime tasks, but when the reason is :normal, the task would have
succeeded and therefore, the BlockFetcher would also have received a
`{ref, :ok}` that was not being matched.
pull/439/head
Luke Imhoff 6 years ago
parent 6abe66f7b1
commit 9d66f4b84b
  1. 63
      apps/indexer/lib/indexer/block_fetcher.ex

@ -64,7 +64,7 @@ defmodule Indexer.BlockFetcher do
catchup_task: nil,
catchup_block_number: nil,
catchup_bound_interval: nil,
realtime_tasks: [],
realtime_task_by_ref: %{},
realtime_interval: nil,
blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
@ -103,12 +103,6 @@ defmodule Indexer.BlockFetcher do
{:noreply, %{state | catchup_task: catchup_task}}
end
def handle_info(:realtime_index, %__MODULE__{realtime_tasks: realtime_tasks} = state) when is_list(realtime_tasks) do
realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> realtime_task(state) end)
{:noreply, %{state | realtime_tasks: [realtime_task | realtime_tasks]}}
end
def handle_info(
{ref, missing_block_count},
%__MODULE__{
@ -152,26 +146,53 @@ defmodule Indexer.BlockFetcher do
{:noreply, %__MODULE__{state | catchup_task: nil}}
end
def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{realtime_tasks: realtime_tasks} = state)
when is_list(realtime_tasks) do
{down_realtime_tasks, running_realtime_tasks} =
Enum.split_with(realtime_tasks, fn
%Task{pid: ^pid, ref: ^ref} -> true
_ -> false
end)
def handle_info(:realtime_index, %__MODULE__{} = state) do
%Task{ref: ref} =
realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> realtime_task(state) end)
new_state = put_in(state.realtime_task_by_ref[ref], realtime_task)
{:noreply, new_state}
end
def handle_info({ref, :ok = result}, %__MODULE__{realtime_task_by_ref: realtime_task_by_ref} = state) do
{realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref)
case realtime_task do
nil ->
Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
" nor a realtime index Task ref returned result (#{inspect(result)})"
end)
case {Enum.empty?(down_realtime_tasks), reason} do
{true, :normal} ->
_ ->
:ok
end
Process.demonitor(ref, [:flush])
{:noreply, %__MODULE__{state | realtime_task_by_ref: running_realtime_task_by_ref}}
end
def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{realtime_task_by_ref: realtime_task_by_ref} = state) do
{realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref)
{true, reason} ->
Logger.error(fn -> "Realtime index stream exited with reason (#{inspect(reason)}). Restarting" end)
case realtime_task do
nil ->
Logger.error(fn ->
"Unknown ref (#{inspect(ref)}) that is neither the catchup index" <>
" nor a realtime index Task ref reports unknown pid (#{pid}) DOWN due to reason (#{reason}})"
end)
{_, reason} ->
Logger.error(fn -> "Unexpected pid (#{inspect(pid)}) exited with reason (#{inspect(reason)})." end)
_ ->
Logger.error(fn ->
"Realtime index stream exited with reason (#{inspect(reason)}). " <>
"The next realtime index task will fill the missing block " <>
"if the lastest block number has not advanced by then or the catch up index will fill the missing block."
end)
end
{:noreply, %__MODULE__{state | realtime_tasks: running_realtime_tasks}}
{:noreply, %__MODULE__{state | realtime_task_by_ref: running_realtime_task_by_ref}}
end
defp cap_seq(seq, next, range) do

Loading…
Cancel
Save