Merge pull request #1001 from poanetwork/ac-undefined-function-enqueue

Fix for undefined function error in TokenTransfer Uncatalogued Worker Module
pull/1015/head
Luke Imhoff 6 years ago committed by GitHub
commit 1e6d5f135a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      apps/indexer/lib/indexer/token_transfer/uncataloged/worker.ex
  2. 19
      apps/indexer/test/indexer/token_transfer/uncataloged/worker_test.exs

@ -9,6 +9,8 @@ defmodule Indexer.TokenTransfer.Uncataloged.Worker do
use GenServer
require Logger
alias Explorer.Chain
alias Indexer.Block.Catchup.Fetcher
alias Indexer.TokenTransfer.Uncataloged
@ -57,13 +59,13 @@ defmodule Indexer.TokenTransfer.Uncataloged.Worker do
{:noreply, state}
block_numbers ->
Process.send_after(self(), :enqueue_blocks, state.retry_interval)
Process.send_after(self(), :push_front_blocks, state.retry_interval)
{:noreply, %{state | block_numbers: block_numbers}}
end
end
def handle_info(:enqueue_blocks, %{block_numbers: block_numbers} = state) do
%Task{ref: ref} = async_enqueue(block_numbers)
def handle_info(:push_front_blocks, %{block_numbers: block_numbers} = state) do
%Task{ref: ref} = async_push_front(block_numbers)
{:noreply, %{state | task_ref: ref}}
end
@ -73,18 +75,21 @@ defmodule Indexer.TokenTransfer.Uncataloged.Worker do
{:stop, :shutdown}
end
def handle_info({ref, {:error, :queue_unavailable}}, %{task_ref: ref, retry_interval: millis} = state) do
def handle_info({ref, {:error, reason}}, %{task_ref: ref, retry_interval: millis} = state) do
Logger.error(fn -> inspect(reason) end)
Process.demonitor(ref, [:flush])
Process.send_after(self(), :enqueue_blocks, millis)
Process.send_after(self(), :push_front_blocks, millis)
{:noreply, %{state | task_ref: nil}}
end
def handle_info({:DOWN, ref, :process, _, _}, %{task_ref: ref, retry_interval: millis} = state) do
Process.send_after(self(), :enqueue_blocks, millis)
Process.send_after(self(), :push_front_blocks, millis)
{:noreply, %{state | task_ref: nil}}
end
defp async_enqueue(block_numbers) do
Task.Supervisor.async_nolink(Uncataloged.TaskSupervisor, Fetcher, :enqueue, [block_numbers])
defp async_push_front(block_numbers) do
Task.Supervisor.async_nolink(Uncataloged.TaskSupervisor, Fetcher, :push_front, [block_numbers])
end
end

@ -1,8 +1,11 @@
defmodule Indexer.TokenTransfer.Uncataloged.WorkerTest do
use Explorer.DataCase
alias Indexer.Sequence
alias Indexer.TokenTransfer.Uncataloged.{Worker, TaskSupervisor}
@moduletag :capture_log
describe "start_link/1" do
test "starts the worker" do
assert {:ok, _pid} = Worker.start_link(supervisor: self())
@ -32,17 +35,21 @@ defmodule Indexer.TokenTransfer.Uncataloged.WorkerTest do
state = %{task_ref: nil, block_numbers: [], retry_interval: 1}
assert {:noreply, ^expected_state} = Worker.handle_info(:scan, state)
assert_receive :enqueue_blocks
assert_receive :push_front_blocks
end
end
describe "handle_info with :enqueue_blocks" do
describe "handle_info with :push_front_blocks" do
test "starts a task" do
task_sup_pid = start_supervised!({Task.Supervisor, name: TaskSupervisor})
start_supervised!({Sequence, [[ranges: [], step: -1], [name: :block_catchup_sequencer]]})
state = %{task_ref: nil, block_numbers: [1]}
assert {:noreply, new_state} = Worker.handle_info(:enqueue_blocks, state)
assert is_reference(new_state.task_ref)
assert {:noreply, %{task_ref: task_ref}} = Worker.handle_info(:push_front_blocks, state)
assert is_reference(task_ref)
refute_receive {^task_ref, {:error, :queue_unavailable}}
assert_receive {^task_ref, :ok}
stop_supervised(task_sup_pid)
end
@ -61,7 +68,7 @@ defmodule Indexer.TokenTransfer.Uncataloged.WorkerTest do
state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1}
expected_state = %{state | task_ref: nil}
assert {:noreply, ^expected_state} = Worker.handle_info({ref, {:error, :queue_unavailable}}, state)
assert_receive :enqueue_blocks
assert_receive :push_front_blocks
end
end
@ -70,7 +77,7 @@ defmodule Indexer.TokenTransfer.Uncataloged.WorkerTest do
ref = Process.monitor(self())
state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1}
assert {:noreply, %{task_ref: nil}} = Worker.handle_info({:DOWN, ref, :process, self(), :EXIT}, state)
assert_receive :enqueue_blocks
assert_receive :push_front_blocks
end
end
end

Loading…
Cancel
Save