chore: Add queue expanding logic to memory monitor (#9870)

pull/9900/head
Qwerty5Uiop 7 months ago committed by GitHub
parent 8d2a0f0b81
commit a905030454
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      apps/indexer/lib/indexer/block/catchup/sequence.ex
  2. 7
      apps/indexer/lib/indexer/bound_queue.ex
  3. 4
      apps/indexer/lib/indexer/buffered_task.ex
  4. 31
      apps/indexer/lib/indexer/memory/monitor.ex
  5. 8
      apps/indexer/lib/indexer/memory/shrinkable.ex

@ -229,6 +229,10 @@ defmodule Indexer.Block.Catchup.Sequence do
{:reply, BoundQueue.shrunk?(bound_queue), state}
end
def handle_call(:expand, _from, %__MODULE__{bound_queue: bound_queue} = state) do
{:reply, :ok, %{state | bound_queue: BoundQueue.expand(bound_queue)}}
end
@spec push_chunked_range(BoundQueue.t(Range.t()), step, Range.t(), edge()) ::
{:ok, BoundQueue.t(Range.t())} | {:error, reason :: String.t()}
defp push_chunked_range(bound_queue, step, _.._ = range, edge \\ :back)

@ -112,6 +112,13 @@ defmodule Indexer.BoundQueue do
shrink(bound_queue, div(size, 2))
end
@doc """
Set queue maximum_size to nil
"""
def expand(bound_queue) do
%{bound_queue | maximum_size: nil}
end
@doc """
Whether the queue was shrunk.
"""

@ -323,6 +323,10 @@ defmodule Indexer.BufferedTask do
{:reply, BoundQueue.shrunk?(bound_queue), state}
end
def handle_call(:expand, _from, %__MODULE__{bound_queue: bound_queue} = state) do
{:reply, :ok, %{state | bound_queue: BoundQueue.expand(bound_queue)}}
end
defp drop_task(state, ref) do
spawn_next_batch(%BufferedTask{state | task_ref_to_batch: Map.delete(state.task_ref_to_batch, ref)})
end

@ -21,6 +21,8 @@ defmodule Indexer.Memory.Monitor do
use GenServer
@expandable_memory_coefficient 0.4
@doc """
Registers caller as `Indexer.Memory.Shrinkable`.
"""
@ -70,6 +72,11 @@ defmodule Indexer.Memory.Monitor do
shrink_or_log(state)
end
if total <= memory_limit() * @expandable_memory_coefficient do
log_expandable_memory(%{limit: memory_limit(), total: total})
expand(state)
end
flush(:check)
{:noreply, state}
@ -100,7 +107,20 @@ defmodule Indexer.Memory.Monitor do
to_string(limit),
" bytes (",
to_string(div(100 * total, limit)),
"%) of memory limit used."
"%) of memory limit used, shrinking queues"
]
end)
end
defp log_expandable_memory(%{total: total, limit: limit}) do
Logger.info(fn ->
[
to_string(total),
" / ",
to_string(limit),
" bytes (",
to_string(div(100 * total, limit)),
"%) of memory limit used, expanding queues"
]
end)
end
@ -164,6 +184,15 @@ defmodule Indexer.Memory.Monitor do
end
end
defp expand(%__MODULE__{} = state) do
state
|> shrinkable_memory_pairs()
|> Enum.each(fn {pid, _memory} ->
Logger.info(fn -> ["Expanding queue ", process(pid)] end)
Shrinkable.expand(pid)
end)
end
defp shrinkable_memory_pairs(%__MODULE__{shrinkable_set: shrinkable_set}) do
shrinkable_set
|> Enum.map(fn pid -> {pid, memory(pid)} end)

@ -22,4 +22,12 @@ defmodule Indexer.Memory.Shrinkable do
def shrunk?(pid) when is_pid(pid) do
GenServer.call(pid, :shrunk?)
end
@doc """
Asks `pid` to expand its size
"""
@spec expand(pid()) :: :ok
def expand(pid) when is_pid(pid) do
GenServer.call(pid, :expand)
end
end

Loading…
Cancel
Save