diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/utility/ranges_helper.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/utility/ranges_helper.ex index f7220b044d..f53a8b14f1 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/utility/ranges_helper.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/utility/ranges_helper.ex @@ -107,6 +107,27 @@ defmodule EthereumJSONRPC.Utility.RangesHelper do ) end + @doc """ + Converts initial ranges to ranges with size less or equal to the given size + """ + @spec split([Range.t()], integer) :: [Range.t()] + def split(ranges, size) do + ranges + |> Enum.reduce([], fn from..to = range, acc -> + range_size = Range.size(range) + + if range_size > size do + Enum.reduce(Range.new(0, range_size - 1, size), acc, fn iterator, inner_acc -> + start_from = from - iterator + [Range.new(start_from, max(start_from - size + 1, to), -1) | inner_acc] + end) + else + [range | acc] + end + end) + |> Enum.reverse() + end + defp parse_integer(string) do case Integer.parse(string) do {number, ""} -> number diff --git a/apps/explorer/lib/explorer/utility/missing_block_range.ex b/apps/explorer/lib/explorer/utility/missing_block_range.ex index 1bfd30b340..54fce37626 100644 --- a/apps/explorer/lib/explorer/utility/missing_block_range.ex +++ b/apps/explorer/lib/explorer/utility/missing_block_range.ex @@ -27,9 +27,22 @@ defmodule Explorer.Utility.MissingBlockRange do size |> get_latest_ranges_query() |> Repo.all() - |> Enum.map(fn %{from_number: from, to_number: to} -> - %Range{first: from, last: to, step: if(from > to, do: -1, else: 1)} + |> Enum.reduce_while({size, []}, fn %{from_number: from, to_number: to}, {remaining_count, ranges} -> + range_size = from - to + 1 + + cond do + range_size < remaining_count -> + {:cont, {remaining_count - range_size, [Range.new(from, to, -1) | ranges]}} + + range_size > remaining_count -> + {:halt, {0, [Range.new(from, from - remaining_count + 1, -1) | ranges]}} + + range_size == remaining_count -> + {:halt, {0, [Range.new(from, to, -1) | ranges]}} + end end) + |> elem(1) + |> Enum.reverse() end def add_ranges_by_block_numbers(numbers) do diff --git a/apps/explorer/lib/explorer/utility/missing_ranges_manipulator.ex b/apps/explorer/lib/explorer/utility/missing_ranges_manipulator.ex index f62eaef441..8d54b835e3 100644 --- a/apps/explorer/lib/explorer/utility/missing_ranges_manipulator.ex +++ b/apps/explorer/lib/explorer/utility/missing_ranges_manipulator.ex @@ -12,8 +12,8 @@ defmodule Explorer.Utility.MissingRangesManipulator do GenServer.start_link(__MODULE__, :ok, name: __MODULE__) end - def get_latest_batch do - GenServer.call(__MODULE__, :get_latest_batch) + def get_latest_batch(size) do + GenServer.call(__MODULE__, {:get_latest_batch, size}) end def clear_batch(batch) do @@ -34,8 +34,8 @@ defmodule Explorer.Utility.MissingRangesManipulator do end @impl true - def handle_call(:get_latest_batch, _from, state) do - {:reply, MissingBlockRange.get_latest_batch(), state} + def handle_call({:get_latest_batch, size}, _from, state) do + {:reply, MissingBlockRange.get_latest_batch(size), state} end def handle_call({:clear_batch, batch}, _from, state) do diff --git a/apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex b/apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex index 181b8e49a7..24bc22412f 100644 --- a/apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex +++ b/apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex @@ -54,6 +54,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do @impl GenServer def init(named_arguments) do Logger.metadata(fetcher: :block_catchup) + Process.flag(:trap_exit, true) state = new(named_arguments) @@ -180,7 +181,13 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do @impl GenServer def handle_info(:catchup_index, %__MODULE__{fetcher: %Catchup.Fetcher{} = catchup} = state) do {:noreply, - %__MODULE__{state | task: Task.Supervisor.async_nolink(Catchup.TaskSupervisor, Catchup.Fetcher, :task, [catchup])}} + %__MODULE__{ + state + | task: + Task.Supervisor.async_nolink(Catchup.TaskSupervisor, Catchup.Fetcher, :task, [catchup], + shutdown: Application.get_env(:indexer, :graceful_shutdown_period) + ) + }} end def handle_info( diff --git a/apps/indexer/lib/indexer/block/catchup/fetcher.ex b/apps/indexer/lib/indexer/block/catchup/fetcher.ex index de26da59c5..3ac0349bf1 100644 --- a/apps/indexer/lib/indexer/block/catchup/fetcher.ex +++ b/apps/indexer/lib/indexer/block/catchup/fetcher.ex @@ -23,19 +23,16 @@ defmodule Indexer.Block.Catchup.Fetcher do ] alias Ecto.Changeset + alias EthereumJSONRPC.Utility.RangesHelper alias Explorer.Chain alias Explorer.Chain.NullRoundHeight alias Explorer.Utility.{MassiveBlock, MissingRangesManipulator} alias Indexer.{Block, Tracer} - alias Indexer.Block.Catchup.{Sequence, TaskSupervisor} - alias Indexer.Memory.Shrinkable + alias Indexer.Block.Catchup.TaskSupervisor alias Indexer.Prometheus @behaviour Block.Fetcher - @shutdown_after :timer.minutes(5) - @sequence_name :block_catchup_sequencer - defstruct block_fetcher: nil, memory_monitor: nil @@ -47,8 +44,9 @@ defmodule Indexer.Block.Catchup.Fetcher do """ def task(state) do Logger.metadata(fetcher: :block_catchup) + Process.flag(:trap_exit, true) - case MissingRangesManipulator.get_latest_batch() do + case MissingRangesManipulator.get_latest_batch(blocks_batch_size() * blocks_concurrency()) do [] -> %{ first_block_number: nil, @@ -57,9 +55,7 @@ defmodule Indexer.Block.Catchup.Fetcher do shrunk: false } - latest_missing_ranges -> - missing_ranges = filter_consensus_blocks(latest_missing_ranges) - + missing_ranges -> first.._ = List.first(missing_ranges) _..last = List.last(missing_ranges) @@ -70,40 +66,17 @@ defmodule Indexer.Block.Catchup.Fetcher do |> Stream.map(&Enum.count/1) |> Enum.sum() - step = step(first, last, blocks_batch_size()) - sequence_opts = put_memory_monitor([ranges: missing_ranges, step: step], state) - gen_server_opts = [name: @sequence_name] - {:ok, sequence} = Sequence.start_link(sequence_opts, gen_server_opts) - Sequence.cap(sequence) - - stream_fetch_and_import(state, sequence) - - shrunk = Shrinkable.shrunk?(sequence) + stream_fetch_and_import(state, missing_ranges) %{ first_block_number: first, last_block_number: last, missing_block_count: missing_block_count, - shrunk: shrunk + shrunk: false } end end - defp filter_consensus_blocks(ranges) do - filtered_ranges = - ranges - |> Enum.map(&Chain.missing_block_number_ranges(&1)) - |> List.flatten() - - consensus_blocks = ranges_to_numbers(ranges) -- ranges_to_numbers(filtered_ranges) - - consensus_blocks - |> numbers_to_ranges() - |> MissingRangesManipulator.clear_batch() - - filtered_ranges - end - @doc """ The number of blocks to request in one call to the JSONRPC. Defaults to 10. Block requests also include the transactions for those blocks. *These transactions @@ -125,10 +98,6 @@ defmodule Indexer.Block.Catchup.Fetcher do Application.get_env(:indexer, __MODULE__)[:concurrency] end - defp step(first, last, blocks_batch_size) do - if first < last, do: blocks_batch_size, else: -1 * blocks_batch_size - end - @async_import_remaining_block_data_options ~w(address_hash_to_fetched_balance_block_number)a @impl Block.Fetcher @@ -168,15 +137,14 @@ defmodule Indexer.Block.Catchup.Fetcher do async_import_blobs(imported) end - defp stream_fetch_and_import(state, sequence) - when is_pid(sequence) do - ranges = Sequence.build_stream(sequence) - + defp stream_fetch_and_import(state, ranges) do TaskSupervisor - |> Task.Supervisor.async_stream(ranges, &fetch_and_import_range_from_sequence(state, &1, sequence), + |> Task.Supervisor.async_stream( + RangesHelper.split(ranges, blocks_batch_size()), + &fetch_and_import_missing_range(state, &1), max_concurrency: blocks_concurrency(), timeout: :infinity, - shutdown: @shutdown_after + shutdown: Application.get_env(:indexer, :graceful_shutdown_period) ) |> Stream.run() end @@ -184,13 +152,12 @@ defmodule Indexer.Block.Catchup.Fetcher do # Run at state.blocks_concurrency max_concurrency when called by `stream_import/1` @decorate trace( name: "fetch", - resource: "Indexer.Block.Catchup.Fetcher.fetch_and_import_range_from_sequence/3", + resource: "Indexer.Block.Catchup.Fetcher.fetch_and_import_missing_range/3", tracer: Tracer ) - defp fetch_and_import_range_from_sequence( + defp fetch_and_import_missing_range( %__MODULE__{block_fetcher: %Block.Fetcher{} = block_fetcher}, - first..last = range, - sequence + first..last = range ) do Logger.metadata(fetcher: :block_catchup, first_block_number: first, last_block_number: last) Process.flag(:trap_exit, true) @@ -201,9 +168,7 @@ defmodule Indexer.Block.Catchup.Fetcher do case result do {:ok, %{inserted: inserted, errors: errors}} -> - valid_errors = handle_null_rounds(errors) - errors = cap_seq(sequence, valid_errors) - retry(sequence, errors) + handle_null_rounds(errors) clear_missing_ranges(range, errors) {:ok, inserted: inserted} @@ -212,8 +177,6 @@ defmodule Indexer.Block.Catchup.Fetcher do Prometheus.Instrumenter.import_errors() Logger.error(fn -> ["failed to validate: ", inspect(changesets), ". Retrying."] end, step: step) - push_back(sequence, range) - error {:error, {:import = step, reason}} = error -> @@ -221,8 +184,6 @@ defmodule Indexer.Block.Catchup.Fetcher do Logger.error(fn -> [inspect(reason), ". Retrying."] end, step: step) if reason == :timeout, do: add_range_to_massive_blocks(range) - push_back(sequence, range) - error {:error, {step, reason}} = error -> @@ -233,8 +194,6 @@ defmodule Indexer.Block.Catchup.Fetcher do step: step ) - push_back(sequence, range) - error {:error, {step, failed_value, _changes_so_far}} = error -> @@ -245,8 +204,6 @@ defmodule Indexer.Block.Catchup.Fetcher do step: step ) - push_back(sequence, range) - error end rescue @@ -284,39 +241,6 @@ defmodule Indexer.Block.Catchup.Fetcher do |> MassiveBlock.insert_block_numbers() end - defp cap_seq(seq, errors) do - {not_founds, other_errors} = - Enum.split_with(errors, fn - %{code: 404, data: %{number: _}} -> true - _ -> false - end) - - case not_founds do - [] -> - Logger.debug("got blocks") - - other_errors - - _ -> - Sequence.cap(seq) - end - - other_errors - end - - defp push_back(sequence, range) do - case Sequence.push_back(sequence, range) do - :ok -> :ok - {:error, reason} -> Logger.error(fn -> ["Could not push back to Sequence: ", inspect(reason)] end) - end - end - - defp retry(sequence, block_errors) when is_list(block_errors) do - block_errors - |> block_errors_to_block_number_ranges() - |> Enum.map(&push_back(sequence, &1)) - end - defp clear_missing_ranges(initial_range, errors \\ []) do success_numbers = Enum.to_list(initial_range) -- Enum.map(errors, &block_error_to_number/1) @@ -325,12 +249,6 @@ defmodule Indexer.Block.Catchup.Fetcher do |> MissingRangesManipulator.clear_batch() end - defp block_errors_to_block_number_ranges(block_errors) when is_list(block_errors) do - block_errors - |> Enum.map(&block_error_to_number/1) - |> numbers_to_ranges() - end - defp block_error_to_number(%{data: %{number: number}}) when is_integer(number), do: number defp numbers_to_ranges([]), do: [] @@ -353,43 +271,4 @@ defmodule Indexer.Block.Catchup.Fetcher do fn range -> {:cont, range, nil} end ) end - - defp ranges_to_numbers(ranges) do - ranges - |> Enum.map(&Enum.to_list/1) - |> List.flatten() - end - - defp put_memory_monitor(sequence_options, %__MODULE__{memory_monitor: nil}) when is_list(sequence_options), - do: sequence_options - - defp put_memory_monitor(sequence_options, %__MODULE__{memory_monitor: memory_monitor}) - when is_list(sequence_options) do - Keyword.put(sequence_options, :memory_monitor, memory_monitor) - end - - @doc """ - Puts a list of block numbers to the front of the sequencing queue. - """ - @spec push_front([non_neg_integer()]) :: :ok | {:error, :queue_unavailable | :maximum_size | String.t()} - def push_front(block_numbers) do - if Process.whereis(@sequence_name) do - Enum.reduce_while(block_numbers, :ok, fn block_number, :ok -> - sequence_push_front(block_number) - end) - else - {:error, :queue_unavailable} - end - end - - defp sequence_push_front(block_number) do - if is_integer(block_number) do - case Sequence.push_front(@sequence_name, block_number..block_number) do - :ok -> {:cont, :ok} - {:error, _} = error -> {:halt, error} - end - else - Logger.warn(fn -> ["Received a non-integer block number: ", inspect(block_number)] end) - end - end end diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 7f1219ef2c..fccbab981a 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -76,6 +76,7 @@ defmodule Indexer.Block.Realtime.Fetcher do @impl GenServer def init(%{block_fetcher: %Block.Fetcher{} = block_fetcher, subscribe_named_arguments: subscribe_named_arguments}) do Logger.metadata(fetcher: :block_realtime) + Process.flag(:trap_exit, true) {:ok, %__MODULE__{block_fetcher: %Block.Fetcher{block_fetcher | broadcast: :realtime, callback_module: __MODULE__}}, {:continue, {:init, subscribe_named_arguments}}} @@ -162,6 +163,11 @@ defmodule Indexer.Block.Realtime.Fetcher do {:noreply, state} end + @impl GenServer + def terminate(_reason, %__MODULE__{timer: timer}) do + Process.cancel_timer(timer) + end + if Application.compile_env(:explorer, :chain_type) == "stability" do defp fetch_validators_async do GenServer.cast(Indexer.Fetcher.Stability.Validator, :update_validators_list) diff --git a/apps/indexer/lib/indexer/temporary/uncataloged_token_transfers.ex b/apps/indexer/lib/indexer/temporary/uncataloged_token_transfers.ex index d048a33311..53664810aa 100644 --- a/apps/indexer/lib/indexer/temporary/uncataloged_token_transfers.ex +++ b/apps/indexer/lib/indexer/temporary/uncataloged_token_transfers.ex @@ -13,7 +13,7 @@ defmodule Indexer.Temporary.UncatalogedTokenTransfers do require Logger alias Explorer.Chain.TokenTransfer - alias Indexer.Block.Catchup.Fetcher + alias Explorer.Utility.MissingRangesManipulator alias Indexer.Temporary.UncatalogedTokenTransfers def child_spec([init_arguments]) do @@ -94,6 +94,11 @@ defmodule Indexer.Temporary.UncatalogedTokenTransfers do end defp async_push_front(block_numbers) do - Task.Supervisor.async_nolink(UncatalogedTokenTransfers.TaskSupervisor, Fetcher, :push_front, [block_numbers]) + Task.Supervisor.async_nolink( + UncatalogedTokenTransfers.TaskSupervisor, + MissingRangesManipulator, + :add_ranges_by_block_numbers, + [block_numbers] + ) end end diff --git a/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs b/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs index f55ec9134f..8d9650a762 100644 --- a/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs +++ b/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs @@ -388,7 +388,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do assert :ok = Supervisor.terminate_child(pid, :task) - assert_receive {:DOWN, ^reference, :process, ^child_pid, :shutdown} + assert_receive {:DOWN, ^reference, :process, ^child_pid, :normal} end test "with other child_id returns {:error, :not_found}", %{pid: pid} do diff --git a/apps/indexer/test/indexer/block/catchup/missing_ranges_collector_test.exs b/apps/indexer/test/indexer/block/catchup/missing_ranges_collector_test.exs index dd36cd1e9e..77449d9004 100644 --- a/apps/indexer/test/indexer/block/catchup/missing_ranges_collector_test.exs +++ b/apps/indexer/test/indexer/block/catchup/missing_ranges_collector_test.exs @@ -16,23 +16,21 @@ defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do MissingRangesCollector.start_link([]) Process.sleep(1000) - assert [999_999..900_000//-1] = batch = MissingBlockRange.get_latest_batch(1) + assert [999_999..999_900//-1] = batch = MissingBlockRange.get_latest_batch(100) MissingBlockRange.clear_batch(batch) - assert [899_999..800_000//-1] = batch = MissingBlockRange.get_latest_batch(1) + assert [999_899..999_800//-1] = batch = MissingBlockRange.get_latest_batch(100) MissingBlockRange.clear_batch(batch) - assert [799_999..700_000//-1] = batch = MissingBlockRange.get_latest_batch(1) + assert [999_799..999_700//-1] = batch = MissingBlockRange.get_latest_batch(100) MissingBlockRange.clear_batch(batch) - insert(:block, number: 1_200_000) + insert(:block, number: 1_000_200) Process.sleep(1000) - assert [1_199_999..1_100_001//-1] = batch = MissingBlockRange.get_latest_batch(1) + assert [1_000_199..1_000_100//-1] = batch = MissingBlockRange.get_latest_batch(100) MissingBlockRange.clear_batch(batch) - assert [1_100_000..1_000_001//-1] = batch = MissingBlockRange.get_latest_batch(1) + assert [1_000_099..1_000_001//-1, 999_699..999_699//-1] = batch = MissingBlockRange.get_latest_batch(100) MissingBlockRange.clear_batch(batch) - assert [699_999..600_000//-1] = batch = MissingBlockRange.get_latest_batch(1) - MissingBlockRange.clear_batch(batch) - assert [599_999..500_124//-1, 500_122..500_000//-1] = MissingBlockRange.get_latest_batch(2) + assert [999_698..999_599//-1] = MissingBlockRange.get_latest_batch(100) end test "FIRST_BLOCK and LAST_BLOCK envs" do @@ -60,16 +58,14 @@ defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do test "infinite range" do Application.put_env(:indexer, :block_ranges, "1..5,3..5,2qw1..12,10..11a,,asd..qwe,10..latest") - insert(:block, number: 200_000) + insert(:block, number: 200) MissingRangesCollector.start_link([]) Process.sleep(500) - assert [199_999..100_010//-1] = batch = MissingBlockRange.get_latest_batch(1) - MissingBlockRange.clear_batch(batch) - assert [100_009..10//-1] = batch = MissingBlockRange.get_latest_batch(1) + assert [199..100//-1] = batch = MissingBlockRange.get_latest_batch(100) MissingBlockRange.clear_batch(batch) - assert [5..1//-1] = MissingBlockRange.get_latest_batch(1) + assert [99..10//-1, 5..1//-1] = MissingBlockRange.get_latest_batch(100) end test "finite range" do @@ -80,7 +76,7 @@ defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do MissingRangesCollector.start_link([]) Process.sleep(500) - assert [200..150//-1, 50..30//-1, 25..5//-1] = batch = MissingBlockRange.get_latest_batch(3) + assert [200..150//-1, 50..30//-1, 25..5//-1] = batch = MissingBlockRange.get_latest_batch(100) MissingBlockRange.clear_batch(batch) assert [] = MissingBlockRange.get_latest_batch() end @@ -96,7 +92,7 @@ defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do Process.sleep(500) assert [200..176//-1, 174..150//-1, 50..34//-1, 32..30//-1, 25..5//-1] = - batch = MissingBlockRange.get_latest_batch(5) + batch = MissingBlockRange.get_latest_batch(91) MissingBlockRange.clear_batch(batch) assert [] = MissingBlockRange.get_latest_batch() diff --git a/config/runtime.exs b/config/runtime.exs index 0e11ced48f..25d511d34f 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -594,7 +594,8 @@ config :indexer, ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_FETCHER_INIT_QUERY_LIMIT", 100_000), coin_balances_fetcher_init_limit: ConfigHelper.parse_integer_env_var("INDEXER_COIN_BALANCES_FETCHER_INIT_QUERY_LIMIT", 2000), - ipfs_gateway_url: System.get_env("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs") + ipfs_gateway_url: System.get_env("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs"), + graceful_shutdown_period: ConfigHelper.parse_time_env_var("INDEXER_GRACEFUL_SHUTDOWN_PERIOD", "5m") config :indexer, Indexer.Supervisor, enabled: !ConfigHelper.parse_bool_env_var("DISABLE_INDEXER") diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index e4dee6d526..44cb7ffbfc 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -225,6 +225,7 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false # INDEXER_FETCHER_INIT_QUERY_LIMIT= # INDEXER_TOKEN_BALANCES_FETCHER_INIT_QUERY_LIMIT= # INDEXER_COIN_BALANCES_FETCHER_INIT_QUERY_LIMIT= +# INDEXER_GRACEFUL_SHUTDOWN_PERIOD= # WITHDRAWALS_FIRST_BLOCK= # INDEXER_OPTIMISM_L1_RPC= # INDEXER_OPTIMISM_L1_BATCH_START_BLOCK=