From 7abc4196173571144dd4d324ae701af8b58be13b Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 22 Oct 2018 12:43:06 -0500 Subject: [PATCH] Don't store pid in BufferedTask state --- apps/indexer/lib/indexer/buffered_task.ex | 38 +++++++++++++---------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/apps/indexer/lib/indexer/buffered_task.ex b/apps/indexer/lib/indexer/buffered_task.ex index 8972e73628..58f754abff 100644 --- a/apps/indexer/lib/indexer/buffered_task.ex +++ b/apps/indexer/lib/indexer/buffered_task.ex @@ -53,7 +53,6 @@ defmodule Indexer.BufferedTask do alias Indexer.BufferedTask @enforce_keys [ - :pid, :callback_module, :callback_module_state, :task_supervisor, @@ -61,8 +60,7 @@ defmodule Indexer.BufferedTask do :max_batch_size, :init_chunk_size ] - defstruct pid: nil, - init_task: nil, + defstruct init_task: nil, flush_timer: nil, callback_module: nil, callback_module_state: nil, @@ -222,7 +220,6 @@ defmodule Indexer.BufferedTask do send(self(), :initial_stream) state = %BufferedTask{ - pid: self(), callback_module: callback_module, callback_module_state: Keyword.fetch!(opts, :state), task_supervisor: Keyword.fetch!(opts, :task_supervisor), @@ -322,17 +319,25 @@ defmodule Indexer.BufferedTask do end defp do_initial_stream( - %BufferedTask{callback_module_state: callback_module_state, init_chunk_size: init_chunk_size} = state + %BufferedTask{ + callback_module: callback_module, + callback_module_state: callback_module_state, + init_chunk_size: init_chunk_size, + max_batch_size: max_batch_size, + task_supervisor: task_supervisor + } = state ) do + parent = self() + task = - Task.Supervisor.async(state.task_supervisor, fn -> + Task.Supervisor.async(task_supervisor, fn -> {0, []} - |> state.callback_module.init( + |> callback_module.init( fn entry, {len, acc} when len + 1 >= init_chunk_size -> [entry | acc] - |> chunk_into_queue(state) - |> async_perform(state.pid) + |> chunk_into_queue(max_batch_size) + |> async_perform(parent) {0, []} @@ -341,26 +346,27 @@ defmodule Indexer.BufferedTask do end, callback_module_state ) - |> catchup_remaining(state) + |> catchup_remaining(max_batch_size, parent) end) schedule_next_buffer_flush(%BufferedTask{state | init_task: task.ref}) end - defp catchup_remaining({0, []}, _state), do: :ok + defp catchup_remaining({0, []}, _max_batch_size, _pid), do: :ok - defp catchup_remaining({len, batch}, state) when is_integer(len) and is_list(batch) do + defp catchup_remaining({len, batch}, max_batch_size, pid) + when is_integer(len) and is_list(batch) and is_integer(max_batch_size) and is_pid(pid) do batch - |> chunk_into_queue(state) - |> async_perform(state.pid) + |> chunk_into_queue(max_batch_size) + |> async_perform(pid) :ok end - defp chunk_into_queue(entries, state) do + defp chunk_into_queue(entries, max_batch_size) when is_list(entries) and is_integer(max_batch_size) do entries |> Enum.reverse() - |> Enum.chunk_every(state.max_batch_size) + |> Enum.chunk_every(max_batch_size) |> Enum.reduce(:queue.new(), fn batch, acc -> queue_in_queue(acc, batch) end) end