From 5892c8357b63008fddb3b34ecc20f4560f8ce01c Mon Sep 17 00:00:00 2001 From: Andrew Cravenho Date: Thu, 25 Oct 2018 20:10:10 -0400 Subject: [PATCH] enqueue fix for push_front Co-authored-by: Luke Imhoff --- .../token_transfer/uncataloged/worker.ex | 21 ++++++++++++------- .../uncataloged/worker_test.exs | 19 +++++++++++------ 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/apps/indexer/lib/indexer/token_transfer/uncataloged/worker.ex b/apps/indexer/lib/indexer/token_transfer/uncataloged/worker.ex index 91fa4b6e24..0d053895ee 100644 --- a/apps/indexer/lib/indexer/token_transfer/uncataloged/worker.ex +++ b/apps/indexer/lib/indexer/token_transfer/uncataloged/worker.ex @@ -9,6 +9,8 @@ defmodule Indexer.TokenTransfer.Uncataloged.Worker do use GenServer + require Logger + alias Explorer.Chain alias Indexer.Block.Catchup.Fetcher alias Indexer.TokenTransfer.Uncataloged @@ -57,13 +59,13 @@ defmodule Indexer.TokenTransfer.Uncataloged.Worker do {:noreply, state} block_numbers -> - Process.send_after(self(), :enqueue_blocks, state.retry_interval) + Process.send_after(self(), :push_front_blocks, state.retry_interval) {:noreply, %{state | block_numbers: block_numbers}} end end - def handle_info(:enqueue_blocks, %{block_numbers: block_numbers} = state) do - %Task{ref: ref} = async_enqueue(block_numbers) + def handle_info(:push_front_blocks, %{block_numbers: block_numbers} = state) do + %Task{ref: ref} = async_push_front(block_numbers) {:noreply, %{state | task_ref: ref}} end @@ -73,18 +75,21 @@ defmodule Indexer.TokenTransfer.Uncataloged.Worker do {:stop, :shutdown} end - def handle_info({ref, {:error, :queue_unavailable}}, %{task_ref: ref, retry_interval: millis} = state) do + def handle_info({ref, {:error, reason}}, %{task_ref: ref, retry_interval: millis} = state) do + Logger.error(fn -> inspect(reason) end) + Process.demonitor(ref, [:flush]) - Process.send_after(self(), :enqueue_blocks, millis) + Process.send_after(self(), :push_front_blocks, millis) + {:noreply, %{state | task_ref: nil}} end def handle_info({:DOWN, ref, :process, _, _}, %{task_ref: ref, retry_interval: millis} = state) do - Process.send_after(self(), :enqueue_blocks, millis) + Process.send_after(self(), :push_front_blocks, millis) {:noreply, %{state | task_ref: nil}} end - defp async_enqueue(block_numbers) do - Task.Supervisor.async_nolink(Uncataloged.TaskSupervisor, Fetcher, :enqueue, [block_numbers]) + defp async_push_front(block_numbers) do + Task.Supervisor.async_nolink(Uncataloged.TaskSupervisor, Fetcher, :push_front, [block_numbers]) end end diff --git a/apps/indexer/test/indexer/token_transfer/uncataloged/worker_test.exs b/apps/indexer/test/indexer/token_transfer/uncataloged/worker_test.exs index 5507976a92..b1cd8a2221 100644 --- a/apps/indexer/test/indexer/token_transfer/uncataloged/worker_test.exs +++ b/apps/indexer/test/indexer/token_transfer/uncataloged/worker_test.exs @@ -1,8 +1,11 @@ defmodule Indexer.TokenTransfer.Uncataloged.WorkerTest do use Explorer.DataCase + alias Indexer.Sequence alias Indexer.TokenTransfer.Uncataloged.{Worker, TaskSupervisor} + @moduletag :capture_log + describe "start_link/1" do test "starts the worker" do assert {:ok, _pid} = Worker.start_link(supervisor: self()) @@ -32,17 +35,21 @@ defmodule Indexer.TokenTransfer.Uncataloged.WorkerTest do state = %{task_ref: nil, block_numbers: [], retry_interval: 1} assert {:noreply, ^expected_state} = Worker.handle_info(:scan, state) - assert_receive :enqueue_blocks + assert_receive :push_front_blocks end end - describe "handle_info with :enqueue_blocks" do + describe "handle_info with :push_front_blocks" do test "starts a task" do task_sup_pid = start_supervised!({Task.Supervisor, name: TaskSupervisor}) + start_supervised!({Sequence, [[ranges: [], step: -1], [name: :block_catchup_sequencer]]}) state = %{task_ref: nil, block_numbers: [1]} - assert {:noreply, new_state} = Worker.handle_info(:enqueue_blocks, state) - assert is_reference(new_state.task_ref) + assert {:noreply, %{task_ref: task_ref}} = Worker.handle_info(:push_front_blocks, state) + assert is_reference(task_ref) + + refute_receive {^task_ref, {:error, :queue_unavailable}} + assert_receive {^task_ref, :ok} stop_supervised(task_sup_pid) end @@ -61,7 +68,7 @@ defmodule Indexer.TokenTransfer.Uncataloged.WorkerTest do state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1} expected_state = %{state | task_ref: nil} assert {:noreply, ^expected_state} = Worker.handle_info({ref, {:error, :queue_unavailable}}, state) - assert_receive :enqueue_blocks + assert_receive :push_front_blocks end end @@ -70,7 +77,7 @@ defmodule Indexer.TokenTransfer.Uncataloged.WorkerTest do ref = Process.monitor(self()) state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1} assert {:noreply, %{task_ref: nil}} = Worker.handle_info({:DOWN, ref, :process, self(), :EXIT}, state) - assert_receive :enqueue_blocks + assert_receive :push_front_blocks end end end