From 5120614c4e6e1c55adb7b8be08517cb18f2cae16 Mon Sep 17 00:00:00 2001 From: Chris McCord Date: Thu, 24 May 2018 16:39:50 -0400 Subject: [PATCH] Implement retries --- apps/explorer/lib/explorer/buffered_task.ex | 28 ++++--- .../indexer/address_balance_fetcher.ex | 2 +- .../indexer/internal_transaction_fetcher.ex | 2 +- .../test/explorer/buffered_task_test.exs | 79 +++++++++++++------ 4 files changed, 70 insertions(+), 41 deletions(-) diff --git a/apps/explorer/lib/explorer/buffered_task.ex b/apps/explorer/lib/explorer/buffered_task.ex index 59a2719620..e51da706f2 100644 --- a/apps/explorer/lib/explorer/buffered_task.ex +++ b/apps/explorer/lib/explorer/buffered_task.ex @@ -8,7 +8,7 @@ defmodule Explorer.BufferedTask do @callback init(initial :: term, reducer :: function) :: {:ok, accumulated_results :: term | initial :: term} | {:error, reason :: term} - @callback run(entries :: list) :: :ok | {:retry, reason :: term} | {:halt, reason :: term} + @callback run(entries :: list, retries :: pos_integer) :: :ok | {:retry, reason :: term} | {:halt, reason :: term} @flush_interval :timer.seconds(3) @@ -83,11 +83,11 @@ defmodule Explorer.BufferedTask do end defp drop_task_and_retry(state, ref) do - batch = Map.fetch!(state.tasks, ref) + {batch, retries} = Map.fetch!(state.tasks, ref) state |> drop_task(ref) - |> buffer_entries(batch) + |> queue(batch, retries + 1) end defp buffer_entries(state, []), do: state @@ -97,12 +97,16 @@ defmodule Explorer.BufferedTask do {batch, overflow} = Enum.split(current_buffer, state.max_batch_size) if length(batch) == state.max_batch_size do - %{state | current_buffer: overflow, buffer: :queue.in(batch, state.buffer)} + queue(%{state | current_buffer: overflow}, batch, 0) else %{state | current_buffer: current_buffer} end end + defp queue(state, batch, retries) do + %{state | buffer: :queue.in({batch, retries}, state.buffer)} + end + defp do_initial_stream(state) do {0, []} |> state.callback_module.init(fn entry, {len, acc} -> @@ -130,7 +134,7 @@ defmodule Explorer.BufferedTask do defp take_batch(state) do case :queue.out(state.buffer) do {{:value, batch}, new_queue} -> {batch, new_queue} - {:empty, new_queue} -> {:halt, {[], new_queue}} + {:empty, new_queue} -> {[], new_queue} end end @@ -147,15 +151,15 @@ defmodule Explorer.BufferedTask do state = buffer_entries(state, entries) if Enum.count(state.tasks) < state.max_concurrency and :queue.len(state.buffer) > 0 do - {batch, new_queue} = take_batch(state) + {{batch, retries}, new_queue} = take_batch(state) task = Task.Supervisor.async_nolink(Explorer.TaskSupervisor, fn -> debug(state, fn -> "processing #{Enum.count(batch)} entries for #{inspect(state.callback_module)}" end) - {:performed, state.callback_module.run(batch)} + {:performed, state.callback_module.run(batch, retries)} end) - %{state | tasks: Map.put(state.tasks, task.ref, batch), buffer: new_queue} + %{state | tasks: Map.put(state.tasks, task.ref, {batch, retries}), buffer: new_queue} else state end @@ -168,11 +172,9 @@ defmodule Explorer.BufferedTask do defp flush(%{current_buffer: current} = state) do {batch, overflow} = Enum.split(current, state.max_batch_size) - flush(%{ - state - | buffer: :queue.in(batch, state.buffer), - current_buffer: overflow - }) + %{state | current_buffer: overflow} + |> queue(batch, 0) + |> flush() end defp debug(%{debug_logs: true}, func), do: Logger.debug(func) diff --git a/apps/explorer/lib/explorer/indexer/address_balance_fetcher.ex b/apps/explorer/lib/explorer/indexer/address_balance_fetcher.ex index a27af40498..c7bd9794db 100644 --- a/apps/explorer/lib/explorer/indexer/address_balance_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/address_balance_fetcher.ex @@ -19,7 +19,7 @@ defmodule Explorer.Indexer.AddressBalanceFetcher do end) end - def run(string_hashes) do + def run(string_hashes, _retries) do {:ok, results} = EthereumJSONRPC.fetch_balances_by_hash(string_hashes) :ok = Chain.update_balances(results) diff --git a/apps/explorer/lib/explorer/indexer/internal_transaction_fetcher.ex b/apps/explorer/lib/explorer/indexer/internal_transaction_fetcher.ex index 30cd6287ba..81cb998440 100644 --- a/apps/explorer/lib/explorer/indexer/internal_transaction_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/internal_transaction_fetcher.ex @@ -23,7 +23,7 @@ defmodule Explorer.Indexer.InternalTransactionFetcher do end) end - def run(transaction_hashes) do + def run(transaction_hashes, _retries) do case EthereumJSONRPC.fetch_internal_transactions(transaction_hashes) do {:ok, internal_params} -> {:ok, _} = Chain.import_internal_transactions(internal_params) diff --git a/apps/explorer/test/explorer/buffered_task_test.exs b/apps/explorer/test/explorer/buffered_task_test.exs index 9a170ec64b..74d5159179 100644 --- a/apps/explorer/test/explorer/buffered_task_test.exs +++ b/apps/explorer/test/explorer/buffered_task_test.exs @@ -20,27 +20,49 @@ defmodule Explorer.BufferedTaskTest do {:ok, Enum.reduce(initial_collection(), acc, fn item, acc -> reducer.(item, acc) end)} end - def run(batch) do + def run(batch, 0) do send(__MODULE__, {:run, batch}) :ok end end - defmodule FunTask do + defmodule EmptyTask do @behaviour BufferedTask def init(acc, _reducer) do {:ok, acc} end - def run([agent, func]) when is_function(func) do - count = Agent.get_and_update(agent, &{&1, &1 + 1}) - send(__MODULE__, {:run, count}) - func.(count) + def run(batch, 0) do + send(__MODULE__, {:run, batch}) + :ok end + end - def run(batch) do - send(__MODULE__, {:run, batch}) + defmodule RetryableTask do + @behaviour BufferedTask + + def init(acc, _reducer) do + {:ok, acc} + end + + def run([:boom], 0) do + send(__MODULE__, {:run, {0, :boom}}) + raise "boom" + end + + def run([:boom], 1) do + send(__MODULE__, {:run, {1, :boom}}) + :ok + end + + def run(batch, retries) when retries < 2 do + send(__MODULE__, {:run, {retries, batch}}) + {:retry, :because_reasons} + end + + def run(batch, retries) do + send(__MODULE__, {:final_run, {retries, batch}}) :ok end end @@ -69,8 +91,8 @@ defmodule Explorer.BufferedTaskTest do end test "init with zero entries schedules future buffer flushes" do - Process.register(self(), FunTask) - {:ok, buffer} = start_buffer(FunTask) + Process.register(self(), EmptyTask) + {:ok, buffer} = start_buffer(EmptyTask) refute_receive _ BufferedTask.buffer(buffer, ~w(some more entries)) @@ -80,23 +102,28 @@ defmodule Explorer.BufferedTaskTest do refute_receive _ end + @tag :capture_log + test "crashed runs are retried" do + Process.register(self(), RetryableTask) + {:ok, buffer} = start_buffer(RetryableTask) + + BufferedTask.buffer(buffer, [:boom]) + assert_receive {:run, {0, :boom}} + assert_receive {:run, {1, :boom}} + refute_receive _ + end + test "run/1 allows tasks to be programmatically retried" do - Process.register(self(), FunTask) - {:ok, buffer} = start_buffer(FunTask) - {:ok, count} = Agent.start_link(fn -> 1 end) - - BufferedTask.buffer(buffer, [ - count, - fn - 1 -> {:retry, :because_reasons} - 2 -> {:retry, :because_reasons} - 3 -> :ok - end - ]) - - assert_receive {:run, 1} - assert_receive {:run, 2} - assert_receive {:run, 3} + Process.register(self(), RetryableTask) + {:ok, buffer} = start_buffer(RetryableTask) + + BufferedTask.buffer(buffer, [1, 2, 3]) + assert_receive {:run, {0, [1, 2]}} + assert_receive {:run, {0, [3]}} + assert_receive {:run, {1, [1, 2]}} + assert_receive {:run, {1, [3]}} + assert_receive {:final_run, {2, [1, 2]}} + assert_receive {:final_run, {2, [3]}} refute_receive _ end end