Don't store pid in BufferedTask state

pull/971/head
Luke Imhoff 6 years ago
parent b436d8bf5d
commit 7abc419617
  1. 38
      apps/indexer/lib/indexer/buffered_task.ex

@ -53,7 +53,6 @@ defmodule Indexer.BufferedTask do
alias Indexer.BufferedTask alias Indexer.BufferedTask
@enforce_keys [ @enforce_keys [
:pid,
:callback_module, :callback_module,
:callback_module_state, :callback_module_state,
:task_supervisor, :task_supervisor,
@ -61,8 +60,7 @@ defmodule Indexer.BufferedTask do
:max_batch_size, :max_batch_size,
:init_chunk_size :init_chunk_size
] ]
defstruct pid: nil, defstruct init_task: nil,
init_task: nil,
flush_timer: nil, flush_timer: nil,
callback_module: nil, callback_module: nil,
callback_module_state: nil, callback_module_state: nil,
@ -222,7 +220,6 @@ defmodule Indexer.BufferedTask do
send(self(), :initial_stream) send(self(), :initial_stream)
state = %BufferedTask{ state = %BufferedTask{
pid: self(),
callback_module: callback_module, callback_module: callback_module,
callback_module_state: Keyword.fetch!(opts, :state), callback_module_state: Keyword.fetch!(opts, :state),
task_supervisor: Keyword.fetch!(opts, :task_supervisor), task_supervisor: Keyword.fetch!(opts, :task_supervisor),
@ -322,17 +319,25 @@ defmodule Indexer.BufferedTask do
end end
defp do_initial_stream( defp do_initial_stream(
%BufferedTask{callback_module_state: callback_module_state, init_chunk_size: init_chunk_size} = state %BufferedTask{
callback_module: callback_module,
callback_module_state: callback_module_state,
init_chunk_size: init_chunk_size,
max_batch_size: max_batch_size,
task_supervisor: task_supervisor
} = state
) do ) do
parent = self()
task = task =
Task.Supervisor.async(state.task_supervisor, fn -> Task.Supervisor.async(task_supervisor, fn ->
{0, []} {0, []}
|> state.callback_module.init( |> callback_module.init(
fn fn
entry, {len, acc} when len + 1 >= init_chunk_size -> entry, {len, acc} when len + 1 >= init_chunk_size ->
[entry | acc] [entry | acc]
|> chunk_into_queue(state) |> chunk_into_queue(max_batch_size)
|> async_perform(state.pid) |> async_perform(parent)
{0, []} {0, []}
@ -341,26 +346,27 @@ defmodule Indexer.BufferedTask do
end, end,
callback_module_state callback_module_state
) )
|> catchup_remaining(state) |> catchup_remaining(max_batch_size, parent)
end) end)
schedule_next_buffer_flush(%BufferedTask{state | init_task: task.ref}) schedule_next_buffer_flush(%BufferedTask{state | init_task: task.ref})
end end
defp catchup_remaining({0, []}, _state), do: :ok defp catchup_remaining({0, []}, _max_batch_size, _pid), do: :ok
defp catchup_remaining({len, batch}, state) when is_integer(len) and is_list(batch) do defp catchup_remaining({len, batch}, max_batch_size, pid)
when is_integer(len) and is_list(batch) and is_integer(max_batch_size) and is_pid(pid) do
batch batch
|> chunk_into_queue(state) |> chunk_into_queue(max_batch_size)
|> async_perform(state.pid) |> async_perform(pid)
:ok :ok
end end
defp chunk_into_queue(entries, state) do defp chunk_into_queue(entries, max_batch_size) when is_list(entries) and is_integer(max_batch_size) do
entries entries
|> Enum.reverse() |> Enum.reverse()
|> Enum.chunk_every(state.max_batch_size) |> Enum.chunk_every(max_batch_size)
|> Enum.reduce(:queue.new(), fn batch, acc -> queue_in_queue(acc, batch) end) |> Enum.reduce(:queue.new(), fn batch, acc -> queue_in_queue(acc, batch) end)
end end

Loading…
Cancel
Save