|
|
@ -8,7 +8,7 @@ defmodule Explorer.BufferedTask do |
|
|
|
@callback init(initial :: term, reducer :: function) :: |
|
|
|
@callback init(initial :: term, reducer :: function) :: |
|
|
|
{:ok, accumulated_results :: term | initial :: term} | {:error, reason :: term} |
|
|
|
{:ok, accumulated_results :: term | initial :: term} | {:error, reason :: term} |
|
|
|
|
|
|
|
|
|
|
|
@callback run(entries :: list) :: :ok | {:retry, reason :: term} | {:halt, reason :: term} |
|
|
|
@callback run(entries :: list, retries :: pos_integer) :: :ok | {:retry, reason :: term} | {:halt, reason :: term} |
|
|
|
|
|
|
|
|
|
|
|
@flush_interval :timer.seconds(3) |
|
|
|
@flush_interval :timer.seconds(3) |
|
|
|
|
|
|
|
|
|
|
@ -83,11 +83,11 @@ defmodule Explorer.BufferedTask do |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp drop_task_and_retry(state, ref) do |
|
|
|
defp drop_task_and_retry(state, ref) do |
|
|
|
batch = Map.fetch!(state.tasks, ref) |
|
|
|
{batch, retries} = Map.fetch!(state.tasks, ref) |
|
|
|
|
|
|
|
|
|
|
|
state |
|
|
|
state |
|
|
|
|> drop_task(ref) |
|
|
|
|> drop_task(ref) |
|
|
|
|> buffer_entries(batch) |
|
|
|
|> queue(batch, retries + 1) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp buffer_entries(state, []), do: state |
|
|
|
defp buffer_entries(state, []), do: state |
|
|
@ -97,12 +97,16 @@ defmodule Explorer.BufferedTask do |
|
|
|
{batch, overflow} = Enum.split(current_buffer, state.max_batch_size) |
|
|
|
{batch, overflow} = Enum.split(current_buffer, state.max_batch_size) |
|
|
|
|
|
|
|
|
|
|
|
if length(batch) == state.max_batch_size do |
|
|
|
if length(batch) == state.max_batch_size do |
|
|
|
%{state | current_buffer: overflow, buffer: :queue.in(batch, state.buffer)} |
|
|
|
queue(%{state | current_buffer: overflow}, batch, 0) |
|
|
|
else |
|
|
|
else |
|
|
|
%{state | current_buffer: current_buffer} |
|
|
|
%{state | current_buffer: current_buffer} |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
defp queue(state, batch, retries) do |
|
|
|
|
|
|
|
%{state | buffer: :queue.in({batch, retries}, state.buffer)} |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp do_initial_stream(state) do |
|
|
|
defp do_initial_stream(state) do |
|
|
|
{0, []} |
|
|
|
{0, []} |
|
|
|
|> state.callback_module.init(fn entry, {len, acc} -> |
|
|
|
|> state.callback_module.init(fn entry, {len, acc} -> |
|
|
@ -130,7 +134,7 @@ defmodule Explorer.BufferedTask do |
|
|
|
defp take_batch(state) do |
|
|
|
defp take_batch(state) do |
|
|
|
case :queue.out(state.buffer) do |
|
|
|
case :queue.out(state.buffer) do |
|
|
|
{{:value, batch}, new_queue} -> {batch, new_queue} |
|
|
|
{{:value, batch}, new_queue} -> {batch, new_queue} |
|
|
|
{:empty, new_queue} -> {:halt, {[], new_queue}} |
|
|
|
{:empty, new_queue} -> {[], new_queue} |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
@ -147,15 +151,15 @@ defmodule Explorer.BufferedTask do |
|
|
|
state = buffer_entries(state, entries) |
|
|
|
state = buffer_entries(state, entries) |
|
|
|
|
|
|
|
|
|
|
|
if Enum.count(state.tasks) < state.max_concurrency and :queue.len(state.buffer) > 0 do |
|
|
|
if Enum.count(state.tasks) < state.max_concurrency and :queue.len(state.buffer) > 0 do |
|
|
|
{batch, new_queue} = take_batch(state) |
|
|
|
{{batch, retries}, new_queue} = take_batch(state) |
|
|
|
|
|
|
|
|
|
|
|
task = |
|
|
|
task = |
|
|
|
Task.Supervisor.async_nolink(Explorer.TaskSupervisor, fn -> |
|
|
|
Task.Supervisor.async_nolink(Explorer.TaskSupervisor, fn -> |
|
|
|
debug(state, fn -> "processing #{Enum.count(batch)} entries for #{inspect(state.callback_module)}" end) |
|
|
|
debug(state, fn -> "processing #{Enum.count(batch)} entries for #{inspect(state.callback_module)}" end) |
|
|
|
{:performed, state.callback_module.run(batch)} |
|
|
|
{:performed, state.callback_module.run(batch, retries)} |
|
|
|
end) |
|
|
|
end) |
|
|
|
|
|
|
|
|
|
|
|
%{state | tasks: Map.put(state.tasks, task.ref, batch), buffer: new_queue} |
|
|
|
%{state | tasks: Map.put(state.tasks, task.ref, {batch, retries}), buffer: new_queue} |
|
|
|
else |
|
|
|
else |
|
|
|
state |
|
|
|
state |
|
|
|
end |
|
|
|
end |
|
|
@ -168,11 +172,9 @@ defmodule Explorer.BufferedTask do |
|
|
|
defp flush(%{current_buffer: current} = state) do |
|
|
|
defp flush(%{current_buffer: current} = state) do |
|
|
|
{batch, overflow} = Enum.split(current, state.max_batch_size) |
|
|
|
{batch, overflow} = Enum.split(current, state.max_batch_size) |
|
|
|
|
|
|
|
|
|
|
|
flush(%{ |
|
|
|
%{state | current_buffer: overflow} |
|
|
|
state |
|
|
|
|> queue(batch, 0) |
|
|
|
| buffer: :queue.in(batch, state.buffer), |
|
|
|
|> flush() |
|
|
|
current_buffer: overflow |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp debug(%{debug_logs: true}, func), do: Logger.debug(func) |
|
|
|
defp debug(%{debug_logs: true}, func), do: Logger.debug(func) |
|
|
|