Indexer.Memory.Monitor

`Indexer.Memory.Monitor` checks if the BEAM memory usage exceeds a set
limit (defaults to 1 GiB) and if it does, it asks the process with the
most memory that is registered as shrinkable to shrink.
pull/971/head
Luke Imhoff 6 years ago
parent 879a8382f2
commit cae25195e5
  1. 12
      README.md
  2. 12
      apps/indexer/README.md
  3. 4
      apps/indexer/config/config.exs
  4. 38
      apps/indexer/lib/indexer/application.ex
  5. 148
      apps/indexer/lib/indexer/memory/monitor.ex
  6. 15
      apps/indexer/lib/indexer/memory/shrinkable.ex
  7. 70
      apps/indexer/lib/indexer/shrinkable/supervisor.ex

@ -261,6 +261,18 @@ BlockScout is setup to export [Prometheus](https://prometheus.io/) metrics at `/
3. Click "Load" 3. Click "Load"
6. View the dashboards. (You will need to click-around and use BlockScout for the web-related metrics to show up.) 6. View the dashboards. (You will need to click-around and use BlockScout for the web-related metrics to show up.)
## Memory Usage
The work queues for building the index of all blocks, balances (coin and token), and internal transactions can grow quite large. By default, the soft-limit is 1 GiB, which can be changed in `apps/indexer/config/config.exs`:
```
config :indexer, memory_limit: 1 <<< 30
```
Memory usage is checked once per minute. If the soft-limit is reached, the shrinkable work queues will shed half their load. The shed load will be restored from the database, the same as when a restart of the server occurs, so rebuilding the work queue will be slower, but use less memory.
If all queues are at their minimum size, then no more memory can be reclaimed and an error will be logged.
## Acknowledgements ## Acknowledgements
We would like to thank the [EthPrize foundation](http://ethprize.io/) for their funding support. We would like to thank the [EthPrize foundation](http://ethprize.io/) for their funding support.

@ -19,6 +19,18 @@ Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_do
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at [https://hexdocs.pm/indexer](https://hexdocs.pm/indexer). be found at [https://hexdocs.pm/indexer](https://hexdocs.pm/indexer).
## Memory Usage
The work queues for building the index of all blocks, balances (coin and token), and internal transactions can grow quite large. By default, the soft-limit is 1 GiB, which can be changed in `config/config.exs`:
```
config :indexer, memory_limit: 1 <<< 30
```
Memory usage is checked once per minute. If the soft-limit is reached, the shrinkable work queues will shed half their load. The shed load will be restored from the database, the same as when a restart of the server occurs, so rebuilding the work queue will be slower, but use less memory.
If all queues are at their minimum size, then no more memory can be reclaimed and an error will be logged.
## Testing ## Testing
### Parity ### Parity

@ -2,7 +2,11 @@
# and its dependencies with the aid of the Mix.Config module. # and its dependencies with the aid of the Mix.Config module.
use Mix.Config use Mix.Config
import Bitwise
config :indexer, config :indexer,
# bytes
memory_limit: 1 <<< 30,
ecto_repos: [Explorer.Repo] ecto_repos: [Explorer.Repo]
config :logger, :indexer, config :logger, :indexer,

@ -6,42 +6,22 @@ defmodule Indexer.Application do
use Application use Application
alias Indexer.{ alias Indexer.{
Block, Memory,
CoinBalance, Shrinkable
InternalTransaction,
PendingTransaction,
Token,
TokenBalance,
TokenTransfer
} }
@impl Application @impl Application
def start(_type, _args) do def start(_type, _args) do
json_rpc_named_arguments = Application.fetch_env!(:indexer, :json_rpc_named_arguments)
block_fetcher_supervisor_named_arguments =
:indexer
|> Application.get_all_env()
|> Keyword.take(
~w(blocks_batch_size blocks_concurrency block_interval json_rpc_named_arguments receipts_batch_size
receipts_concurrency subscribe_named_arguments)a
)
|> Enum.into(%{})
children = [ children = [
{CoinBalance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: CoinBalance.Supervisor]]}, Memory.Monitor,
{PendingTransaction.Supervisor, Shrinkable.Supervisor
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: PendingTransactionFetcher]]},
{InternalTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: InternalTransaction.Supervisor]]},
{Token.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: Token.Supervisor]]},
{TokenBalance.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: TokenBalance.Supervisor]]},
{Block.Supervisor, [block_fetcher_supervisor_named_arguments, [name: Block.Supervisor]]},
{TokenTransfer.Uncataloged.Supervisor, [[], [name: TokenTransfer.Uncataloged.Supervisor]]}
] ]
opts = [strategy: :one_for_one, name: Indexer.Supervisor] opts = [
# If the `Memory.Monitor` dies, it needs all the `Shrinkable`s to re-register, so restart them.
strategy: :rest_for_one,
name: Indexer.Supervisor
]
Supervisor.start_link(children, opts) Supervisor.start_link(children, opts)
end end

@ -0,0 +1,148 @@
defmodule Indexer.Memory.Monitor do
@moduledoc """
Monitors memory usage of Erlang VM.
If memory usage (as reported by `:erlang.memory(:total)` exceeds the configured limit, then the `Process` with the
worst memory usage (as reported by `Process.info(pid, :memory)`) in `shrinkable_set` is asked to
`c:Indexer.Memory.Shrinkable.shrink/0`.
"""
require Bitwise
require Logger
import Bitwise
alias Indexer.Memory.Shrinkable
defstruct limit: 1 <<< 30,
timer_interval: :timer.minutes(1),
timer_reference: nil,
shrinkable_set: MapSet.new()
use GenServer
@doc """
Registers caller as `Indexer.Memory.Shrinkable`.
"""
def shrinkable(server \\ __MODULE__) do
GenServer.call(server, :shrinkable)
end
def child_spec([]) do
child_spec([%{}, []])
end
def child_spec([init_options, gen_server_options] = start_link_arguments)
when is_map(init_options) and is_list(gen_server_options) do
Supervisor.child_spec(%{id: __MODULE__, start: {__MODULE__, :start_link, start_link_arguments}}, [])
end
def start_link(init_options, gen_server_options \\ []) when is_map(init_options) and is_list(gen_server_options) do
GenServer.start_link(__MODULE__, init_options, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl GenServer
def init(options) when is_map(options) do
state = struct!(__MODULE__, options)
{:ok, timer_reference} = :timer.send_interval(state.timer_interval, :check)
{:ok, %__MODULE__{state | timer_reference: timer_reference}}
end
@impl GenServer
def handle_call(:shinkable, {pid, _}, %__MODULE__{shrinkable_set: shrinkable_set} = state) do
Process.monitor(pid)
{:reply, :ok, %__MODULE__{state | shrinkable_set: MapSet.put(shrinkable_set, pid)}}
end
@impl GenServer
def handle_info({:DOWN, _, :process, pid, _}, %__MODULE__{shrinkable_set: shrinkable_set}) do
{:noreply, %__MODULE__{shrinkable_set: MapSet.delete(shrinkable_set, pid)}}
end
@impl GenServer
def handle_info(:check, %__MODULE__{limit: limit} = state) do
total = :erlang.memory(:total)
if limit < total do
case shrinkable_with_most_memory(state) do
{:error, :not_found} ->
Logger.error(fn ->
[
prefix(%{total: total, limit: limit}),
" No processes are registered as shrinkable. Limit will remain surpassed."
]
end)
{:ok, {pid, memory}} ->
Logger.warn(fn ->
prefix = [
prefix(%{total: total, limit: limit}),
" Worst memory usage (",
to_string(memory),
" bytes) among shrinkable processes is ",
inspect(pid)
]
{:registered_name, registered_name} = Process.info(pid, :registered_name)
prefix =
case registered_name do
[] -> [prefix, "."]
_ -> [prefix, " (", inspect(registered_name), ")."]
end
[prefix, " Asking ", inspect(pid), " to shrinkable to drop below limit."]
end)
:ok = Shrinkable.shrink(pid)
end
end
flush(:check)
{:noreply, state}
end
defp flush(message) do
receive do
^message -> flush(message)
after
0 ->
:ok
end
end
defp memory(pid) when is_pid(pid) do
case Process.info(pid, :memory) do
{:memory, memory} -> memory
# process died
nil -> 0
end
end
defp prefix(%{total: total, limit: limit}) do
[
to_string(total),
" / ",
to_string(limit),
" bytes (",
to_string(div(100 * total, limit)),
"%) of memory limit used."
]
end
defp shrinkable_with_most_memory(%__MODULE__{shrinkable_set: shrinkable_set}) do
if Enum.empty?(shrinkable_set) do
{:error, :not_found}
else
pid_memory =
shrinkable_set
|> Enum.map(fn pid -> {pid, memory(pid)} end)
|> Enum.max_by(&elem(&1, 1))
{:ok, pid_memory}
end
end
end

@ -0,0 +1,15 @@
defmodule Indexer.Memory.Shrinkable do
@moduledoc """
A process that can shrink its memory usage when asked by `Indexer.Memory.Monitor`.
Processes need to `handle_call(:shrink, from, state)`.
"""
@doc """
Asks `pid` to shrink its memory usage.
"""
@spec shrink(pid()) :: :ok
def shrink(pid) when is_pid(pid) do
GenServer.call(pid, :shrink)
end
end

@ -0,0 +1,70 @@
defmodule Indexer.Shrinkable.Supervisor do
@moduledoc """
Supervisor of all supervision trees that depend on `Indexer.Alarm.Supervisor`.
"""
use Supervisor
alias Indexer.{
Block,
CoinBalance,
InternalTransaction,
PendingTransaction,
Token,
TokenBalance,
TokenTransfer
}
def child_spec([]) do
child_spec([[]])
end
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
type: :supervisor
}
Supervisor.child_spec(default, [])
end
def start_link(arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, arguments, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl Supervisor
def init([]) do
json_rpc_named_arguments = Application.fetch_env!(:indexer, :json_rpc_named_arguments)
block_fetcher_supervisor_named_arguments =
:indexer
|> Application.get_all_env()
|> Keyword.take(
~w(blocks_batch_size blocks_concurrency block_interval json_rpc_named_arguments receipts_batch_size
receipts_concurrency subscribe_named_arguments)a
)
|> Enum.into(%{})
Supervisor.init(
[
{CoinBalance.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: CoinBalance.Supervisor]]},
{PendingTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: PendingTransactionFetcher]]},
{InternalTransaction.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: InternalTransaction.Supervisor]]},
{Token.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments], [name: Token.Supervisor]]},
{TokenBalance.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments], [name: TokenBalance.Supervisor]]},
{Block.Supervisor, [block_fetcher_supervisor_named_arguments, [name: Block.Supervisor]]},
{TokenTransfer.Uncataloged.Supervisor, [[], [name: TokenTransfer.Uncataloged.Supervisor]]}
],
strategy: :one_for_one
)
end
end
Loading…
Cancel
Save