Rework buffer chunking

pull/218/head
Chris McCord authored 7 years ago, committed by Luke Imhoff
parent e813045564
commit 3d0d5d956d
1. apps/explorer/lib/explorer/buffered_task.ex (62 changed lines)
2. apps/explorer/test/explorer/buffered_task_test.exs (6 changed lines)

@@ -33,6 +33,7 @@ defmodule Explorer.BufferedTask do
       flush_interval: Keyword.get(opts, :flush_interval, @flush_interval),
       max_batch_size: Keyword.fetch!(opts, :max_batch_size),
       max_concurrency: Keyword.fetch!(opts, :max_concurrency),
+      current_buffer: [],
       buffer: :queue.new(),
       tasks: %{}
     }
@@ -45,7 +46,7 @@
   end

   def handle_info(:flush, state) do
-    {:noreply, state |> spawn_next_batch([]) |> schedule_next_buffer_flush()}
+    {:noreply, flush(state)}
   end

   def handle_info({:async_perform, entries}, state) do
@@ -89,20 +90,29 @@ defmodule Explorer.BufferedTask do
     |> buffer_entries(batch)
   end

   defp buffer_entries(state, []), do: state

   defp buffer_entries(state, entries) do
-    %{state | buffer: :queue.join(state.buffer, :queue.from_list(entries))}
+    current_buffer = entries ++ state.current_buffer
+    {batch, overflow} = Enum.split(current_buffer, state.max_batch_size)
+
+    if length(batch) == state.max_batch_size do
+      %{state | current_buffer: overflow, buffer: :queue.in(batch, state.buffer)}
+    else
+      %{state | current_buffer: current_buffer}
+    end
   end

   defp do_initial_stream(state) do
-    state.buffer
-    |> state.callback_module.init(fn entry, acc ->
-      batch = :queue.in(entry, acc)
+    {0, []}
+    |> state.callback_module.init(fn entry, {len, acc} ->
+      batch = [entry | acc]

-      if :queue.len(batch) >= state.max_batch_size do
-        schedule_async_perform(:queue.to_list(batch))
-        :queue.new()
+      if len + 1 >= state.max_batch_size do
+        schedule_async_perform(Enum.reverse(batch))
+        {0, []}
       else
-        batch
+        {len + 1, batch}
       end
     end)
     |> catchup_remaining()
@@ -110,24 +120,18 @@ defmodule Explorer.BufferedTask do
     schedule_next_buffer_flush(state)
   end

-  defp catchup_remaining({:ok, batch}) do
-    if :queue.len(batch) > 0 do
-      schedule_async_perform(:queue.to_list(batch))
-    end
+  defp catchup_remaining({:ok, {0, []}}), do: :ok
+
+  defp catchup_remaining({:ok, {_len, batch}}) do
+    schedule_async_perform(Enum.reverse(batch))
     :ok
   end

   defp take_batch(state) do
-    {entries, remaining_queue} =
-      Enum.reduce_while(1..state.max_batch_size, {[], state.buffer}, fn _, {entries, queue_acc} ->
-        case :queue.out(queue_acc) do
-          {{:value, entry}, new_queue} -> {:cont, {[entry | entries], new_queue}}
-          {:empty, new_queue} -> {:halt, {entries, new_queue}}
-        end
-      end)
-
-    {Enum.reverse(entries), remaining_queue}
+    case :queue.out(state.buffer) do
+      {{:value, batch}, new_queue} -> {batch, new_queue}
+      {:empty, new_queue} -> {[], new_queue}
+    end
   end

   defp schedule_async_perform(entries, after_ms \\ 0) do
@@ -157,6 +161,20 @@ defmodule Explorer.BufferedTask do
     end
   end

+  defp flush(%{current_buffer: []} = state) do
+    state |> spawn_next_batch([]) |> schedule_next_buffer_flush()
+  end
+
+  defp flush(%{current_buffer: current} = state) do
+    {batch, overflow} = Enum.split(current, state.max_batch_size)
+
+    flush(%{
+      state
+      | buffer: :queue.in(batch, state.buffer),
+        current_buffer: overflow
+    })
+  end
+
   defp debug(%{debug_logs: true}, func), do: Logger.debug(func)
   defp debug(%{debug_logs: false}, _func), do: :noop
 end
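
For readers skimming the diff, here is a minimal standalone sketch of the chunking scheme this commit introduces (hypothetical module and function names, not part of the commit): new entries collect in a plain list until a full max_batch_size batch can be moved onto the :queue, and a flush drains whatever is left over in the same fixed-size chunks.

# Standalone sketch only: `ChunkingSketch`, `add_entries/3`, and `drain/2` are
# hypothetical names for illustration; the real logic lives in Explorer.BufferedTask.
defmodule ChunkingSketch do
  @max_batch_size 3

  # Mirrors buffer_entries/2: prepend new entries to the in-progress list and,
  # once a full @max_batch_size batch exists, enqueue it as a single unit.
  def add_entries(entries, current_buffer, queue) do
    combined = entries ++ current_buffer
    {batch, overflow} = Enum.split(combined, @max_batch_size)

    if length(batch) == @max_batch_size do
      {overflow, :queue.in(batch, queue)}
    else
      {combined, queue}
    end
  end

  # Mirrors flush/1's splitting step: push any leftover entries onto the queue
  # in fixed-size chunks so take_batch/1 only ever dequeues ready-made batches.
  def drain([], queue), do: queue

  def drain(current_buffer, queue) do
    {batch, overflow} = Enum.split(current_buffer, @max_batch_size)
    drain(overflow, :queue.in(batch, queue))
  end
end

# Example (iex):
#   {current, q} = ChunkingSketch.add_entries(~w(a b), [], :queue.new())
#   # current == ~w(a b); q is still empty
#   {current, q} = ChunkingSketch.add_entries(~w(c d), current, q)
#   # current == ["b"]; ["c", "d", "a"] was enqueued as one full batch
#   ChunkingSketch.drain(current, q)
#   # the remaining ["b"] is enqueued by the flush-style drain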

@@ -45,7 +45,7 @@ defmodule Explorer.BufferedTaskTest do
     end
   end

-  test "init allows buffer to be loaded up with initial entries" do
+  test "init buffers initial entries then executes on-demand entries" do
     Process.register(self(), CounterTask)
     {:ok, buffer} = start_buffer(CounterTask)
@@ -62,6 +62,10 @@ defmodule Explorer.BufferedTaskTest do
     assert_receive {:run, ~w(14 15)}
     assert_receive {:run, ~w(16)}
     refute_receive _
+
+    BufferedTask.buffer(buffer, ~w(17))
+    assert_receive {:run, ~w(17)}
+    refute_receive _
   end

   test "init with zero entries schedules future buffer flushes" do
