fix: Refactor catchup rudimentaries + fix graceful shutdown (#9729)

* fix: Refactor catchup rudimentaries + fix graceful shutdown

* Add INDEXER_GRACEFUL_SHUTDOWN_PERIOD env var + docs
pull/9900/head
Qwerty5Uiop 7 months ago committed by GitHub
parent 7077ec57d9
commit 09fc661185
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 21
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/utility/ranges_helper.ex
  2. 17
      apps/explorer/lib/explorer/utility/missing_block_range.ex
  3. 8
      apps/explorer/lib/explorer/utility/missing_ranges_manipulator.ex
  4. 9
      apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex
  5. 153
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  6. 6
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  7. 9
      apps/indexer/lib/indexer/temporary/uncataloged_token_transfers.ex
  8. 2
      apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs
  9. 28
      apps/indexer/test/indexer/block/catchup/missing_ranges_collector_test.exs
  10. 3
      config/runtime.exs
  11. 1
      docker-compose/envs/common-blockscout.env

@ -107,6 +107,27 @@ defmodule EthereumJSONRPC.Utility.RangesHelper do
)
end
@doc """
Converts initial ranges to ranges with size less or equal to the given size
"""
@spec split([Range.t()], integer) :: [Range.t()]
def split(ranges, size) do
ranges
|> Enum.reduce([], fn from..to = range, acc ->
range_size = Range.size(range)
if range_size > size do
Enum.reduce(Range.new(0, range_size - 1, size), acc, fn iterator, inner_acc ->
start_from = from - iterator
[Range.new(start_from, max(start_from - size + 1, to), -1) | inner_acc]
end)
else
[range | acc]
end
end)
|> Enum.reverse()
end
defp parse_integer(string) do
case Integer.parse(string) do
{number, ""} -> number

@ -27,9 +27,22 @@ defmodule Explorer.Utility.MissingBlockRange do
size
|> get_latest_ranges_query()
|> Repo.all()
|> Enum.map(fn %{from_number: from, to_number: to} ->
%Range{first: from, last: to, step: if(from > to, do: -1, else: 1)}
|> Enum.reduce_while({size, []}, fn %{from_number: from, to_number: to}, {remaining_count, ranges} ->
range_size = from - to + 1
cond do
range_size < remaining_count ->
{:cont, {remaining_count - range_size, [Range.new(from, to, -1) | ranges]}}
range_size > remaining_count ->
{:halt, {0, [Range.new(from, from - remaining_count + 1, -1) | ranges]}}
range_size == remaining_count ->
{:halt, {0, [Range.new(from, to, -1) | ranges]}}
end
end)
|> elem(1)
|> Enum.reverse()
end
def add_ranges_by_block_numbers(numbers) do

@ -12,8 +12,8 @@ defmodule Explorer.Utility.MissingRangesManipulator do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
def get_latest_batch do
GenServer.call(__MODULE__, :get_latest_batch)
def get_latest_batch(size) do
GenServer.call(__MODULE__, {:get_latest_batch, size})
end
def clear_batch(batch) do
@ -34,8 +34,8 @@ defmodule Explorer.Utility.MissingRangesManipulator do
end
@impl true
def handle_call(:get_latest_batch, _from, state) do
{:reply, MissingBlockRange.get_latest_batch(), state}
def handle_call({:get_latest_batch, size}, _from, state) do
{:reply, MissingBlockRange.get_latest_batch(size), state}
end
def handle_call({:clear_batch, batch}, _from, state) do

@ -54,6 +54,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do
@impl GenServer
def init(named_arguments) do
Logger.metadata(fetcher: :block_catchup)
Process.flag(:trap_exit, true)
state = new(named_arguments)
@ -180,7 +181,13 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do
@impl GenServer
def handle_info(:catchup_index, %__MODULE__{fetcher: %Catchup.Fetcher{} = catchup} = state) do
{:noreply,
%__MODULE__{state | task: Task.Supervisor.async_nolink(Catchup.TaskSupervisor, Catchup.Fetcher, :task, [catchup])}}
%__MODULE__{
state
| task:
Task.Supervisor.async_nolink(Catchup.TaskSupervisor, Catchup.Fetcher, :task, [catchup],
shutdown: Application.get_env(:indexer, :graceful_shutdown_period)
)
}}
end
def handle_info(

@ -23,19 +23,16 @@ defmodule Indexer.Block.Catchup.Fetcher do
]
alias Ecto.Changeset
alias EthereumJSONRPC.Utility.RangesHelper
alias Explorer.Chain
alias Explorer.Chain.NullRoundHeight
alias Explorer.Utility.{MassiveBlock, MissingRangesManipulator}
alias Indexer.{Block, Tracer}
alias Indexer.Block.Catchup.{Sequence, TaskSupervisor}
alias Indexer.Memory.Shrinkable
alias Indexer.Block.Catchup.TaskSupervisor
alias Indexer.Prometheus
@behaviour Block.Fetcher
@shutdown_after :timer.minutes(5)
@sequence_name :block_catchup_sequencer
defstruct block_fetcher: nil,
memory_monitor: nil
@ -47,8 +44,9 @@ defmodule Indexer.Block.Catchup.Fetcher do
"""
def task(state) do
Logger.metadata(fetcher: :block_catchup)
Process.flag(:trap_exit, true)
case MissingRangesManipulator.get_latest_batch() do
case MissingRangesManipulator.get_latest_batch(blocks_batch_size() * blocks_concurrency()) do
[] ->
%{
first_block_number: nil,
@ -57,9 +55,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
shrunk: false
}
latest_missing_ranges ->
missing_ranges = filter_consensus_blocks(latest_missing_ranges)
missing_ranges ->
first.._ = List.first(missing_ranges)
_..last = List.last(missing_ranges)
@ -70,40 +66,17 @@ defmodule Indexer.Block.Catchup.Fetcher do
|> Stream.map(&Enum.count/1)
|> Enum.sum()
step = step(first, last, blocks_batch_size())
sequence_opts = put_memory_monitor([ranges: missing_ranges, step: step], state)
gen_server_opts = [name: @sequence_name]
{:ok, sequence} = Sequence.start_link(sequence_opts, gen_server_opts)
Sequence.cap(sequence)
stream_fetch_and_import(state, sequence)
shrunk = Shrinkable.shrunk?(sequence)
stream_fetch_and_import(state, missing_ranges)
%{
first_block_number: first,
last_block_number: last,
missing_block_count: missing_block_count,
shrunk: shrunk
shrunk: false
}
end
end
defp filter_consensus_blocks(ranges) do
filtered_ranges =
ranges
|> Enum.map(&Chain.missing_block_number_ranges(&1))
|> List.flatten()
consensus_blocks = ranges_to_numbers(ranges) -- ranges_to_numbers(filtered_ranges)
consensus_blocks
|> numbers_to_ranges()
|> MissingRangesManipulator.clear_batch()
filtered_ranges
end
@doc """
The number of blocks to request in one call to the JSONRPC. Defaults to
10. Block requests also include the transactions for those blocks. *These transactions
@ -125,10 +98,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
Application.get_env(:indexer, __MODULE__)[:concurrency]
end
defp step(first, last, blocks_batch_size) do
if first < last, do: blocks_batch_size, else: -1 * blocks_batch_size
end
@async_import_remaining_block_data_options ~w(address_hash_to_fetched_balance_block_number)a
@impl Block.Fetcher
@ -168,15 +137,14 @@ defmodule Indexer.Block.Catchup.Fetcher do
async_import_blobs(imported)
end
defp stream_fetch_and_import(state, sequence)
when is_pid(sequence) do
ranges = Sequence.build_stream(sequence)
defp stream_fetch_and_import(state, ranges) do
TaskSupervisor
|> Task.Supervisor.async_stream(ranges, &fetch_and_import_range_from_sequence(state, &1, sequence),
|> Task.Supervisor.async_stream(
RangesHelper.split(ranges, blocks_batch_size()),
&fetch_and_import_missing_range(state, &1),
max_concurrency: blocks_concurrency(),
timeout: :infinity,
shutdown: @shutdown_after
shutdown: Application.get_env(:indexer, :graceful_shutdown_period)
)
|> Stream.run()
end
@ -184,13 +152,12 @@ defmodule Indexer.Block.Catchup.Fetcher do
# Run at state.blocks_concurrency max_concurrency when called by `stream_import/1`
@decorate trace(
name: "fetch",
resource: "Indexer.Block.Catchup.Fetcher.fetch_and_import_range_from_sequence/3",
resource: "Indexer.Block.Catchup.Fetcher.fetch_and_import_missing_range/3",
tracer: Tracer
)
defp fetch_and_import_range_from_sequence(
defp fetch_and_import_missing_range(
%__MODULE__{block_fetcher: %Block.Fetcher{} = block_fetcher},
first..last = range,
sequence
first..last = range
) do
Logger.metadata(fetcher: :block_catchup, first_block_number: first, last_block_number: last)
Process.flag(:trap_exit, true)
@ -201,9 +168,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
case result do
{:ok, %{inserted: inserted, errors: errors}} ->
valid_errors = handle_null_rounds(errors)
errors = cap_seq(sequence, valid_errors)
retry(sequence, errors)
handle_null_rounds(errors)
clear_missing_ranges(range, errors)
{:ok, inserted: inserted}
@ -212,8 +177,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
Prometheus.Instrumenter.import_errors()
Logger.error(fn -> ["failed to validate: ", inspect(changesets), ". Retrying."] end, step: step)
push_back(sequence, range)
error
{:error, {:import = step, reason}} = error ->
@ -221,8 +184,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
Logger.error(fn -> [inspect(reason), ". Retrying."] end, step: step)
if reason == :timeout, do: add_range_to_massive_blocks(range)
push_back(sequence, range)
error
{:error, {step, reason}} = error ->
@ -233,8 +194,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
step: step
)
push_back(sequence, range)
error
{:error, {step, failed_value, _changes_so_far}} = error ->
@ -245,8 +204,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
step: step
)
push_back(sequence, range)
error
end
rescue
@ -284,39 +241,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
|> MassiveBlock.insert_block_numbers()
end
defp cap_seq(seq, errors) do
{not_founds, other_errors} =
Enum.split_with(errors, fn
%{code: 404, data: %{number: _}} -> true
_ -> false
end)
case not_founds do
[] ->
Logger.debug("got blocks")
other_errors
_ ->
Sequence.cap(seq)
end
other_errors
end
defp push_back(sequence, range) do
case Sequence.push_back(sequence, range) do
:ok -> :ok
{:error, reason} -> Logger.error(fn -> ["Could not push back to Sequence: ", inspect(reason)] end)
end
end
defp retry(sequence, block_errors) when is_list(block_errors) do
block_errors
|> block_errors_to_block_number_ranges()
|> Enum.map(&push_back(sequence, &1))
end
defp clear_missing_ranges(initial_range, errors \\ []) do
success_numbers = Enum.to_list(initial_range) -- Enum.map(errors, &block_error_to_number/1)
@ -325,12 +249,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
|> MissingRangesManipulator.clear_batch()
end
defp block_errors_to_block_number_ranges(block_errors) when is_list(block_errors) do
block_errors
|> Enum.map(&block_error_to_number/1)
|> numbers_to_ranges()
end
defp block_error_to_number(%{data: %{number: number}}) when is_integer(number), do: number
defp numbers_to_ranges([]), do: []
@ -353,43 +271,4 @@ defmodule Indexer.Block.Catchup.Fetcher do
fn range -> {:cont, range, nil} end
)
end
defp ranges_to_numbers(ranges) do
ranges
|> Enum.map(&Enum.to_list/1)
|> List.flatten()
end
defp put_memory_monitor(sequence_options, %__MODULE__{memory_monitor: nil}) when is_list(sequence_options),
do: sequence_options
defp put_memory_monitor(sequence_options, %__MODULE__{memory_monitor: memory_monitor})
when is_list(sequence_options) do
Keyword.put(sequence_options, :memory_monitor, memory_monitor)
end
@doc """
Puts a list of block numbers to the front of the sequencing queue.
"""
@spec push_front([non_neg_integer()]) :: :ok | {:error, :queue_unavailable | :maximum_size | String.t()}
def push_front(block_numbers) do
if Process.whereis(@sequence_name) do
Enum.reduce_while(block_numbers, :ok, fn block_number, :ok ->
sequence_push_front(block_number)
end)
else
{:error, :queue_unavailable}
end
end
defp sequence_push_front(block_number) do
if is_integer(block_number) do
case Sequence.push_front(@sequence_name, block_number..block_number) do
:ok -> {:cont, :ok}
{:error, _} = error -> {:halt, error}
end
else
Logger.warn(fn -> ["Received a non-integer block number: ", inspect(block_number)] end)
end
end
end

@ -76,6 +76,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
@impl GenServer
def init(%{block_fetcher: %Block.Fetcher{} = block_fetcher, subscribe_named_arguments: subscribe_named_arguments}) do
Logger.metadata(fetcher: :block_realtime)
Process.flag(:trap_exit, true)
{:ok, %__MODULE__{block_fetcher: %Block.Fetcher{block_fetcher | broadcast: :realtime, callback_module: __MODULE__}},
{:continue, {:init, subscribe_named_arguments}}}
@ -162,6 +163,11 @@ defmodule Indexer.Block.Realtime.Fetcher do
{:noreply, state}
end
@impl GenServer
def terminate(_reason, %__MODULE__{timer: timer}) do
Process.cancel_timer(timer)
end
if Application.compile_env(:explorer, :chain_type) == "stability" do
defp fetch_validators_async do
GenServer.cast(Indexer.Fetcher.Stability.Validator, :update_validators_list)

@ -13,7 +13,7 @@ defmodule Indexer.Temporary.UncatalogedTokenTransfers do
require Logger
alias Explorer.Chain.TokenTransfer
alias Indexer.Block.Catchup.Fetcher
alias Explorer.Utility.MissingRangesManipulator
alias Indexer.Temporary.UncatalogedTokenTransfers
def child_spec([init_arguments]) do
@ -94,6 +94,11 @@ defmodule Indexer.Temporary.UncatalogedTokenTransfers do
end
defp async_push_front(block_numbers) do
Task.Supervisor.async_nolink(UncatalogedTokenTransfers.TaskSupervisor, Fetcher, :push_front, [block_numbers])
Task.Supervisor.async_nolink(
UncatalogedTokenTransfers.TaskSupervisor,
MissingRangesManipulator,
:add_ranges_by_block_numbers,
[block_numbers]
)
end
end

@ -388,7 +388,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
assert :ok = Supervisor.terminate_child(pid, :task)
assert_receive {:DOWN, ^reference, :process, ^child_pid, :shutdown}
assert_receive {:DOWN, ^reference, :process, ^child_pid, :normal}
end
test "with other child_id returns {:error, :not_found}", %{pid: pid} do

@ -16,23 +16,21 @@ defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do
MissingRangesCollector.start_link([])
Process.sleep(1000)
assert [999_999..900_000//-1] = batch = MissingBlockRange.get_latest_batch(1)
assert [999_999..999_900//-1] = batch = MissingBlockRange.get_latest_batch(100)
MissingBlockRange.clear_batch(batch)
assert [899_999..800_000//-1] = batch = MissingBlockRange.get_latest_batch(1)
assert [999_899..999_800//-1] = batch = MissingBlockRange.get_latest_batch(100)
MissingBlockRange.clear_batch(batch)
assert [799_999..700_000//-1] = batch = MissingBlockRange.get_latest_batch(1)
assert [999_799..999_700//-1] = batch = MissingBlockRange.get_latest_batch(100)
MissingBlockRange.clear_batch(batch)
insert(:block, number: 1_200_000)
insert(:block, number: 1_000_200)
Process.sleep(1000)
assert [1_199_999..1_100_001//-1] = batch = MissingBlockRange.get_latest_batch(1)
assert [1_000_199..1_000_100//-1] = batch = MissingBlockRange.get_latest_batch(100)
MissingBlockRange.clear_batch(batch)
assert [1_100_000..1_000_001//-1] = batch = MissingBlockRange.get_latest_batch(1)
assert [1_000_099..1_000_001//-1, 999_699..999_699//-1] = batch = MissingBlockRange.get_latest_batch(100)
MissingBlockRange.clear_batch(batch)
assert [699_999..600_000//-1] = batch = MissingBlockRange.get_latest_batch(1)
MissingBlockRange.clear_batch(batch)
assert [599_999..500_124//-1, 500_122..500_000//-1] = MissingBlockRange.get_latest_batch(2)
assert [999_698..999_599//-1] = MissingBlockRange.get_latest_batch(100)
end
test "FIRST_BLOCK and LAST_BLOCK envs" do
@ -60,16 +58,14 @@ defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do
test "infinite range" do
Application.put_env(:indexer, :block_ranges, "1..5,3..5,2qw1..12,10..11a,,asd..qwe,10..latest")
insert(:block, number: 200_000)
insert(:block, number: 200)
MissingRangesCollector.start_link([])
Process.sleep(500)
assert [199_999..100_010//-1] = batch = MissingBlockRange.get_latest_batch(1)
MissingBlockRange.clear_batch(batch)
assert [100_009..10//-1] = batch = MissingBlockRange.get_latest_batch(1)
assert [199..100//-1] = batch = MissingBlockRange.get_latest_batch(100)
MissingBlockRange.clear_batch(batch)
assert [5..1//-1] = MissingBlockRange.get_latest_batch(1)
assert [99..10//-1, 5..1//-1] = MissingBlockRange.get_latest_batch(100)
end
test "finite range" do
@ -80,7 +76,7 @@ defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do
MissingRangesCollector.start_link([])
Process.sleep(500)
assert [200..150//-1, 50..30//-1, 25..5//-1] = batch = MissingBlockRange.get_latest_batch(3)
assert [200..150//-1, 50..30//-1, 25..5//-1] = batch = MissingBlockRange.get_latest_batch(100)
MissingBlockRange.clear_batch(batch)
assert [] = MissingBlockRange.get_latest_batch()
end
@ -96,7 +92,7 @@ defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do
Process.sleep(500)
assert [200..176//-1, 174..150//-1, 50..34//-1, 32..30//-1, 25..5//-1] =
batch = MissingBlockRange.get_latest_batch(5)
batch = MissingBlockRange.get_latest_batch(91)
MissingBlockRange.clear_batch(batch)
assert [] = MissingBlockRange.get_latest_batch()

@ -594,7 +594,8 @@ config :indexer,
ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_FETCHER_INIT_QUERY_LIMIT", 100_000),
coin_balances_fetcher_init_limit:
ConfigHelper.parse_integer_env_var("INDEXER_COIN_BALANCES_FETCHER_INIT_QUERY_LIMIT", 2000),
ipfs_gateway_url: System.get_env("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs")
ipfs_gateway_url: System.get_env("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs"),
graceful_shutdown_period: ConfigHelper.parse_time_env_var("INDEXER_GRACEFUL_SHUTDOWN_PERIOD", "5m")
config :indexer, Indexer.Supervisor, enabled: !ConfigHelper.parse_bool_env_var("DISABLE_INDEXER")

@ -225,6 +225,7 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_FETCHER_INIT_QUERY_LIMIT=
# INDEXER_TOKEN_BALANCES_FETCHER_INIT_QUERY_LIMIT=
# INDEXER_COIN_BALANCES_FETCHER_INIT_QUERY_LIMIT=
# INDEXER_GRACEFUL_SHUTDOWN_PERIOD=
# WITHDRAWALS_FIRST_BLOCK=
# INDEXER_OPTIMISM_L1_RPC=
# INDEXER_OPTIMISM_L1_BATCH_START_BLOCK=

Loading…
Cancel
Save