Allow BufferTask retry to change batch

Allows shrink batch when it partially works.
pull/308/head
Luke Imhoff 7 years ago
parent f7d79b666a
commit 01b5fb2120
  1. 9
      apps/explorer/lib/explorer/buffered_task.ex

@ -113,6 +113,7 @@ defmodule Explorer.BufferedTask do
* `:ok` - run was successful * `:ok` - run was successful
* `:retry` - run should be retried after it failed * `:retry` - run should be retried after it failed
* `{:retry, new_entries :: list}` - run should be retried with `new_entries`
""" """
@callback run(entries, retries :: pos_integer) :: :ok | :retry @callback run(entries, retries :: pos_integer) :: :ok | :retry
@ -208,6 +209,10 @@ defmodule Explorer.BufferedTask do
{:noreply, drop_task_and_retry(state, ref)} {:noreply, drop_task_and_retry(state, ref)}
end end
def handle_info({ref, {:performed, {:retry, retryable_entries}}}, state) do
{:noreply, drop_task_and_retry(state, ref, retryable_entries)}
end
def handle_info({ref, :ok}, %{init_task: ref} = state) do def handle_info({ref, :ok}, %{init_task: ref} = state) do
{:noreply, state} {:noreply, state}
end end
@ -243,12 +248,12 @@ defmodule Explorer.BufferedTask do
spawn_next_batch(%{state | tasks: Map.delete(state.tasks, ref)}) spawn_next_batch(%{state | tasks: Map.delete(state.tasks, ref)})
end end
defp drop_task_and_retry(state, ref) do defp drop_task_and_retry(state, ref, new_batch \\ nil) do
{batch, retries} = Map.fetch!(state.tasks, ref) {batch, retries} = Map.fetch!(state.tasks, ref)
state state
|> drop_task(ref) |> drop_task(ref)
|> queue_in_state(batch, retries + 1) |> queue_in_state(new_batch || batch, retries + 1)
end end
defp buffer_entries(state, []), do: state defp buffer_entries(state, []), do: state

Loading…
Cancel
Save