|
|
@ -152,7 +152,7 @@ defmodule Explorer.BufferedTask do |
|
|
|
|
|
|
|
|
|
|
|
state |
|
|
|
state |
|
|
|
|> drop_task(ref) |
|
|
|
|> drop_task(ref) |
|
|
|
|> queue(batch, retries + 1) |
|
|
|
|> queue_in_state(batch, retries + 1) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp buffer_entries(state, []), do: state |
|
|
|
defp buffer_entries(state, []), do: state |
|
|
@ -161,12 +161,12 @@ defmodule Explorer.BufferedTask do |
|
|
|
%{state | current_buffer: [entries | state.current_buffer]} |
|
|
|
%{state | current_buffer: [entries | state.current_buffer]} |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp queue(%{} = state, batch, retries) do |
|
|
|
defp queue_in_state(%{} = state, batch, retries) do |
|
|
|
%{state | buffer: queue(state.buffer, batch, retries)} |
|
|
|
%{state | buffer: queue_in_queue(state.buffer, batch, retries)} |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp queue({_, _} = que, batch, retries) do |
|
|
|
defp queue_in_queue(queue, batch, retries) do |
|
|
|
:queue.in({batch, retries}, que) |
|
|
|
:queue.in({batch, retries}, queue) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp do_initial_stream(%{init_chunk_size: init_chunk_size} = state) do |
|
|
|
defp do_initial_stream(%{init_chunk_size: init_chunk_size} = state) do |
|
|
@ -204,7 +204,7 @@ defmodule Explorer.BufferedTask do |
|
|
|
entries |
|
|
|
entries |
|
|
|
|> Enum.reverse() |
|
|
|
|> Enum.reverse() |
|
|
|
|> Enum.chunk_every(state.max_batch_size) |
|
|
|
|> Enum.chunk_every(state.max_batch_size) |
|
|
|
|> Enum.reduce(:queue.new(), fn batch, acc -> queue(acc, batch, 0) end) |
|
|
|
|> Enum.reduce(:queue.new(), fn batch, acc -> queue_in_queue(acc, batch, 0) end) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp take_batch(state) do |
|
|
|
defp take_batch(state) do |
|
|
@ -247,7 +247,7 @@ defmodule Explorer.BufferedTask do |
|
|
|
|> List.flatten() |
|
|
|
|> List.flatten() |
|
|
|
|> Enum.chunk_every(state.max_batch_size) |
|
|
|
|> Enum.chunk_every(state.max_batch_size) |
|
|
|
|> Enum.reduce(%{state | current_buffer: []}, fn batch, state_acc -> |
|
|
|
|> Enum.reduce(%{state | current_buffer: []}, fn batch, state_acc -> |
|
|
|
queue(state_acc, batch, 0) |
|
|
|
queue_in_state(state_acc, batch, 0) |
|
|
|
end) |
|
|
|
end) |
|
|
|
|> flush() |
|
|
|
|> flush() |
|
|
|
end |
|
|
|
end |
|
|
|