diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 2fe51a6ef1..86b54385ad 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -64,7 +64,7 @@ defmodule Indexer.BlockFetcher do catchup_task: nil, catchup_block_number: nil, catchup_bound_interval: nil, - realtime_tasks: [], + realtime_task_by_ref: %{}, realtime_interval: nil, blocks_batch_size: @blocks_batch_size, blocks_concurrency: @blocks_concurrency, @@ -103,12 +103,6 @@ defmodule Indexer.BlockFetcher do {:noreply, %{state | catchup_task: catchup_task}} end - def handle_info(:realtime_index, %__MODULE__{realtime_tasks: realtime_tasks} = state) when is_list(realtime_tasks) do - realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> realtime_task(state) end) - - {:noreply, %{state | realtime_tasks: [realtime_task | realtime_tasks]}} - end - def handle_info( {ref, missing_block_count}, %__MODULE__{ @@ -152,26 +146,53 @@ defmodule Indexer.BlockFetcher do {:noreply, %__MODULE__{state | catchup_task: nil}} end - def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{realtime_tasks: realtime_tasks} = state) - when is_list(realtime_tasks) do - {down_realtime_tasks, running_realtime_tasks} = - Enum.split_with(realtime_tasks, fn - %Task{pid: ^pid, ref: ^ref} -> true - _ -> false - end) + def handle_info(:realtime_index, %__MODULE__{} = state) do + %Task{ref: ref} = + realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> realtime_task(state) end) + + new_state = put_in(state.realtime_task_by_ref[ref], realtime_task) + + {:noreply, new_state} + end + + def handle_info({ref, :ok = result}, %__MODULE__{realtime_task_by_ref: realtime_task_by_ref} = state) do + {realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) + + case realtime_task do + nil -> + Logger.error(fn -> + "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> + " nor a realtime index Task ref returned result (#{inspect(result)})" + end) - case {Enum.empty?(down_realtime_tasks), reason} do - {true, :normal} -> + _ -> :ok + end + + Process.demonitor(ref, [:flush]) + + {:noreply, %__MODULE__{state | realtime_task_by_ref: running_realtime_task_by_ref}} + end + + def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{realtime_task_by_ref: realtime_task_by_ref} = state) do + {realtime_task, running_realtime_task_by_ref} = Map.pop(realtime_task_by_ref, ref) - {true, reason} -> - Logger.error(fn -> "Realtime index stream exited with reason (#{inspect(reason)}). Restarting" end) + case realtime_task do + nil -> + Logger.error(fn -> + "Unknown ref (#{inspect(ref)}) that is neither the catchup index" <> + " nor a realtime index Task ref reports unknown pid (#{pid}) DOWN due to reason (#{reason}})" + end) - {_, reason} -> - Logger.error(fn -> "Unexpected pid (#{inspect(pid)}) exited with reason (#{inspect(reason)})." end) + _ -> + Logger.error(fn -> + "Realtime index stream exited with reason (#{inspect(reason)}). " <> + "The next realtime index task will fill the missing block " <> + "if the lastest block number has not advanced by then or the catch up index will fill the missing block." + end) end - {:noreply, %__MODULE__{state | realtime_tasks: running_realtime_tasks}} + {:noreply, %__MODULE__{state | realtime_task_by_ref: running_realtime_task_by_ref}} end defp cap_seq(seq, next, range) do