Massive blocks fetcher (#9486)

* Massive blocks fetcher

* Improve massive blocks fetcher log

* Update apps/explorer/lib/explorer/utility/massive_block.ex

Co-authored-by: Maxim Filonov <53992153+sl1depengwyn@users.noreply.github.com>

* Add low priority queue for MassiveBlocksFetcher

---------

Co-authored-by: Maxim Filonov <53992153+sl1depengwyn@users.noreply.github.com>
pull/9640/head
Qwerty5Uiop 8 months ago committed by GitHub
parent de905162a0
commit 400b45b145
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 42
      apps/explorer/lib/explorer/utility/massive_block.ex
  3. 11
      apps/explorer/priv/repo/migrations/20240226074456_create_massive_blocks.exs
  4. 20
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  5. 88
      apps/indexer/lib/indexer/block/catchup/massive_blocks_fetcher.ex
  6. 3
      apps/indexer/lib/indexer/block/catchup/supervisor.ex
  7. 7
      apps/indexer/lib/indexer/block/fetcher.ex

@ -5,6 +5,7 @@
### Features
- [#9490](https://github.com/blockscout/blockscout/pull/9490) - Add blob transaction counter and filter in block view
- [#9486](https://github.com/blockscout/blockscout/pull/9486) - Massive blocks fetcher
- [#9473](https://github.com/blockscout/blockscout/pull/9473) - Add user_op interpretation
- [#9461](https://github.com/blockscout/blockscout/pull/9461) - Fetch blocks without internal transactions backwards
- [#9460](https://github.com/blockscout/blockscout/pull/9460) - Optimism chain type

@ -0,0 +1,42 @@
defmodule Explorer.Utility.MassiveBlock do
@moduledoc """
Module is responsible for keeping the block numbers that are too large for regular import
and need more time to complete.
"""
use Explorer.Schema
alias Explorer.Repo
# No surrogate id: the block number itself is the primary key (see field below).
@primary_key false
typed_schema "massive_blocks" do
field(:number, :integer, primary_key: true)
timestamps()
end
@doc false
def changeset(massive_block \\ %__MODULE__{}, params) do
# Only :number is cast; timestamps are managed by insert_block_numbers/1.
cast(massive_block, params, [:number])
end
@doc """
Returns the highest block number stored in `massive_blocks`, skipping any
numbers in `except_numbers`. Returns `nil` when no matching row exists.
"""
def get_last_block_number(except_numbers) do
__MODULE__
|> where([mb], mb.number not in ^except_numbers)
|> select([mb], max(mb.number))
|> Repo.one()
end
@doc """
Bulk-inserts the given block `numbers`. Re-inserting an existing number only
bumps its `updated_at` (upsert on the `:number` primary key).
"""
def insert_block_numbers(numbers) do
now = DateTime.utc_now()
params = Enum.map(numbers, &%{number: &1, inserted_at: now, updated_at: now})
Repo.insert_all(__MODULE__, params, on_conflict: {:replace, [:updated_at]}, conflict_target: :number)
end
@doc """
Deletes the row for the given block `number`, if present.
"""
def delete_block_number(number) do
__MODULE__
|> where([mb], mb.number == ^number)
|> Repo.delete_all()
end
end

@ -0,0 +1,11 @@
defmodule Explorer.Repo.Migrations.CreateMassiveBlocks do
  use Ecto.Migration

  # Creates the massive_blocks table: a bare set of block numbers keyed by the
  # number itself, plus the standard inserted_at/updated_at timestamps.
  def change do
    create table(:massive_blocks, primary_key: false) do
      # The block number doubles as the primary key — no surrogate id column.
      add :number, :bigint, primary_key: true

      timestamps()
    end
  end
end

@ -25,7 +25,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
alias Ecto.Changeset
alias Explorer.Chain
alias Explorer.Chain.NullRoundHeight
alias Explorer.Utility.MissingRangesManipulator
alias Explorer.Utility.{MassiveBlock, MissingRangesManipulator}
alias Indexer.{Block, Tracer}
alias Indexer.Block.Catchup.{Sequence, TaskSupervisor}
alias Indexer.Memory.Shrinkable
@ -219,6 +219,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
{:error, {:import = step, reason}} = error ->
Prometheus.Instrumenter.import_errors()
Logger.error(fn -> [inspect(reason), ". Retrying."] end, step: step)
if reason == :timeout, do: add_range_to_massive_blocks(range)
push_back(sequence, range)
@ -250,6 +251,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
end
rescue
exception ->
if timeout_exception?(exception), do: add_range_to_massive_blocks(range)
Logger.error(fn -> [Exception.format(:error, exception, __STACKTRACE__), ?\n, ?\n, "Retrying."] end)
{:error, exception}
end
@ -268,6 +270,20 @@ defmodule Indexer.Block.Catchup.Fetcher do
other_errors
end
# Heuristic timeout detection: any exception whose message contains
# "due to a timeout" is treated as a timeout (e.g. DBConnection-style errors).
# NOTE(review): message-text matching may miss timeouts worded differently —
# confirm against the exceptions actually raised by the import path.
defp timeout_exception?(%{message: message}) when is_binary(message) do
String.match?(message, ~r/due to a timeout/)
end
# Exceptions without a binary :message field are never considered timeouts.
defp timeout_exception?(_exception), do: false
# Hands a range that timed out during import over to the massive-blocks path:
# removes it from the regular missing-ranges queue, then stores every block
# number of the range in the massive_blocks table for the dedicated fetcher.
defp add_range_to_massive_blocks(range) do
clear_missing_ranges(range)
range
|> Enum.to_list()
|> MassiveBlock.insert_block_numbers()
end
defp cap_seq(seq, errors) do
{not_founds, other_errors} =
Enum.split_with(errors, fn
@ -301,7 +317,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
|> Enum.map(&push_back(sequence, &1))
end
defp clear_missing_ranges(initial_range, errors) do
defp clear_missing_ranges(initial_range, errors \\ []) do
success_numbers = Enum.to_list(initial_range) -- Enum.map(errors, &block_error_to_number/1)
success_numbers

@ -0,0 +1,88 @@
defmodule Indexer.Block.Catchup.MassiveBlocksFetcher do
  @moduledoc """
  Fetches and indexes blocks by numbers from massive_blocks table.

  Blocks are processed one at a time with `timeout: :infinity`, newest first.
  Blocks that fail to import are parked in a low-priority queue and retried
  only after the main massive_blocks table is drained.
  """

  use GenServer

  require Logger

  alias Explorer.Utility.MassiveBlock
  alias Indexer.Block.Fetcher

  # Polling delay (ms) used when there is currently nothing to process.
  @increased_interval 10_000

  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(_) do
    # Kick off the first processing cycle immediately.
    send_new_task()

    {:ok, %{block_fetcher: generate_block_fetcher(), low_priority_blocks: []}}
  end

  @impl true
  def handle_info(:task, %{low_priority_blocks: low_priority_blocks} = state) do
    {result, new_low_priority_blocks} = process_next_block(state.block_fetcher, low_priority_blocks)

    # Re-schedule: immediately after doing work, or after a back-off interval
    # when there was nothing to do.
    case result do
      :processed -> send_new_task()
      :empty -> send_new_task(@increased_interval)
    end

    {:noreply, %{state | low_priority_blocks: new_low_priority_blocks}}
  end

  def handle_info(_, state) do
    {:noreply, state}
  end

  # Picks and processes the next block. Prefers the highest number from the
  # massive_blocks table (excluding low-priority numbers); falls back to the
  # head of the low-priority queue. Failed blocks are appended back to the
  # low-priority queue. Returns {:processed | :empty, new_low_priority_blocks}.
  defp process_next_block(block_fetcher, low_priority_blocks) do
    case MassiveBlock.get_last_block_number(low_priority_blocks) do
      nil ->
        case low_priority_blocks do
          [number | rest] ->
            failed_blocks = process_block(block_fetcher, number)
            {:processed, rest ++ failed_blocks}

          [] ->
            {:empty, []}
        end

      number ->
        failed_blocks = process_block(block_fetcher, number)
        {:processed, low_priority_blocks ++ failed_blocks}
    end
  end

  # Fetches and imports a single block without an import timeout.
  # Returns [] on success (the number is removed from massive_blocks),
  # or [number] on failure so the caller can park it as low priority.
  defp process_block(block_fetcher, number) do
    case Fetcher.fetch_and_import_range(block_fetcher, number..number, %{timeout: :infinity}) do
      {:ok, _result} ->
        # Fixed log text: correct module name and grammar
        # (was "MassiveBlockFetcher successfully proceed block").
        Logger.info("MassiveBlocksFetcher successfully processed block #{inspect(number)}")
        MassiveBlock.delete_block_number(number)
        []

      {:error, error} ->
        Logger.error("MassiveBlocksFetcher failed: #{inspect(error)}")
        [number]
    end
  end

  # Builds a catchup-flavored block fetcher from application config.
  defp generate_block_fetcher do
    receipts_batch_size = Application.get_env(:indexer, :receipts_batch_size)
    receipts_concurrency = Application.get_env(:indexer, :receipts_concurrency)
    json_rpc_named_arguments = Application.get_env(:indexer, :json_rpc_named_arguments)

    %Fetcher{
      broadcast: :catchup,
      callback_module: Indexer.Block.Catchup.Fetcher,
      json_rpc_named_arguments: json_rpc_named_arguments,
      receipts_batch_size: receipts_batch_size,
      receipts_concurrency: receipts_concurrency
    }
  end

  # Schedules the next :task message after `interval` ms (default: now).
  defp send_new_task(interval \\ 0) do
    Process.send_after(self(), :task, interval)
  end
end

@ -5,7 +5,7 @@ defmodule Indexer.Block.Catchup.Supervisor do
use Supervisor
alias Indexer.Block.Catchup.{BoundIntervalSupervisor, MissingRangesCollector}
alias Indexer.Block.Catchup.{BoundIntervalSupervisor, MassiveBlocksFetcher, MissingRangesCollector}
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
@ -31,6 +31,7 @@ defmodule Indexer.Block.Catchup.Supervisor do
[
{MissingRangesCollector, []},
{Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor},
{MassiveBlocksFetcher, []},
{BoundIntervalSupervisor, [bound_interval_supervisor_arguments, [name: BoundIntervalSupervisor]]}
],
strategy: :one_for_one

@ -118,7 +118,7 @@ defmodule Indexer.Block.Fetcher do
end
@decorate span(tracer: Tracer)
@spec fetch_and_import_range(t, Range.t()) ::
@spec fetch_and_import_range(t, Range.t(), map) ::
{:ok, %{inserted: %{}, errors: [EthereumJSONRPC.Transport.error()]}}
| {:error,
{step :: atom(), reason :: [Ecto.Changeset.t()] | term()}
@ -129,7 +129,8 @@ defmodule Indexer.Block.Fetcher do
callback_module: callback_module,
json_rpc_named_arguments: json_rpc_named_arguments
} = state,
_.._ = range
_.._ = range,
additional_options \\ %{}
)
when callback_module != nil do
{fetch_time, fetched_blocks} =
@ -228,7 +229,7 @@ defmodule Indexer.Block.Fetcher do
{:ok, inserted} <-
__MODULE__.import(
state,
import_options(basic_import_options, chain_type_import_options)
basic_import_options |> Map.merge(additional_options) |> import_options(chain_type_import_options)
),
{:tx_actions, {:ok, inserted_tx_actions}} <-
{:tx_actions,

Loading…
Cancel
Save