feat: add blobs fetcher

pull/9168/head
Kirill Fedoseev 10 months ago
parent d49020ef89
commit 00a1dfb0d3
  1. 1
      apps/block_scout_web/lib/block_scout_web/api_router.ex
  2. 5
      apps/block_scout_web/lib/block_scout_web/chain.ex
  3. 24
      apps/block_scout_web/lib/block_scout_web/controllers/api/v2/blob_controller.ex
  4. 2
      apps/block_scout_web/lib/block_scout_web/controllers/api/v2/transaction_controller.ex
  5. 23
      apps/block_scout_web/lib/block_scout_web/paging_helper.ex
  6. 22
      apps/block_scout_web/lib/block_scout_web/views/api/v2/blob_view.ex
  7. 10
      apps/block_scout_web/lib/block_scout_web/views/api/v2/block_view.ex
  8. 31
      apps/block_scout_web/lib/block_scout_web/views/api/v2/transaction_view.ex
  9. 11
      apps/explorer/lib/explorer/chain.ex
  10. 65
      apps/explorer/lib/explorer/chain/beacon/reader.ex
  11. 6
      apps/explorer/lib/explorer/chain/block.ex
  12. 1
      apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex
  13. 13
      apps/explorer/priv/beacon/migrations/20240109102458_create_blobs_tables.exs
  14. 2
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  15. 10
      apps/indexer/lib/indexer/block/fetcher.ex
  16. 2
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  17. 160
      apps/indexer/lib/indexer/fetcher/beacon/blob.ex
  18. 77
      apps/indexer/lib/indexer/fetcher/beacon/client.ex
  19. 1
      apps/indexer/lib/indexer/supervisor.ex
  20. 15
      config/runtime.exs

@ -315,7 +315,6 @@ defmodule BlockScoutWeb.ApiRouter do
scope "/blobs" do scope "/blobs" do
if System.get_env("CHAIN_TYPE") == "ethereum" do if System.get_env("CHAIN_TYPE") == "ethereum" do
get("/", V2.BlobController, :blobs)
get("/:blob_hash_param", V2.BlobController, :blob) get("/:blob_hash_param", V2.BlobController, :blob)
end end
end end

@ -649,11 +649,6 @@ defmodule BlockScoutWeb.Chain do
%{"id" => msg_id} %{"id" => msg_id}
end end
# Beacon blob transactions
defp paging_params(%{block_number: block_number, index: index}) do
%{"block_number" => block_number, "index" => index}
end
@spec paging_params_with_fiat_value(CurrentTokenBalance.t()) :: %{ @spec paging_params_with_fiat_value(CurrentTokenBalance.t()) :: %{
required(String.t()) => Decimal.t() | non_neg_integer() | nil required(String.t()) => Decimal.t() | non_neg_integer() | nil
} }

@ -8,8 +8,8 @@ defmodule BlockScoutWeb.API.V2.BlobController do
split_list_by_page: 1 split_list_by_page: 1
] ]
alias Explorer.Chain.Beacon.Reader
alias Explorer.Chain alias Explorer.Chain
alias Explorer.Chain.Beacon.Reader
action_fallback(BlockScoutWeb.API.V2.FallbackController) action_fallback(BlockScoutWeb.API.V2.FallbackController)
@ -34,26 +34,4 @@ defmodule BlockScoutWeb.API.V2.BlobController do
end end
end end
end end
@doc """
Function to handle GET requests to `/api/v2/blobs` endpoint.
"""
@spec blobs(Plug.Conn.t(), map()) :: Plug.Conn.t()
def blobs(conn, params) do
{blobs_transactions, next_page} =
params
|> paging_options()
|> Keyword.put(:api?, true)
|> Reader.blobs_transactions()
|> split_list_by_page()
next_page_params = next_page_params(next_page, blobs_transactions, params)
conn
|> put_status(200)
|> render(:blobs_transactions, %{
blobs_transactions: blobs_transactions,
next_page_params: next_page_params
})
end
end end

@ -2,7 +2,7 @@ defmodule BlockScoutWeb.API.V2.TransactionController do
use BlockScoutWeb, :controller use BlockScoutWeb, :controller
import BlockScoutWeb.Account.AuthController, only: [current_user: 1] import BlockScoutWeb.Account.AuthController, only: [current_user: 1]
alias BlockScoutWeb.API.V2.{BlobView} alias BlockScoutWeb.API.V2.BlobView
import BlockScoutWeb.Chain, import BlockScoutWeb.Chain,
only: [ only: [

@ -9,7 +9,28 @@ defmodule BlockScoutWeb.PagingHelper do
@page_size 50 @page_size 50
@default_paging_options %PagingOptions{page_size: @page_size + 1} @default_paging_options %PagingOptions{page_size: @page_size + 1}
@allowed_filter_labels ["validated", "pending"] @allowed_filter_labels ["validated", "pending"]
@allowed_type_labels ["coin_transfer", "contract_call", "contract_creation", "token_transfer", "token_creation"]
case Application.compile_env(:explorer, :chain_type) do
"ethereum" ->
@allowed_type_labels [
"coin_transfer",
"contract_call",
"contract_creation",
"token_transfer",
"token_creation",
"blob_transaction"
]
_ ->
@allowed_type_labels [
"coin_transfer",
"contract_call",
"contract_creation",
"token_transfer",
"token_creation"
]
end
@allowed_token_transfer_type_labels ["ERC-20", "ERC-721", "ERC-1155"] @allowed_token_transfer_type_labels ["ERC-20", "ERC-721", "ERC-1155"]
@allowed_nft_token_type_labels ["ERC-721", "ERC-1155"] @allowed_nft_token_type_labels ["ERC-721", "ERC-1155"]

@ -5,7 +5,7 @@ defmodule BlockScoutWeb.API.V2.BlobView do
alias Explorer.Chain.Beacon.Blob alias Explorer.Chain.Beacon.Blob
def render("blob.json", %{blob: blob, transaction_hashes: transaction_hashes}) do def render("blob.json", %{blob: blob, transaction_hashes: transaction_hashes}) do
prepare_blob(blob) |> Map.put("transaction_hashes", transaction_hashes) blob |> prepare_blob() |> Map.put("transaction_hashes", transaction_hashes)
end end
def render("blob.json", %{transaction_hashes: transaction_hashes}) do def render("blob.json", %{transaction_hashes: transaction_hashes}) do
@ -16,27 +16,17 @@ defmodule BlockScoutWeb.API.V2.BlobView do
%{"items" => Enum.map(blobs, &prepare_blob(&1))} %{"items" => Enum.map(blobs, &prepare_blob(&1))}
end end
def render("blobs_transactions.json", %{blobs_transactions: blobs_transactions, next_page_params: next_page_params}) do
%{"items" => Enum.map(blobs_transactions, &prepare_blob_transaction(&1)), "next_page_params" => next_page_params}
end
@spec prepare_blob(Blob.t()) :: map() @spec prepare_blob(Blob.t()) :: map()
def prepare_blob(blob) do def prepare_blob(blob) do
%{ %{
"hash" => blob.hash, "hash" => blob.hash,
"blob_data" => blob.blob_data, "blob_data" => encode_binary(blob.blob_data),
"kzg_commitment" => blob.kzg_commitment, "kzg_commitment" => encode_binary(blob.kzg_commitment),
"kzg_proof" => blob.kzg_proof "kzg_proof" => encode_binary(blob.kzg_proof)
} }
end end
@spec prepare_blob_transaction(%{block_number: non_neg_integer(), blob_hashes: [Hash.t()], transaction_hash: Hash.t()}) :: defp encode_binary(binary) do
map() "0x" <> Base.encode16(binary, case: :lower)
def prepare_blob_transaction(blob_transaction) do
%{
"block_number" => blob_transaction.block_number,
"blob_hashes" => blob_transaction.blob_hashes,
"transaction_hash" => blob_transaction.transaction_hash
}
end end
end end

@ -28,22 +28,22 @@ defmodule BlockScoutWeb.API.V2.BlockView do
end end
def prepare_block(block, _conn, single_block? \\ false) do def prepare_block(block, _conn, single_block? \\ false) do
burnt_fees = Block.burnt_fees(block.transactions, block.base_fee_per_gas) burnt_fees_execution = Block.burnt_fees(block.transactions, block.base_fee_per_gas)
priority_fee = block.base_fee_per_gas && BlockPriorityFeeCounter.fetch(block.hash) priority_fee = block.base_fee_per_gas && BlockPriorityFeeCounter.fetch(block.hash)
transaction_fees = Block.transaction_fees(block.transactions) transaction_fees_execution = Block.transaction_fees(block.transactions)
{transaction_fees, burnt_fees, blob_gas_price} = {transaction_fees, burnt_fees, blob_gas_price} =
if Application.get_env(:explorer, :chain_type) == "ethereum" do if Application.get_env(:explorer, :chain_type) == "ethereum" do
blob_transaction_fees = Block.blob_transaction_fees(block.transactions) blob_transaction_fees = Block.blob_transaction_fees(block.transactions)
{ {
transaction_fees |> Decimal.add(blob_transaction_fees), transaction_fees_execution |> Decimal.add(blob_transaction_fees),
burnt_fees |> Decimal.add(blob_transaction_fees), burnt_fees_execution |> Decimal.add(blob_transaction_fees),
blob_transaction_fees |> Decimal.div(block.blob_gas_used) blob_transaction_fees |> Decimal.div(block.blob_gas_used)
} }
else else
{transaction_fees, burnt_fees, nil} {transaction_fees_execution, burnt_fees_execution, nil}
end end
%{ %{

@ -434,13 +434,13 @@ defmodule BlockScoutWeb.API.V2.TransactionView do
end end
defp chain_type_fields(result, transaction, single_tx?, conn, watchlist_names) do defp chain_type_fields(result, transaction, single_tx?, conn, watchlist_names) do
case single_tx? && Application.get_env(:explorer, :chain_type) do case {single_tx?, Application.get_env(:explorer, :chain_type)} do
"polygon_edge" -> {true, "polygon_edge"} ->
result result
|> Map.put("polygon_edge_deposit", polygon_edge_deposit(transaction.hash, conn)) |> Map.put("polygon_edge_deposit", polygon_edge_deposit(transaction.hash, conn))
|> Map.put("polygon_edge_withdrawal", polygon_edge_withdrawal(transaction.hash, conn)) |> Map.put("polygon_edge_withdrawal", polygon_edge_withdrawal(transaction.hash, conn))
"polygon_zkevm" -> {true, "polygon_zkevm"} ->
extended_result = extended_result =
result result
|> add_optional_transaction_field(transaction, "zkevm_batch_number", :zkevm_batch, :number) |> add_optional_transaction_field(transaction, "zkevm_batch_number", :zkevm_batch, :number)
@ -449,21 +449,20 @@ defmodule BlockScoutWeb.API.V2.TransactionView do
Map.put(extended_result, "zkevm_status", zkevm_status(extended_result)) Map.put(extended_result, "zkevm_status", zkevm_status(extended_result))
"suave" -> {true, "suave"} ->
suave_fields(transaction, result, single_tx?, conn, watchlist_names) suave_fields(transaction, result, single_tx?, conn, watchlist_names)
# TODO this will not preload blob fields if single_tx? is false. is it ok? there is no preload in block_controller.ex for such case as well {_, "ethereum"} ->
"ethereum" ->
beacon_blob_transaction = transaction.beacon_blob_transaction beacon_blob_transaction = transaction.beacon_blob_transaction
if !is_nil(beacon_blob_transaction) do if is_nil(beacon_blob_transaction) or beacon_blob_transaction == %Ecto.Association.NotLoaded{} do
result
else
result result
|> Map.put("max_fee_per_blob_gas", beacon_blob_transaction.max_fee_per_blob_gas) |> Map.put("max_fee_per_blob_gas", beacon_blob_transaction.max_fee_per_blob_gas)
|> Map.put("blob_versioned_hashes", beacon_blob_transaction.blob_versioned_hashes) |> Map.put("blob_versioned_hashes", beacon_blob_transaction.blob_versioned_hashes)
|> Map.put("blob_gas_used", beacon_blob_transaction.blob_gas_used) |> Map.put("blob_gas_used", beacon_blob_transaction.blob_gas_used)
|> Map.put("blob_gas_price", beacon_blob_transaction.blob_gas_price) |> Map.put("blob_gas_price", beacon_blob_transaction.blob_gas_price)
else
result
end end
_ -> _ ->
@ -777,7 +776,19 @@ defmodule BlockScoutWeb.API.V2.TransactionView do
| :rootstock_remasc | :rootstock_remasc
| :token_creation | :token_creation
| :token_transfer | :token_transfer
def tx_types(tx, types \\ [], stage \\ :token_transfer) | :blob_transaction
def tx_types(tx, types \\ [], stage \\ :blob_transaction)
def tx_types(%Transaction{type: type} = tx, types, :blob_transaction) do
types =
if type == 3 do
[:blob_transaction | types]
else
types
end
tx_types(tx, types, :token_transfer)
end
def tx_types(%Transaction{token_transfers: token_transfers} = tx, types, :token_transfer) do def tx_types(%Transaction{token_transfers: token_transfers} = tx, types, :token_transfer) do
types = types =

@ -66,7 +66,7 @@ defmodule Explorer.Chain do
Withdrawal Withdrawal
} }
alias Explorer.Chain.Beacon.{BlobTransaction, Blob} alias Explorer.Chain.Beacon.{Blob, BlobTransaction}
alias Explorer.Chain.Block.{EmissionReward, Reward} alias Explorer.Chain.Block.{EmissionReward, Reward}
alias Explorer.Chain.Cache.{ alias Explorer.Chain.Cache.{
@ -5028,6 +5028,11 @@ defmodule Explorer.Chain do
as: :created_token as: :created_token
) )
) )
:blob_transaction ->
dynamic
|> filter_blob_transaction_dynamic()
|> apply_filter_by_tx_type_to_transactions_inner(remain, query)
end end
end end
@ -5061,6 +5066,10 @@ defmodule Explorer.Chain do
dynamic([tx, created_token: created_token], ^dynamic or not is_nil(created_token)) dynamic([tx, created_token: created_token], ^dynamic or not is_nil(created_token))
end end
def filter_blob_transaction_dynamic(dynamic) do
dynamic([tx], ^dynamic or tx.type == 3)
end
def count_verified_contracts do def count_verified_contracts do
Repo.aggregate(SmartContract, :count, timeout: :infinity) Repo.aggregate(SmartContract, :count, timeout: :infinity)
end end

@ -3,6 +3,8 @@ defmodule Explorer.Chain.Beacon.Reader do
import Ecto.Query, import Ecto.Query,
only: [ only: [
subquery: 1,
preload: 2,
from: 2, from: 2,
limit: 2, limit: 2,
order_by: 3, order_by: 3,
@ -14,9 +16,9 @@ defmodule Explorer.Chain.Beacon.Reader do
import Explorer.Chain, only: [select_repo: 1] import Explorer.Chain, only: [select_repo: 1]
alias Explorer.Chain.Beacon.{BlobTransaction, Blob} alias Explorer.Chain.Beacon.{Blob, BlobTransaction}
alias Explorer.{Chain, PagingOptions, Repo} alias Explorer.{Chain, PagingOptions, Repo}
alias Explorer.Chain.{Transaction, Hash} alias Explorer.Chain.{Hash, Transaction}
def blob(hash, options) when is_list(options) do def blob(hash, options) when is_list(options) do
Blob Blob
@ -41,29 +43,48 @@ defmodule Explorer.Chain.Beacon.Reader do
|> select_repo(options).all() |> select_repo(options).all()
end end
def blobs_transactions(options) when is_list(options) do def stream_missed_blob_transactions_timestamps(min_block, max_block, initial, reducer, options \\ [])
paging_options = Keyword.get(options, :paging_options, Chain.default_paging_options()) when is_list(options) do
query =
from(
transaction_blob in subquery(
from(
blob_transaction in BlobTransaction,
select: %{
transaction_hash: blob_transaction.hash,
blob_hash: fragment("unnest(blob_versioned_hashes)")
}
)
),
inner_join: transaction in Transaction,
on: transaction_blob.transaction_hash == transaction.hash,
where: transaction.type == 3,
left_join: blob in Blob,
on: blob.hash == transaction_blob.blob_hash,
where: is_nil(blob.hash),
distinct: transaction.block_timestamp,
select: transaction.block_timestamp
)
BlobTransaction query
|> join(:inner, [bt], transaction in Transaction, on: bt.hash == transaction.hash) |> add_min_block_filter(min_block)
|> where([bt, transaction], transaction.type == 3) |> add_max_block_filter(min_block)
|> order_by([bt, transaction], desc: transaction.block_number, desc: transaction.index) |> Repo.stream_reduce(initial, reducer)
|> page_blobs_transactions(paging_options)
|> limit(^paging_options.page_size)
|> select([bt, transaction], %{
block_number: transaction.block_number,
index: transaction.index,
blob_hashes: bt.blob_versioned_hashes,
transaction_hash: bt.hash
})
|> select_repo(options).all()
end end
defp page_blobs_transactions(query, %PagingOptions{key: nil}), do: query defp add_min_block_filter(query, block_number) do
if is_integer(block_number) do
query |> where([_, transaction], transaction.block_number <= ^block_number)
else
query
end
end
defp page_blobs_transactions(query, %PagingOptions{key: {block_number, index}}) do defp add_max_block_filter(query, block_number) do
from([bt, transaction] in query, if is_integer(block_number) and block_number > 0 do
where: fragment("(?, ?) <= (?, ?)", transaction.block_number, transaction.index, ^block_number, ^index) query |> where([_, transaction], transaction.block_number >= ^block_number)
) else
query
end
end end
end end

@ -267,13 +267,13 @@ defmodule Explorer.Chain.Block do
@spec blob_transaction_fees([Transaction.t()]) :: Decimal.t() @spec blob_transaction_fees([Transaction.t()]) :: Decimal.t()
def blob_transaction_fees(transactions) do def blob_transaction_fees(transactions) do
Enum.reduce(transactions, Decimal.new(0), fn %{beacon_blob_transaction: beacon_blob_transaction}, acc -> Enum.reduce(transactions, Decimal.new(0), fn %{beacon_blob_transaction: beacon_blob_transaction}, acc ->
if !is_nil(beacon_blob_transaction) do if is_nil(beacon_blob_transaction) do
acc
else
beacon_blob_transaction.blob_gas_used beacon_blob_transaction.blob_gas_used
|> Decimal.new() |> Decimal.new()
|> Decimal.mult(gas_price_to_decimal(beacon_blob_transaction.blob_gas_price)) |> Decimal.mult(gas_price_to_decimal(beacon_blob_transaction.blob_gas_price))
|> Decimal.add(acc) |> Decimal.add(acc)
else
acc
end end
end) end)
end end

@ -39,6 +39,7 @@ defmodule Explorer.Chain.Import.Stage.BlockReferencing do
] ]
"ethereum" -> "ethereum" ->
# credo:disable-for-next-line
@default_runners ++ @default_runners ++
[ [
Runner.Beacon.BlobTransactions Runner.Beacon.BlobTransactions

@ -3,7 +3,11 @@ defmodule Explorer.Repo.Beacon.Migrations.CreateBlobsTables do
def change do def change do
create table(:beacon_blobs_transactions, primary_key: false) do create table(:beacon_blobs_transactions, primary_key: false) do
add(:hash, :bytea, null: false, primary_key: true) add(:hash, references(:transactions, column: :hash, on_delete: :delete_all, type: :bytea),
null: false,
primary_key: true
)
add(:max_fee_per_blob_gas, :numeric, precision: 100, null: false) add(:max_fee_per_blob_gas, :numeric, precision: 100, null: false)
add(:blob_gas_price, :numeric, precision: 100, null: false) add(:blob_gas_price, :numeric, precision: 100, null: false)
add(:blob_gas_used, :numeric, precision: 100, null: false) add(:blob_gas_used, :numeric, precision: 100, null: false)
@ -18,16 +22,13 @@ defmodule Explorer.Repo.Beacon.Migrations.CreateBlobsTables do
end end
create table(:beacon_blobs, primary_key: false) do create table(:beacon_blobs, primary_key: false) do
add(:hash, references(:transactions, column: :hash, on_delete: :delete_all, type: :bytea), add(:hash, :bytea, null: false, primary_key: true)
null: false,
primary_key: true
)
add(:blob_data, :bytea, null: true) add(:blob_data, :bytea, null: true)
add(:kzg_commitment, :bytea, null: true) add(:kzg_commitment, :bytea, null: true)
add(:kzg_proof, :bytea, null: true) add(:kzg_proof, :bytea, null: true)
timestamps(updated_at: false, null: false, type: :utc_datetime_usec) timestamps(updated_at: false, null: false, type: :utc_datetime_usec, default: fragment("now()"))
end end
end end
end end

@ -9,6 +9,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
import Indexer.Block.Fetcher, import Indexer.Block.Fetcher,
only: [ only: [
async_import_blobs: 1,
async_import_block_rewards: 1, async_import_block_rewards: 1,
async_import_coin_balances: 2, async_import_coin_balances: 2,
async_import_created_contract_codes: 1, async_import_created_contract_codes: 1,
@ -163,6 +164,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
async_import_uncles(imported) async_import_uncles(imported)
async_import_replaced_transactions(imported) async_import_replaced_transactions(imported)
async_import_token_instances(imported) async_import_token_instances(imported)
async_import_blobs(imported)
end end
defp stream_fetch_and_import(state, sequence) defp stream_fetch_and_import(state, sequence)

@ -19,6 +19,7 @@ defmodule Indexer.Block.Fetcher do
alias Indexer.Fetcher.TokenInstance.Realtime, as: TokenInstanceRealtime alias Indexer.Fetcher.TokenInstance.Realtime, as: TokenInstanceRealtime
alias Indexer.Fetcher.{ alias Indexer.Fetcher.{
Beacon.Blob,
BlockReward, BlockReward,
CoinBalance, CoinBalance,
ContractCode, ContractCode,
@ -384,6 +385,15 @@ defmodule Indexer.Block.Fetcher do
def async_import_replaced_transactions(_), do: :ok def async_import_replaced_transactions(_), do: :ok
def async_import_blobs(%{blocks: blocks}) do
timestamps =
blocks
|> Enum.filter(fn %{blob_gas_used: blob_gas_used} -> blob_gas_used > 0 end)
|> Enum.map(&Map.get(&1, :timestamp))
Blob.async_fetch(timestamps)
end
defp block_reward_errors_to_block_numbers(block_reward_errors) when is_list(block_reward_errors) do defp block_reward_errors_to_block_numbers(block_reward_errors) when is_list(block_reward_errors) do
Enum.map(block_reward_errors, &block_reward_error_to_block_number/1) Enum.map(block_reward_errors, &block_reward_error_to_block_number/1)
end end

@ -13,6 +13,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
import Indexer.Block.Fetcher, import Indexer.Block.Fetcher,
only: [ only: [
async_import_blobs: 1,
async_import_block_rewards: 1, async_import_block_rewards: 1,
async_import_created_contract_codes: 1, async_import_created_contract_codes: 1,
async_import_internal_transactions: 1, async_import_internal_transactions: 1,
@ -429,6 +430,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
async_import_token_instances(imported) async_import_token_instances(imported)
async_import_uncles(imported) async_import_uncles(imported)
async_import_replaced_transactions(imported) async_import_replaced_transactions(imported)
async_import_blobs(imported)
end end
defp balances( defp balances(

@ -0,0 +1,160 @@
defmodule Indexer.Fetcher.Beacon.Blob do
@moduledoc """
Fills beacon_blobs DB table.
"""
use Indexer.Fetcher, restart: :permanent
use Spandex.Decorators
require Logger
alias Explorer.Repo
alias Explorer.Chain.{Data, Hash}
alias Explorer.Chain.Beacon.{Blob, Reader}
alias Indexer.{BufferedTask, Tracer}
alias Indexer.Fetcher.Beacon.Blob.Supervisor, as: BlobSupervisor
alias Indexer.Fetcher.Beacon.Client
@behaviour BufferedTask
@default_max_batch_size 10
@default_max_concurrency 1
@doc """
Asynchronously fetches blobs for given `block_timestamp`.
"""
def async_fetch(block_timestamps) do
if BlobSupervisor.disabled?() do
:ok
else
BufferedTask.buffer(__MODULE__, block_timestamps |> Enum.map(&entry/1))
end
end
@spec child_spec([...]) :: %{
:id => any(),
:start => {atom(), atom(), list()},
optional(:modules) => :dynamic | [atom()],
optional(:restart) => :permanent | :temporary | :transient,
optional(:shutdown) => :brutal_kill | :infinity | non_neg_integer(),
optional(:significant) => boolean(),
optional(:type) => :supervisor | :worker
}
@doc false
# credo:disable-for-next-line Credo.Check.Design.DuplicatedCode
def child_spec([init_options, gen_server_options]) do
state =
:indexer
|> Application.get_env(__MODULE__)
|> Keyword.take([:start_block, :end_block, :reference_slot, :reference_timestamp, :slot_duration])
|> Enum.into(%{})
merged_init_options =
defaults()
|> Keyword.merge(init_options)
|> Keyword.put(:state, state)
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__)
end
@impl BufferedTask
def init(initial, reducer, state) do
{:ok, final} =
Reader.stream_missed_blob_transactions_timestamps(
state.start_block,
state.end_block,
initial,
fn fields, acc ->
fields
|> entry()
|> reducer.(acc)
end
)
final
end
@impl BufferedTask
@decorate trace(
name: "fetch",
resource: "Indexer.Fetcher.Beacon.Blob.run/2",
service: :indexer,
tracer: Tracer
)
def run(entries, state) do
entry_count = Enum.count(entries)
Logger.metadata(count: entry_count)
Logger.debug(fn -> "fetching" end)
entries
|> Enum.map(&timestamp_to_slot(&1, state))
|> Client.get_blob_sidecars()
|> case do
{:ok, fetched_blobs, retries} ->
run_fetched_blobs(fetched_blobs)
if Enum.empty?(retries) do
:ok
else
{:retry, retries |> Enum.map(&Enum.at(entries, &1))}
end
end
end
defp entry(block_timestamp) do
DateTime.to_unix(block_timestamp)
end
defp timestamp_to_slot(block_timestamp, %{
reference_timestamp: reference_timestamp,
reference_slot: reference_slot,
slot_duration: slot_duration
}) do
((block_timestamp - reference_timestamp) |> div(slot_duration)) + reference_slot
end
defp run_fetched_blobs(fetched_blobs) do
blobs =
fetched_blobs
|> Enum.flat_map(fn %{"data" => blobs} -> blobs end)
|> Enum.map(&blob_entry/1)
Repo.insert_all(Blob, blobs, on_conflict: :nothing, conflict_target: [:hash])
end
def blob_entry(%{
"blob" => blob,
"kzg_commitment" => kzg_commitment,
"kzg_proof" => kzg_proof
}) do
{:ok, kzg_commitment} = Data.cast(kzg_commitment)
{:ok, blob} = Data.cast(blob)
{:ok, kzg_proof} = Data.cast(kzg_proof)
%{
hash: blob_hash(kzg_commitment.bytes),
blob_data: blob.bytes,
kzg_commitment: kzg_commitment.bytes,
kzg_proof: kzg_proof.bytes
}
end
def blob_hash(kzg_commitment) do
raw_hash = :crypto.hash(:sha256, kzg_commitment)
<<_::size(8), rest::binary>> = raw_hash
{:ok, hash} = Hash.Full.cast(<<1>> <> rest)
hash
end
defp defaults do
[
poll: false,
flush_interval: :timer.seconds(3),
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
task_supervisor: Indexer.Fetcher.Beacon.Blob.TaskSupervisor,
metadata: [fetcher: :beacon_blobs_sanitize]
]
end
end

@ -0,0 +1,77 @@
defmodule Indexer.Fetcher.Beacon.Client do
@moduledoc """
HTTP Client for Beacon Chain RPC
"""
alias HTTPoison.Response
require Logger
@request_error_msg "Error while sending request to beacon rpc"
def http_get_request(url) do
case HTTPoison.get(url) do
{:ok, %Response{body: body, status_code: 200}} ->
Jason.decode(body)
{:ok, %Response{body: body, status_code: _}} ->
{:error, body}
{:error, error} ->
old_truncate = Application.get_env(:logger, :truncate)
Logger.configure(truncate: :infinity)
Logger.error(fn ->
[
"Error while sending request to beacon rpc: #{url}: ",
inspect(error, limit: :infinity, printable_limit: :infinity)
]
end)
Logger.configure(truncate: old_truncate)
{:error, @request_error_msg}
end
end
def get_blob_sidecars(slots) when is_list(slots) do
{oks, errors_with_retries} =
slots
|> Enum.map(&get_blob_sidecars/1)
|> Enum.with_index()
|> Enum.map(&first_if_ok/1)
|> Enum.split_with(&successful?/1)
{errors, retries} = errors_with_retries |> Enum.unzip()
if !Enum.empty?(errors) do
Logger.error(fn ->
[
"Errors while fetching blob sidecars (failed for #{Enum.count(errors)}/#{Enum.count(slots)}) from beacon rpc: ",
inspect(Enum.take(errors, 3), limit: :infinity, printable_limit: :infinity)
]
end)
end
{:ok, oks |> Enum.map(fn {_, blob} -> blob end), retries}
end
def get_blob_sidecars(slot) do
http_get_request(blob_sidecars_url(slot))
end
defp first_if_ok({{:ok, _} = first, _}), do: first
defp first_if_ok(res), do: res
defp successful?({:ok, _}), do: true
defp successful?(_), do: false
def get_header(slot) do
http_get_request(header_url(slot))
end
def blob_sidecars_url(slot), do: "#{base_url()}" <> "/eth/v1/beacon/blob_sidecars/" <> to_string(slot)
def header_url(slot), do: "#{base_url()}" <> "/eth/v1/beacon/headers/" <> to_string(slot)
def base_url do
Application.get_env(:indexer, Indexer.Fetcher.Beacon)[:beacon_rpc]
end
end

@ -139,6 +139,7 @@ defmodule Indexer.Supervisor do
configure(TransactionBatch.Supervisor, [ configure(TransactionBatch.Supervisor, [
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor] [json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]
]), ]),
{Indexer.Fetcher.Beacon.Blob.Supervisor, [[memory_monitor: memory_monitor]]},
# Out-of-band fetchers # Out-of-band fetchers
{EmptyBlocksSanitizer.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments]]}, {EmptyBlocksSanitizer.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments]]},

@ -679,6 +679,21 @@ config :indexer, Indexer.Fetcher.RootstockData,
max_concurrency: ConfigHelper.parse_integer_env_var("INDEXER_ROOTSTOCK_DATA_FETCHER_CONCURRENCY", 5), max_concurrency: ConfigHelper.parse_integer_env_var("INDEXER_ROOTSTOCK_DATA_FETCHER_CONCURRENCY", 5),
db_batch_size: ConfigHelper.parse_integer_env_var("INDEXER_ROOTSTOCK_DATA_FETCHER_DB_BATCH_SIZE", 300) db_batch_size: ConfigHelper.parse_integer_env_var("INDEXER_ROOTSTOCK_DATA_FETCHER_DB_BATCH_SIZE", 300)
config :indexer, Indexer.Fetcher.Beacon, beacon_rpc: System.get_env("INDEXER_BEACON_RPC_URL")
config :indexer, Indexer.Fetcher.Beacon.Blob.Supervisor,
disabled?:
ConfigHelper.chain_type() != "ethereum" ||
ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_BEACON_BLOB_SANITIZE_FETCHER")
config :indexer, Indexer.Fetcher.Beacon.Blob,
slot_duration: ConfigHelper.parse_integer_env_var("INDEXER_BEACON_BLOB_SANITIZE_FETCHER_SLOT_DURATION", 12),
reference_slot: ConfigHelper.parse_integer_env_var("INDEXER_BEACON_BLOB_SANITIZE_FETCHER_REFERENCE_SLOT", 8_206_822),
reference_timestamp:
ConfigHelper.parse_integer_env_var("INDEXER_BEACON_BLOB_SANITIZE_FETCHER_REFERENCE_TIMESTAMP", 1_705_305_887),
start_block: ConfigHelper.parse_integer_env_var("INDEXER_BEACON_BLOB_SANITIZE_FETCHER_START_BLOCK", 8_206_822),
end_block: ConfigHelper.parse_integer_env_var("INDEXER_BEACON_BLOB_SANITIZE_FETCHER_END_BLOCK", 0)
Code.require_file("#{config_env()}.exs", "config/runtime") Code.require_file("#{config_env()}.exs", "config/runtime")
for config <- "../apps/*/config/runtime/#{config_env()}.exs" |> Path.expand(__DIR__) |> Path.wildcard() do for config <- "../apps/*/config/runtime/#{config_env()}.exs" |> Path.expand(__DIR__) |> Path.wildcard() do

Loading…
Cancel
Save