From b7458900c755eca48478a877999ec59883e49ef3 Mon Sep 17 00:00:00 2001 From: varasev <33550681+varasev@users.noreply.github.com> Date: Thu, 21 Nov 2024 15:51:37 +0400 Subject: [PATCH] chore: OP modules improvements (#11073) * Add new envs for OP stack * Fix updating logs filter in OP Deposits fetcher * Add fallback envs for OP * Add socket notifier for OP batches * Update common-blockscout.env * Set infinity timeout for select db queries * Support transactions without `to` field * Add some docs * mix format * Restore OP fetcher after reorg and restart * Add specs and docs * Fix spelling * Refactoring and hardcode INDEXER_BEACON_BLOB_FETCHER_* envs * mix format * Update spelling * Small fix for Indexer.Fetcher.Optimism.Deposit * Rewrite Indexer.Fetcher.Optimism.Deposit * Update common-blockscout.env * Add todo comments for deprecated socket topic * Fix for the new websocket channel * Add todo comment --------- Co-authored-by: POA <33550681+poa@users.noreply.github.com> --- .../channels/optimism_channel.ex | 20 + .../channels/optimism_deposit_channel.ex | 22 - .../block_scout_web/channels/user_socket.ex | 1 - .../channels/user_socket_v2.ex | 3 +- .../lib/block_scout_web/notifier.ex | 16 +- .../lib/block_scout_web/notifiers/optimism.ex | 40 ++ .../block_scout_web/realtime_event_handler.ex | 7 +- .../views/api/v2/optimism_view.ex | 62 +- .../lib/ethereum_jsonrpc/transaction.ex | 175 +++--- .../lib/explorer/chain/events/publisher.ex | 5 +- .../lib/explorer/chain/events/subscriber.ex | 5 +- .../lib/explorer/chain/optimism/deposit.ex | 18 +- .../explorer/chain/optimism/dispute_game.ex | 2 +- .../explorer/chain/optimism/frame_sequence.ex | 101 +++- .../chain/optimism/frame_sequence_blob.ex | 2 +- .../explorer/chain/optimism/output_root.ex | 34 +- .../chain/optimism/transaction_batch.ex | 4 +- .../lib/explorer/chain/optimism/withdrawal.ex | 36 +- .../chain/optimism/withdrawal_event.ex | 32 + apps/indexer/lib/indexer/fetcher/optimism.ex | 123 +++- .../lib/indexer/fetcher/optimism/deposit.ex | 559 +++++------------- .../indexer/fetcher/optimism/output_root.ex | 42 +- .../fetcher/optimism/transaction_batch.ex | 228 +++++-- .../indexer/fetcher/optimism/withdrawal.ex | 105 ++-- .../fetcher/optimism/withdrawal_event.ex | 44 +- .../fetcher/rollup_l1_reorg_monitor.ex | 1 + apps/indexer/lib/indexer/helper.ex | 15 + config/runtime.exs | 12 +- cspell.json | 1 + docker-compose/envs/common-blockscout.env | 8 +- 30 files changed, 1020 insertions(+), 703 deletions(-) create mode 100644 apps/block_scout_web/lib/block_scout_web/channels/optimism_channel.ex delete mode 100644 apps/block_scout_web/lib/block_scout_web/channels/optimism_deposit_channel.ex create mode 100644 apps/block_scout_web/lib/block_scout_web/notifiers/optimism.ex diff --git a/apps/block_scout_web/lib/block_scout_web/channels/optimism_channel.ex b/apps/block_scout_web/lib/block_scout_web/channels/optimism_channel.ex new file mode 100644 index 0000000000..6fa69e4ebb --- /dev/null +++ b/apps/block_scout_web/lib/block_scout_web/channels/optimism_channel.ex @@ -0,0 +1,20 @@ +defmodule BlockScoutWeb.OptimismChannel do + @moduledoc """ + Establishes pub/sub channel for live updates of OP related events. + """ + use BlockScoutWeb, :channel + + def join("optimism:new_batch", _params, socket) do + {:ok, %{}, socket} + end + + def join("optimism:new_deposits", _params, socket) do + {:ok, %{}, socket} + end + + # todo: the `optimism_deposits:new_deposits` socket topic is for backward compatibility + # for the frontend and should be removed after the frontend starts to use the `optimism:new_deposits` + def join("optimism_deposits:new_deposits", _params, socket) do + {:ok, %{}, socket} + end +end diff --git a/apps/block_scout_web/lib/block_scout_web/channels/optimism_deposit_channel.ex b/apps/block_scout_web/lib/block_scout_web/channels/optimism_deposit_channel.ex deleted file mode 100644 index 3f2c513f9b..0000000000 --- a/apps/block_scout_web/lib/block_scout_web/channels/optimism_deposit_channel.ex +++ /dev/null @@ -1,22 +0,0 @@ -defmodule BlockScoutWeb.OptimismDepositChannel do - @moduledoc """ - Establishes pub/sub channel for live updates of Optimism deposit events. - """ - use BlockScoutWeb, :channel - - intercept(["deposits"]) - - def join("optimism_deposits:new_deposits", _params, socket) do - {:ok, %{}, socket} - end - - def handle_out( - "deposits", - %{deposits: deposits}, - %Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket - ) do - push(socket, "deposits", %{deposits: Enum.count(deposits)}) - - {:noreply, socket} - end -end diff --git a/apps/block_scout_web/lib/block_scout_web/channels/user_socket.ex b/apps/block_scout_web/lib/block_scout_web/channels/user_socket.ex index 5d51597e35..3badb9d1b7 100644 --- a/apps/block_scout_web/lib/block_scout_web/channels/user_socket.ex +++ b/apps/block_scout_web/lib/block_scout_web/channels/user_socket.ex @@ -5,7 +5,6 @@ defmodule BlockScoutWeb.UserSocket do channel("addresses:*", BlockScoutWeb.AddressChannel) channel("blocks:*", BlockScoutWeb.BlockChannel) channel("exchange_rate:*", BlockScoutWeb.ExchangeRateChannel) - channel("optimism_deposits:*", BlockScoutWeb.OptimismDepositChannel) channel("rewards:*", BlockScoutWeb.RewardChannel) channel("transactions:*", BlockScoutWeb.TransactionChannel) channel("tokens:*", BlockScoutWeb.TokenChannel) diff --git a/apps/block_scout_web/lib/block_scout_web/channels/user_socket_v2.ex b/apps/block_scout_web/lib/block_scout_web/channels/user_socket_v2.ex index 57cdf442c9..696b3b0de4 100644 --- a/apps/block_scout_web/lib/block_scout_web/channels/user_socket_v2.ex +++ b/apps/block_scout_web/lib/block_scout_web/channels/user_socket_v2.ex @@ -7,7 +7,6 @@ defmodule BlockScoutWeb.UserSocketV2 do channel("addresses:*", BlockScoutWeb.AddressChannel) channel("blocks:*", BlockScoutWeb.BlockChannel) channel("exchange_rate:*", BlockScoutWeb.ExchangeRateChannel) - channel("optimism_deposits:*", BlockScoutWeb.OptimismDepositChannel) channel("rewards:*", BlockScoutWeb.RewardChannel) channel("transactions:*", BlockScoutWeb.TransactionChannel) channel("tokens:*", BlockScoutWeb.TokenChannel) @@ -16,6 +15,8 @@ defmodule BlockScoutWeb.UserSocketV2 do case Application.compile_env(:explorer, :chain_type) do :arbitrum -> channel("arbitrum:*", BlockScoutWeb.ArbitrumChannel) + # todo: change `optimism*"` to `optimism:*` after the deprecated `optimism_deposits:new_deposits` topic is removed + :optimism -> channel("optimism*", BlockScoutWeb.OptimismChannel) _ -> nil end diff --git a/apps/block_scout_web/lib/block_scout_web/notifier.ex b/apps/block_scout_web/lib/block_scout_web/notifier.ex index dead3bb875..9ed9e00027 100644 --- a/apps/block_scout_web/lib/block_scout_web/notifier.ex +++ b/apps/block_scout_web/lib/block_scout_web/notifier.ex @@ -35,6 +35,9 @@ defmodule BlockScoutWeb.Notifier do :arbitrum -> @chain_type_specific_events ~w(new_arbitrum_batches new_messages_to_arbitrum_amount)a + :optimism -> + @chain_type_specific_events ~w(new_optimism_batches new_optimism_deposits)a + _ -> nil end @@ -275,10 +278,6 @@ defmodule BlockScoutWeb.Notifier do Endpoint.broadcast("addresses:#{to_string(address_hash)}", "changed_bytecode", %{}) end - def handle_event({:chain_event, :optimism_deposits, :realtime, deposits}) do - broadcast_optimism_deposits(deposits, "optimism_deposits:new_deposits", "deposits") - end - def handle_event({:chain_event, :smart_contract_was_verified = event, :on_demand, [address_hash]}) do broadcast_automatic_verification_events(event, address_hash) end @@ -303,6 +302,11 @@ defmodule BlockScoutWeb.Notifier do # credo:disable-for-next-line Credo.Check.Design.AliasUsage do: BlockScoutWeb.Notifiers.Arbitrum.handle_event(event) + :optimism -> + def handle_event({:chain_event, topic, _, _} = event) when topic in @chain_type_specific_events, + # credo:disable-for-next-line Credo.Check.Design.AliasUsage + do: BlockScoutWeb.Notifiers.Optimism.handle_event(event) + _ -> nil end @@ -457,10 +461,6 @@ defmodule BlockScoutWeb.Notifier do end end - defp broadcast_optimism_deposits(deposits, deposit_channel, event) do - Endpoint.broadcast(deposit_channel, event, %{deposits: deposits}) - end - defp broadcast_transactions_websocket_v2(transactions) do pending_transactions = Enum.filter(transactions, fn diff --git a/apps/block_scout_web/lib/block_scout_web/notifiers/optimism.ex b/apps/block_scout_web/lib/block_scout_web/notifiers/optimism.ex new file mode 100644 index 0000000000..7d9151c9ea --- /dev/null +++ b/apps/block_scout_web/lib/block_scout_web/notifiers/optimism.ex @@ -0,0 +1,40 @@ +defmodule BlockScoutWeb.Notifiers.Optimism do + @moduledoc """ + Module to handle and broadcast OP related events. + """ + + alias BlockScoutWeb.Endpoint + + require Logger + + def handle_event({:chain_event, :new_optimism_batches, :realtime, batches}) do + batches + |> Enum.sort_by(& &1.internal_id, :asc) + |> Enum.each(fn batch -> + Endpoint.broadcast("optimism:new_batch", "new_optimism_batch", %{ + batch: batch + }) + end) + end + + def handle_event({:chain_event, :new_optimism_deposits, :realtime, deposits}) do + deposits_count = Enum.count(deposits) + + if deposits_count > 0 do + Endpoint.broadcast("optimism:new_deposits", "new_optimism_deposits", %{ + deposits: deposits_count + }) + + # todo: the `optimism_deposits:new_deposits` socket topic is for backward compatibility + # for the frontend and should be removed after the frontend starts to use the `optimism:new_deposits` + Endpoint.broadcast("optimism_deposits:new_deposits", "deposits", %{ + deposits: deposits_count + }) + end + end + + def handle_event(event) do + Logger.warning("Unknown broadcasted event #{inspect(event)}.") + nil + end +end diff --git a/apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex b/apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex index f77ae623f8..c0fb18915f 100644 --- a/apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex +++ b/apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex @@ -19,6 +19,12 @@ defmodule BlockScoutWeb.RealtimeEventHandler do Subscriber.to(:new_messages_to_arbitrum_amount, :realtime) end + :optimism -> + def chain_type_specific_subscriptions do + Subscriber.to(:new_optimism_batches, :realtime) + Subscriber.to(:new_optimism_deposits, :realtime) + end + _ -> def chain_type_specific_subscriptions do nil @@ -32,7 +38,6 @@ defmodule BlockScoutWeb.RealtimeEventHandler do Subscriber.to(:block_rewards, :realtime) Subscriber.to(:internal_transactions, :realtime) Subscriber.to(:internal_transactions, :on_demand) - Subscriber.to(:optimism_deposits, :realtime) Subscriber.to(:token_transfers, :realtime) Subscriber.to(:addresses, :on_demand) Subscriber.to(:address_coin_balances, :on_demand) diff --git a/apps/block_scout_web/lib/block_scout_web/views/api/v2/optimism_view.ex b/apps/block_scout_web/lib/block_scout_web/views/api/v2/optimism_view.ex index f0217f268c..2c68d4425a 100644 --- a/apps/block_scout_web/lib/block_scout_web/views/api/v2/optimism_view.ex +++ b/apps/block_scout_web/lib/block_scout_web/views/api/v2/optimism_view.ex @@ -7,7 +7,7 @@ defmodule BlockScoutWeb.API.V2.OptimismView do alias Explorer.{Chain, Repo} alias Explorer.Helper, as: ExplorerHelper alias Explorer.Chain.{Block, Transaction} - alias Explorer.Chain.Optimism.{FrameSequenceBlob, Withdrawal} + alias Explorer.Chain.Optimism.{FrameSequence, FrameSequenceBlob, Withdrawal} @doc """ Function to render GET requests to `/api/v2/optimism/txn-batches` endpoint. @@ -66,19 +66,7 @@ defmodule BlockScoutWeb.API.V2.OptimismView do |> Enum.map(fn batch -> from..to//_ = batch.l2_block_range - %{ - "internal_id" => batch.id, - "l1_timestamp" => batch.l1_timestamp, - "l2_block_start" => from, - "l2_block_end" => to, - "transaction_count" => batch.transaction_count, - # todo: keep next line for compatibility with frontend and remove when new frontend is bound to `transaction_count` property - "tx_count" => batch.transaction_count, - "l1_transaction_hashes" => batch.l1_transaction_hashes, - # todo: keep next line for compatibility with frontend and remove when new frontend is bound to `l1_transaction_hashes` property - "l1_tx_hashes" => batch.l1_transaction_hashes, - "batch_data_container" => batch.batch_data_container - } + render_base_info_for_batch(batch.id, from, to, batch.transaction_count, batch) end) %{ @@ -272,6 +260,52 @@ defmodule BlockScoutWeb.API.V2.OptimismView do count end + # Transforms an L1 batch into a map format for HTTP response. + # + # This function processes an Optimism L1 batch and converts it into a map that + # includes basic batch information. + # + # ## Parameters + # - `internal_id`: The internal ID of the batch. + # - `l2_block_number_from`: Start L2 block number of the batch block range. + # - `l2_block_number_to`: End L2 block number of the batch block range. + # - `transaction_count`: The L2 transaction count included into the blocks of the range. + # - `batch`: Either an `Explorer.Chain.Optimism.FrameSequence` entry or a map with + # the corresponding fields. + # + # ## Returns + # - A map with detailed information about the batch formatted for use in JSON HTTP responses. + @spec render_base_info_for_batch( + non_neg_integer(), + non_neg_integer(), + non_neg_integer(), + non_neg_integer(), + FrameSequence.t() + | %{:l1_timestamp => DateTime.t(), :l1_transaction_hashes => list(), optional(any()) => any()} + ) :: %{ + :internal_id => non_neg_integer(), + :l1_timestamp => DateTime.t(), + :l2_block_start => non_neg_integer(), + :l2_block_end => non_neg_integer(), + :transaction_count => non_neg_integer(), + # todo: keep next line for compatibility with frontend and remove when new frontend is bound to `transaction_count` property + :tx_count => non_neg_integer(), + :l1_transaction_hashes => list(), + # todo: keep next line for compatibility with frontend and remove when new frontend is bound to `l1_transaction_hashes` property + :l1_tx_hashes => list(), + :batch_data_container => :in_blob4844 | :in_celestia | :in_calldata | nil + } + defp render_base_info_for_batch(internal_id, l2_block_number_from, l2_block_number_to, transaction_count, batch) do + FrameSequence.prepare_base_info_for_batch( + internal_id, + l2_block_number_from, + l2_block_number_to, + transaction_count, + batch.batch_data_container, + batch + ) + end + @doc """ Extends the json output for a block using Optimism frame sequence (bound with the provided L2 block) - adds info about L1 batch to the output. diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex index 8611bdb696..a584ae1d50 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex @@ -307,24 +307,31 @@ defmodule EthereumJSONRPC.Transaction do |> chain_type_fields(elixir) end - def do_elixir_to_params( - %{ - "blockHash" => block_hash, - "blockNumber" => block_number, - "from" => from_address_hash, - "gas" => gas, - "gasPrice" => gas_price, - "hash" => hash, - "input" => input, - "nonce" => nonce, - "to" => to_address_hash, - "transactionIndex" => index, - "value" => value, - "type" => type, - "maxPriorityFeePerGas" => max_priority_fee_per_gas, - "maxFeePerGas" => max_fee_per_gas - } = transaction - ) do + # Converts a map of the transaction parameters to the map with the corresponding atom parameters. + # + # ## Parameters + # - `transaction`: The input map. + # + # ## Returns + # - The resulting map. + @spec do_elixir_to_params(%{String.t() => any()}) :: %{atom() => any()} + defp do_elixir_to_params( + %{ + "blockHash" => block_hash, + "blockNumber" => block_number, + "from" => from_address_hash, + "gas" => gas, + "gasPrice" => gas_price, + "hash" => hash, + "input" => input, + "nonce" => nonce, + "transactionIndex" => index, + "value" => value, + "type" => type, + "maxPriorityFeePerGas" => max_priority_fee_per_gas, + "maxFeePerGas" => max_fee_per_gas + } = transaction + ) do result = %{ block_hash: block_hash, block_number: block_number, @@ -335,7 +342,7 @@ defmodule EthereumJSONRPC.Transaction do index: index, input: input, nonce: nonce, - to_address_hash: to_address_hash, + to_address_hash: Map.get(transaction, "to"), value: value, transaction_index: index, type: type, @@ -355,23 +362,22 @@ defmodule EthereumJSONRPC.Transaction do # txpool_content method on Erigon node returns transaction data # without gas price - def do_elixir_to_params( - %{ - "blockHash" => block_hash, - "blockNumber" => block_number, - "from" => from_address_hash, - "gas" => gas, - "hash" => hash, - "input" => input, - "nonce" => nonce, - "to" => to_address_hash, - "transactionIndex" => index, - "value" => value, - "type" => type, - "maxPriorityFeePerGas" => max_priority_fee_per_gas, - "maxFeePerGas" => max_fee_per_gas - } = transaction - ) do + defp do_elixir_to_params( + %{ + "blockHash" => block_hash, + "blockNumber" => block_number, + "from" => from_address_hash, + "gas" => gas, + "hash" => hash, + "input" => input, + "nonce" => nonce, + "transactionIndex" => index, + "value" => value, + "type" => type, + "maxPriorityFeePerGas" => max_priority_fee_per_gas, + "maxFeePerGas" => max_fee_per_gas + } = transaction + ) do result = %{ block_hash: block_hash, block_number: block_number, @@ -382,7 +388,7 @@ defmodule EthereumJSONRPC.Transaction do index: index, input: input, nonce: nonce, - to_address_hash: to_address_hash, + to_address_hash: Map.get(transaction, "to"), value: value, transaction_index: index, type: type, @@ -401,22 +407,21 @@ defmodule EthereumJSONRPC.Transaction do end # for legacy transactions without maxPriorityFeePerGas and maxFeePerGas - def do_elixir_to_params( - %{ - "blockHash" => block_hash, - "blockNumber" => block_number, - "from" => from_address_hash, - "gas" => gas, - "gasPrice" => gas_price, - "hash" => hash, - "input" => input, - "nonce" => nonce, - "to" => to_address_hash, - "transactionIndex" => index, - "value" => value, - "type" => type - } = transaction - ) do + defp do_elixir_to_params( + %{ + "blockHash" => block_hash, + "blockNumber" => block_number, + "from" => from_address_hash, + "gas" => gas, + "gasPrice" => gas_price, + "hash" => hash, + "input" => input, + "nonce" => nonce, + "transactionIndex" => index, + "value" => value, + "type" => type + } = transaction + ) do result = %{ block_hash: block_hash, block_number: block_number, @@ -427,7 +432,7 @@ defmodule EthereumJSONRPC.Transaction do index: index, input: input, nonce: nonce, - to_address_hash: to_address_hash, + to_address_hash: Map.get(transaction, "to"), value: value, transaction_index: index, type: type @@ -443,21 +448,20 @@ defmodule EthereumJSONRPC.Transaction do end # for legacy transactions without type, maxPriorityFeePerGas and maxFeePerGas - def do_elixir_to_params( - %{ - "blockHash" => block_hash, - "blockNumber" => block_number, - "from" => from_address_hash, - "gas" => gas, - "gasPrice" => gas_price, - "hash" => hash, - "input" => input, - "nonce" => nonce, - "to" => to_address_hash, - "transactionIndex" => index, - "value" => value - } = transaction - ) do + defp do_elixir_to_params( + %{ + "blockHash" => block_hash, + "blockNumber" => block_number, + "from" => from_address_hash, + "gas" => gas, + "gasPrice" => gas_price, + "hash" => hash, + "input" => input, + "nonce" => nonce, + "transactionIndex" => index, + "value" => value + } = transaction + ) do result = %{ block_hash: block_hash, block_number: block_number, @@ -468,7 +472,7 @@ defmodule EthereumJSONRPC.Transaction do index: index, input: input, nonce: nonce, - to_address_hash: to_address_hash, + to_address_hash: Map.get(transaction, "to"), value: value, transaction_index: index } @@ -483,21 +487,20 @@ defmodule EthereumJSONRPC.Transaction do end # for transactions without gasPrice, maxPriorityFeePerGas and maxFeePerGas - def do_elixir_to_params( - %{ - "blockHash" => block_hash, - "blockNumber" => block_number, - "from" => from_address_hash, - "gas" => gas, - "hash" => hash, - "input" => input, - "nonce" => nonce, - "to" => to_address_hash, - "transactionIndex" => index, - "type" => type, - "value" => value - } = transaction - ) do + defp do_elixir_to_params( + %{ + "blockHash" => block_hash, + "blockNumber" => block_number, + "from" => from_address_hash, + "gas" => gas, + "hash" => hash, + "input" => input, + "nonce" => nonce, + "transactionIndex" => index, + "type" => type, + "value" => value + } = transaction + ) do result = %{ block_hash: block_hash, block_number: block_number, @@ -508,7 +511,7 @@ defmodule EthereumJSONRPC.Transaction do index: index, input: input, nonce: nonce, - to_address_hash: to_address_hash, + to_address_hash: Map.get(transaction, "to"), value: value, transaction_index: index, type: type diff --git a/apps/explorer/lib/explorer/chain/events/publisher.ex b/apps/explorer/lib/explorer/chain/events/publisher.ex index adea72f104..a0048248cf 100644 --- a/apps/explorer/lib/explorer/chain/events/publisher.ex +++ b/apps/explorer/lib/explorer/chain/events/publisher.ex @@ -5,7 +5,7 @@ defmodule Explorer.Chain.Events.Publisher do @common_allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions - last_block_number optimism_deposits token_transfers transactions contract_verification_result + last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode fetched_bytecode fetched_token_instance_metadata smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a @@ -14,6 +14,9 @@ defmodule Explorer.Chain.Events.Publisher do :arbitrum -> @chain_type_specific_allowed_events ~w(new_arbitrum_batches new_messages_to_arbitrum_amount)a + :optimism -> + @chain_type_specific_allowed_events ~w(new_optimism_batches new_optimism_deposits)a + _ -> @chain_type_specific_allowed_events ~w()a end diff --git a/apps/explorer/lib/explorer/chain/events/subscriber.ex b/apps/explorer/lib/explorer/chain/events/subscriber.ex index 3e76f65796..741422fac3 100644 --- a/apps/explorer/lib/explorer/chain/events/subscriber.ex +++ b/apps/explorer/lib/explorer/chain/events/subscriber.ex @@ -5,7 +5,7 @@ defmodule Explorer.Chain.Events.Subscriber do @common_allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions - last_block_number optimism_deposits token_transfers transactions contract_verification_result + last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode fetched_bytecode fetched_token_instance_metadata smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a @@ -14,6 +14,9 @@ defmodule Explorer.Chain.Events.Subscriber do :arbitrum -> @chain_type_specific_allowed_broadcast_events ~w(new_arbitrum_batches new_messages_to_arbitrum_amount)a + :optimism -> + @chain_type_specific_allowed_broadcast_events ~w(new_optimism_batches new_optimism_deposits)a + _ -> @chain_type_specific_allowed_broadcast_events ~w()a end diff --git a/apps/explorer/lib/explorer/chain/optimism/deposit.ex b/apps/explorer/lib/explorer/chain/optimism/deposit.ex index 36543fb59d..021c4a11d4 100644 --- a/apps/explorer/lib/explorer/chain/optimism/deposit.ex +++ b/apps/explorer/lib/explorer/chain/optimism/deposit.ex @@ -52,6 +52,22 @@ defmodule Explorer.Chain.Optimism.Deposit do ) end + @doc """ + Forms a query to remove all Deposits with the specified L1 block number. + Used by the `Indexer.Fetcher.Optimism.Deposit` module. + + ## Parameters + - `l1_block_number`: The L1 block number for which the Deposits should be removed + from the `op_deposits` database table. + + ## Returns + - A query which can be used by the `delete_all` function. + """ + @spec remove_deposits_query(non_neg_integer()) :: Ecto.Queryable.t() + def remove_deposits_query(l1_block_number) do + from(d in __MODULE__, where: d.l1_block_number == ^l1_block_number) + end + @doc """ Lists `t:Explorer.Chain.Optimism.Deposit.t/0`'s' in descending order based on l1_block_number and l2_transaction_hash. @@ -74,7 +90,7 @@ defmodule Explorer.Chain.Optimism.Deposit do |> join_association(:l2_transaction, :required) |> page_deposits(paging_options) |> limit(^paging_options.page_size) - |> select_repo(options).all() + |> select_repo(options).all(timeout: :infinity) end end diff --git a/apps/explorer/lib/explorer/chain/optimism/dispute_game.ex b/apps/explorer/lib/explorer/chain/optimism/dispute_game.ex index 1ef62c4cde..4ad8318b9f 100644 --- a/apps/explorer/lib/explorer/chain/optimism/dispute_game.ex +++ b/apps/explorer/lib/explorer/chain/optimism/dispute_game.ex @@ -78,7 +78,7 @@ defmodule Explorer.Chain.Optimism.DisputeGame do base_query |> page_dispute_games(paging_options) |> limit(^paging_options.page_size) - |> select_repo(options).all() + |> select_repo(options).all(timeout: :infinity) end defp page_dispute_games(query, %PagingOptions{key: nil}), do: query diff --git a/apps/explorer/lib/explorer/chain/optimism/frame_sequence.ex b/apps/explorer/lib/explorer/chain/optimism/frame_sequence.ex index 66dbe4cd0d..3ec97e2b7a 100644 --- a/apps/explorer/lib/explorer/chain/optimism/frame_sequence.ex +++ b/apps/explorer/lib/explorer/chain/optimism/frame_sequence.ex @@ -88,7 +88,8 @@ defmodule Explorer.Chain.Optimism.FrameSequence do ## Parameters - `internal_id`: Batch'es internal id. - - `options`: A keyword list of options that may include whether to use a replica database. + - `options`: A keyword list of options that may include whether to use a replica database + and/or whether to include blobs (true by default). ## Returns - A map with info about L1 batch having the specified id. @@ -108,30 +109,94 @@ defmodule Explorer.Chain.Optimism.FrameSequence do l2_block_number_to = TransactionBatch.edge_l2_block_number(internal_id, :max) transaction_count = Transaction.transaction_count_for_block_range(l2_block_number_from..l2_block_number_to) - {batch_data_container, blobs} = FrameSequenceBlob.list(internal_id, options) - - result = %{ - "internal_id" => internal_id, - "l1_timestamp" => batch.l1_timestamp, - "l2_block_start" => l2_block_number_from, - "l2_block_end" => l2_block_number_to, - "transaction_count" => transaction_count, - # todo: keep next line for compatibility with frontend and remove when new frontend is bound to `transaction_count` property - "tx_count" => transaction_count, - "l1_transaction_hashes" => batch.l1_transaction_hashes, - # todo: keep next line for compatibility with frontend and remove when new frontend is bound to `l1_transaction_hashes` property - "l1_tx_hashes" => batch.l1_transaction_hashes, - "batch_data_container" => batch_data_container - } + {batch_data_container, blobs} = + if Keyword.get(options, :include_blobs?, true) do + FrameSequenceBlob.list(internal_id, options) + else + {nil, []} + end + + result = + prepare_base_info_for_batch( + internal_id, + l2_block_number_from, + l2_block_number_to, + transaction_count, + batch_data_container, + batch + ) if Enum.empty?(blobs) do result else - Map.put(result, "blobs", blobs) + Map.put(result, :blobs, blobs) end end end + @doc """ + Transforms an L1 batch into a map format for HTTP response. + + This function processes an Optimism L1 batch and converts it into a map that + includes basic batch information. + + ## Parameters + - `internal_id`: The internal ID of the batch. + - `l2_block_number_from`: Start L2 block number of the batch block range. + - `l2_block_number_to`: End L2 block number of the batch block range. + - `transaction_count`: The L2 transaction count included into the blocks of the range. + - `batch_data_container`: Designates where the batch info is stored: :in_blob4844, :in_celestia, or :in_calldata. + Can be `nil` if the container is unknown. + - `batch`: Either an `Explorer.Chain.Optimism.FrameSequence` entry or a map with + the corresponding fields. + + ## Returns + - A map with detailed information about the batch formatted for use in JSON HTTP responses. + """ + @spec prepare_base_info_for_batch( + non_neg_integer(), + non_neg_integer(), + non_neg_integer(), + non_neg_integer(), + :in_blob4844 | :in_celestia | :in_calldata | nil, + __MODULE__.t() + | %{:l1_timestamp => DateTime.t(), :l1_transaction_hashes => list(), optional(any()) => any()} + ) :: %{ + :internal_id => non_neg_integer(), + :l1_timestamp => DateTime.t(), + :l2_block_start => non_neg_integer(), + :l2_block_end => non_neg_integer(), + :transaction_count => non_neg_integer(), + # todo: keep next line for compatibility with frontend and remove when new frontend is bound to `transaction_count` property + :tx_count => non_neg_integer(), + :l1_transaction_hashes => list(), + # todo: keep next line for compatibility with frontend and remove when new frontend is bound to `l1_transaction_hashes` property + :l1_tx_hashes => list(), + :batch_data_container => :in_blob4844 | :in_celestia | :in_calldata | nil + } + def prepare_base_info_for_batch( + internal_id, + l2_block_number_from, + l2_block_number_to, + transaction_count, + batch_data_container, + batch + ) do + %{ + :internal_id => internal_id, + :l1_timestamp => batch.l1_timestamp, + :l2_block_start => l2_block_number_from, + :l2_block_end => l2_block_number_to, + :transaction_count => transaction_count, + # todo: keep next line for compatibility with frontend and remove when new frontend is bound to `transaction_count` property + :tx_count => transaction_count, + :l1_transaction_hashes => batch.l1_transaction_hashes, + # todo: keep next line for compatibility with frontend and remove when new frontend is bound to `l1_transaction_hashes` property + :l1_tx_hashes => batch.l1_transaction_hashes, + :batch_data_container => batch_data_container + } + end + @doc """ Lists `t:Explorer.Chain.Optimism.FrameSequence.t/0`'s' in descending order based on id. @@ -167,7 +232,7 @@ defmodule Explorer.Chain.Optimism.FrameSequence do base_query |> page_frame_sequences(paging_options) |> limit(^paging_options.page_size) - |> select_repo(options).all() + |> select_repo(options).all(timeout: :infinity) end end diff --git a/apps/explorer/lib/explorer/chain/optimism/frame_sequence_blob.ex b/apps/explorer/lib/explorer/chain/optimism/frame_sequence_blob.ex index 66d65984fa..001c3c2f31 100644 --- a/apps/explorer/lib/explorer/chain/optimism/frame_sequence_blob.ex +++ b/apps/explorer/lib/explorer/chain/optimism/frame_sequence_blob.ex @@ -73,7 +73,7 @@ defmodule Explorer.Chain.Optimism.FrameSequenceBlob do ) query - |> repo.all() + |> repo.all(timeout: :infinity) |> filter_blobs_by_type() end diff --git a/apps/explorer/lib/explorer/chain/optimism/output_root.ex b/apps/explorer/lib/explorer/chain/optimism/output_root.ex index e32b4a7f35..0d68cb7c3b 100644 --- a/apps/explorer/lib/explorer/chain/optimism/output_root.ex +++ b/apps/explorer/lib/explorer/chain/optimism/output_root.ex @@ -58,7 +58,7 @@ defmodule Explorer.Chain.Optimism.OutputRoot do base_query |> page_output_roots(paging_options) |> limit(^paging_options.page_size) - |> select_repo(options).all() + |> select_repo(options).all(timeout: :infinity) end end @@ -67,4 +67,36 @@ defmodule Explorer.Chain.Optimism.OutputRoot do defp page_output_roots(query, %PagingOptions{key: {index}}) do from(r in query, where: r.l2_output_index < ^index) end + + @doc """ + Forms a query to find the last Output Root's L1 block number and transaction hash. + Used by the `Indexer.Fetcher.Optimism.OutputRoot` module. + + ## Returns + - A query which can be used by the `Repo.one` function. + """ + @spec last_root_l1_block_number_query() :: Ecto.Queryable.t() + def last_root_l1_block_number_query do + from(root in __MODULE__, + select: {root.l1_block_number, root.l1_transaction_hash}, + order_by: [desc: root.l2_output_index], + limit: 1 + ) + end + + @doc """ + Forms a query to remove all Output Roots related to the specified L1 block number. + Used by the `Indexer.Fetcher.Optimism.OutputRoot` module. + + ## Parameters + - `l1_block_number`: The L1 block number for which the Output Roots should be removed + from the `op_output_roots` database table. + + ## Returns + - A query which can be used by the `delete_all` function. + """ + @spec remove_roots_query(non_neg_integer()) :: Ecto.Queryable.t() + def remove_roots_query(l1_block_number) do + from(root in __MODULE__, where: root.l1_block_number == ^l1_block_number) + end end diff --git a/apps/explorer/lib/explorer/chain/optimism/transaction_batch.ex b/apps/explorer/lib/explorer/chain/optimism/transaction_batch.ex index 2ce8032da2..a1853e0116 100644 --- a/apps/explorer/lib/explorer/chain/optimism/transaction_batch.ex +++ b/apps/explorer/lib/explorer/chain/optimism/transaction_batch.ex @@ -133,7 +133,7 @@ defmodule Explorer.Chain.Optimism.TransactionBatch do |> join_association(:frame_sequence, :required) |> page_transaction_batches(paging_options) |> limit(^paging_options.page_size) - |> select_repo(options).all() + |> select_repo(options).all(timeout: :infinity) end end @@ -174,7 +174,7 @@ defmodule Explorer.Chain.Optimism.TransactionBatch do |> limit(^paging_options.page_size) |> order_by(desc: :number) |> join_associations(necessity_by_association) - |> select_repo(options).all() + |> select_repo(options).all(timeout: :infinity) end defp page_blocks(query, %PagingOptions{key: nil}), do: query diff --git a/apps/explorer/lib/explorer/chain/optimism/withdrawal.ex b/apps/explorer/lib/explorer/chain/optimism/withdrawal.ex index 011273f092..917e14e4a1 100644 --- a/apps/explorer/lib/explorer/chain/optimism/withdrawal.ex +++ b/apps/explorer/lib/explorer/chain/optimism/withdrawal.ex @@ -84,7 +84,7 @@ defmodule Explorer.Chain.Optimism.Withdrawal do base_query |> page_optimism_withdrawals(paging_options) |> limit(^paging_options.page_size) - |> select_repo(options).all() + |> select_repo(options).all(timeout: :infinity) end end @@ -94,6 +94,38 @@ defmodule Explorer.Chain.Optimism.Withdrawal do from(w in query, where: w.msg_nonce < ^nonce) end + @doc """ + Forms a query to find the last Withdrawal's L2 block number and transaction hash. + Used by the `Indexer.Fetcher.Optimism.Withdrawal` module. + + ## Returns + - A query which can be used by the `Repo.one` function. + """ + @spec last_withdrawal_l2_block_number_query() :: Ecto.Queryable.t() + def last_withdrawal_l2_block_number_query do + from(w in __MODULE__, + select: {w.l2_block_number, w.l2_transaction_hash}, + order_by: [desc: w.msg_nonce], + limit: 1 + ) + end + + @doc """ + Forms a query to remove all Withdrawals related to the specified L2 block number. + Used by the `Indexer.Fetcher.Optimism.Withdrawal` module. + + ## Parameters + - `l2_block_number`: The L2 block number for which the Withdrawals should be removed + from the `op_withdrawals` database table. + + ## Returns + - A query which can be used by the `delete_all` function. + """ + @spec remove_withdrawals_query(non_neg_integer()) :: Ecto.Queryable.t() + def remove_withdrawals_query(l2_block_number) do + from(w in __MODULE__, where: w.l2_block_number == ^l2_block_number) + end + @doc """ Gets withdrawal statuses for Optimism Withdrawal transaction. For each withdrawal associated with this transaction, @@ -117,7 +149,7 @@ defmodule Explorer.Chain.Optimism.Withdrawal do ) query - |> Repo.replica().all() + |> Repo.replica().all(timeout: :infinity) |> Enum.map(fn w -> msg_nonce = Bitwise.band( diff --git a/apps/explorer/lib/explorer/chain/optimism/withdrawal_event.ex b/apps/explorer/lib/explorer/chain/optimism/withdrawal_event.ex index bac79ac951..aebc4ad4eb 100644 --- a/apps/explorer/lib/explorer/chain/optimism/withdrawal_event.ex +++ b/apps/explorer/lib/explorer/chain/optimism/withdrawal_event.ex @@ -34,4 +34,36 @@ defmodule Explorer.Chain.Optimism.WithdrawalEvent do |> cast(attrs, @required_attrs ++ @optional_attrs) |> validate_required(@required_attrs) end + + @doc """ + Forms a query to find the last Withdrawal L1 event's block number and transaction hash. + Used by the `Indexer.Fetcher.Optimism.WithdrawalEvent` module. + + ## Returns + - A query which can be used by the `Repo.one` function. + """ + @spec last_event_l1_block_number_query() :: Ecto.Queryable.t() + def last_event_l1_block_number_query do + from(event in __MODULE__, + select: {event.l1_block_number, event.l1_transaction_hash}, + order_by: [desc: event.l1_timestamp], + limit: 1 + ) + end + + @doc """ + Forms a query to remove all Withdrawal L1 events related to the specified L1 block number. + Used by the `Indexer.Fetcher.Optimism.WithdrawalEvent` module. + + ## Parameters + - `l1_block_number`: The L1 block number for which the events should be removed + from the `op_withdrawal_events` database table. + + ## Returns + - A query which can be used by the `delete_all` function. + """ + @spec remove_events_query(non_neg_integer()) :: Ecto.Queryable.t() + def remove_events_query(l1_block_number) do + from(event in __MODULE__, where: event.l1_block_number == ^l1_block_number) + end end diff --git a/apps/indexer/lib/indexer/fetcher/optimism.ex b/apps/indexer/lib/indexer/fetcher/optimism.ex index 9e4cc382c8..11ba4aa5f1 100644 --- a/apps/indexer/lib/indexer/fetcher/optimism.ex +++ b/apps/indexer/lib/indexer/fetcher/optimism.ex @@ -19,11 +19,11 @@ defmodule Indexer.Fetcher.Optimism do alias EthereumJSONRPC.Block.ByNumber alias EthereumJSONRPC.Contract + alias Explorer.Repo alias Indexer.Helper @fetcher_name :optimism @block_check_interval_range_size 100 - @eth_get_logs_range_size 250 @finite_retries_number 3 def child_spec(start_link_arguments) do @@ -183,10 +183,6 @@ defmodule Indexer.Fetcher.Optimism do Helper.repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries) end - def get_logs_range_size do - @eth_get_logs_range_size - end - @doc """ Forms JSON RPC named arguments for the given RPC URL. """ @@ -207,24 +203,34 @@ defmodule Indexer.Fetcher.Optimism do end @doc """ - Does initializations for `Indexer.Fetcher.Optimism.WithdrawalEvent` or `Indexer.Fetcher.Optimism.OutputRoot` module. - Contains common code used by both modules. + Does initializations for `Indexer.Fetcher.Optimism.WithdrawalEvent`, `Indexer.Fetcher.Optimism.OutputRoot`, or + `Indexer.Fetcher.Optimism.Deposit` module. Contains common code used by these modules. ## Parameters - - `output_oracle`: An address of L2OutputOracle contract on L1. Must be `nil` if the `caller` is not `OutputRoot` module. + - `output_oracle`: An address of L2OutputOracle contract on L1. + Must be `nil` if the `caller` is not `Indexer.Fetcher.Optimism.OutputRoot` module. - `caller`: The module that called this function. ## Returns - - A map for the `handle_continue` handler of the calling module. + - A resulting map for the `handle_continue` handler of the calling module. """ @spec init_continue(binary() | nil, module()) :: {:noreply, map()} | {:stop, :normal, %{}} def init_continue(output_oracle, caller) - when caller in [Indexer.Fetcher.Optimism.WithdrawalEvent, Indexer.Fetcher.Optimism.OutputRoot] do + when caller in [ + Indexer.Fetcher.Optimism.Deposit, + Indexer.Fetcher.Optimism.WithdrawalEvent, + Indexer.Fetcher.Optimism.OutputRoot + ] do {contract_name, table_name, start_block_note} = - if caller == Indexer.Fetcher.Optimism.WithdrawalEvent do - {"Optimism Portal", "op_withdrawal_events", "Withdrawals L1"} - else - {"Output Oracle", "op_output_roots", "Output Roots"} + case caller do + Indexer.Fetcher.Optimism.Deposit -> + {"Optimism Portal", "op_deposits", "Deposits"} + + Indexer.Fetcher.Optimism.WithdrawalEvent -> + {"Optimism Portal", "op_withdrawal_events", "Withdrawals L1"} + + _ -> + {"Output Oracle", "op_output_roots", "Output Roots"} end optimism_env = Application.get_all_env(:indexer)[__MODULE__] @@ -238,21 +244,20 @@ defmodule Indexer.Fetcher.Optimism do json_rpc_named_arguments = json_rpc_named_arguments(optimism_l1_rpc), {optimism_portal, start_block_l1} <- read_system_config(system_config, json_rpc_named_arguments), {:contract_is_valid, true} <- - {:contract_is_valid, - caller == Indexer.Fetcher.Optimism.WithdrawalEvent or Helper.address_correct?(output_oracle)}, + {:contract_is_valid, caller != Indexer.Fetcher.Optimism.OutputRoot or Helper.address_correct?(output_oracle)}, true <- start_block_l1 > 0, - {last_l1_block_number, last_l1_transaction_hash} <- caller.get_last_l1_item(), + {last_l1_block_number, last_l1_transaction_hash, last_l1_transaction} <- + caller.get_last_l1_item(json_rpc_named_arguments), {:start_block_l1_valid, true} <- {:start_block_l1_valid, start_block_l1 <= last_l1_block_number || last_l1_block_number == 0}, - {:ok, last_l1_transaction} <- get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments), {:l1_transaction_not_found, false} <- {:l1_transaction_not_found, !is_nil(last_l1_transaction_hash) && is_nil(last_l1_transaction)}, {:ok, block_check_interval, last_safe_block} <- get_block_check_interval(json_rpc_named_arguments) do contract_address = - if caller == Indexer.Fetcher.Optimism.WithdrawalEvent do - optimism_portal - else + if caller == Indexer.Fetcher.Optimism.OutputRoot do output_oracle + else + optimism_portal end start_block = max(start_block_l1, last_l1_block_number) @@ -266,6 +271,7 @@ defmodule Indexer.Fetcher.Optimism do start_block: start_block, end_block: last_safe_block, json_rpc_named_arguments: json_rpc_named_arguments, + eth_get_logs_range_size: optimism_env[:l1_eth_get_logs_range_size], stop: false }} else @@ -307,7 +313,7 @@ defmodule Indexer.Fetcher.Optimism do {:stop, :normal, %{}} nil -> - Logger.error("Cannot read SystemConfig contract.") + Logger.error("Cannot read SystemConfig contract and fallback envs are not correctly defined.") {:stop, :normal, %{}} _ -> @@ -325,6 +331,9 @@ defmodule Indexer.Fetcher.Optimism do Gets `OptimismPortal` contract address from the `SystemConfig` contract and the number of a start block (from which all Optimism fetchers should start). + If SystemConfig has obsolete implementation, the values are fallen back from the corresponding + env variables (INDEXER_OPTIMISM_L1_PORTAL_CONTRACT and INDEXER_OPTIMISM_L1_START_BLOCK). + ## Parameters - `contract_address`: An address of SystemConfig contract. - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection. @@ -348,7 +357,7 @@ defmodule Indexer.Fetcher.Optimism do &json_rpc/2, [requests, json_rpc_named_arguments], error_message, - Helper.infinite_retries_number() + Helper.finite_retries_number() ) do {:ok, responses} -> "0x000000000000000000000000" <> optimism_portal = Enum.at(responses, 0).result @@ -356,7 +365,11 @@ defmodule Indexer.Fetcher.Optimism do {"0x" <> optimism_portal, start_block} _ -> - nil + env = Application.get_all_env(:indexer)[__MODULE__] + + if Helper.address_correct?(env[:portal]) and not is_nil(env[:start_block_l1]) do + {env[:portal], env[:start_block_l1]} + end end end @@ -380,4 +393,66 @@ defmodule Indexer.Fetcher.Optimism do optimism_config = Application.get_all_env(:indexer)[__MODULE__] not is_nil(optimism_config[:optimism_l1_system_config]) end + + @doc """ + Determines the last saved block number, the last saved transaction hash, and the transaction info for + a certain entity defined by the passed functions. + + Used by the OP fetcher modules to start fetching from a correct block number + after reorg has occurred. + + ## Parameters + - `layer`: Just for logging purposes. Can be `:L1` or `:L2` depending on the layer of the entity. + - `last_block_number_query_fun`: A function which will be called to form database query + to get the latest item in the corresponding database table. + - `remove_query_fun`: A function which will be called to form database query to remove the entity rows + created due to reorg from the corresponding table. + - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection. + Used to get transaction info by its hash from the RPC node. + Can be `nil` if the transaction info is not needed. + + ## Returns + - A tuple `{last_block_number, last_transaction_hash, last_transaction}` where + `last_block_number` is the last block number found in the corresponding table (0 if not found), + `last_transaction_hash` is the last transaction hash found in the corresponding table (nil if not found), + `last_transaction` is the transaction info got from the RPC (nil if not found or not needed). + - A tuple `{:error, message}` in case the `eth_getTransactionByHash` RPC request failed. + """ + @spec get_last_item(:L1 | :L2, function(), function(), EthereumJSONRPC.json_rpc_named_arguments() | nil) :: + {non_neg_integer(), binary() | nil, map() | nil} | {:error, any()} + def get_last_item(layer, last_block_number_query_fun, remove_query_fun, json_rpc_named_arguments \\ nil) + when is_function(last_block_number_query_fun, 0) and is_function(remove_query_fun, 1) do + {last_block_number, last_transaction_hash} = + last_block_number_query_fun.() + |> Repo.one() + |> Kernel.||({0, nil}) + + with {:empty_hash, false} <- {:empty_hash, is_nil(last_transaction_hash)}, + {:empty_json_rpc_named_arguments, false} <- + {:empty_json_rpc_named_arguments, is_nil(json_rpc_named_arguments)}, + {:ok, last_transaction} <- get_transaction_by_hash(last_transaction_hash, json_rpc_named_arguments), + {:empty_transaction, false} <- {:empty_transaction, is_nil(last_transaction)} do + {last_block_number, last_transaction_hash, last_transaction} + else + {:empty_hash, true} -> + {last_block_number, nil, nil} + + {:empty_json_rpc_named_arguments, true} -> + {last_block_number, last_transaction_hash, nil} + + {:error, _} = error -> + error + + {:empty_transaction, true} -> + Logger.error( + "Cannot find last #{layer} transaction from RPC by its hash (#{last_transaction_hash}). Probably, there was a reorg on #{layer} chain. Trying to check preceding transaction..." + ) + + last_block_number + |> remove_query_fun.() + |> Repo.delete_all() + + get_last_item(layer, last_block_number_query_fun, remove_query_fun, json_rpc_named_arguments) + end + end end diff --git a/apps/indexer/lib/indexer/fetcher/optimism/deposit.ex b/apps/indexer/lib/indexer/fetcher/optimism/deposit.ex index 8e7155d5a0..5adb4a0e0b 100644 --- a/apps/indexer/lib/indexer/fetcher/optimism/deposit.ex +++ b/apps/indexer/lib/indexer/fetcher/optimism/deposit.ex @@ -10,37 +10,23 @@ defmodule Indexer.Fetcher.Optimism.Deposit do import Ecto.Query - import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1, request: 1] - import Explorer.Helper, only: [decode_data: 2, parse_integer: 1] + import EthereumJSONRPC, only: [quantity_to_integer: 1] + import Explorer.Helper, only: [decode_data: 2] alias EthereumJSONRPC.Block.ByNumber alias EthereumJSONRPC.Blocks alias Explorer.{Chain, Repo} alias Explorer.Chain.Events.Publisher alias Explorer.Chain.Optimism.Deposit + alias Explorer.Chain.RollupReorgMonitorQueue alias Indexer.Fetcher.Optimism alias Indexer.Helper - defstruct [ - :batch_size, - :start_block, - :from_block, - :safe_block, - :optimism_portal, - :json_rpc_named_arguments, - :transaction_type, - mode: :catch_up, - filter_id: nil, - check_interval: nil - ] - # 32-byte signature of the event TransactionDeposited(address indexed from, address indexed to, uint256 indexed version, bytes opaqueData) @transaction_deposited_event "0xb3813568d9991fc951961fcb4c784893574240a28925604d09fc577c55bb7c32" - @retry_interval_minutes 3 - @retry_interval :timer.minutes(@retry_interval_minutes) - @address_prefix "0x000000000000000000000000" - @batch_size 500 + @fetcher_name :optimism_deposits + @address_prefix "0x000000000000000000000000" def child_spec(start_link_arguments) do spec = %{ @@ -63,266 +49,100 @@ defmodule Indexer.Fetcher.Optimism.Deposit do end @impl GenServer - def handle_continue(:ok, state) do + def handle_continue(:ok, _state) do Logger.metadata(fetcher: @fetcher_name) - env = Application.get_all_env(:indexer)[__MODULE__] - optimism_env = Application.get_all_env(:indexer)[Optimism] - system_config = optimism_env[:optimism_l1_system_config] - optimism_l1_rpc = optimism_env[:optimism_l1_rpc] - - with {:system_config_valid, true} <- {:system_config_valid, Helper.address_correct?(system_config)}, - {:rpc_l1_undefined, false} <- {:rpc_l1_undefined, is_nil(optimism_l1_rpc)}, - json_rpc_named_arguments = Optimism.json_rpc_named_arguments(optimism_l1_rpc), - {optimism_portal, start_block_l1} <- Optimism.read_system_config(system_config, json_rpc_named_arguments), - true <- start_block_l1 > 0, - {last_l1_block_number, last_l1_transaction_hash} <- get_last_l1_item(), - {:ok, last_l1_transaction} <- - Optimism.get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments), - {:l1_transaction_not_found, false} <- - {:l1_transaction_not_found, !is_nil(last_l1_transaction_hash) && is_nil(last_l1_transaction)}, - {safe_block, _} = Helper.get_safe_block(json_rpc_named_arguments), - {:start_block_l1_valid, true} <- - {:start_block_l1_valid, - (start_block_l1 <= last_l1_block_number || last_l1_block_number == 0) && start_block_l1 <= safe_block} do - start_block = max(start_block_l1, last_l1_block_number) - - if start_block > safe_block do - Process.send(self(), :switch_to_realtime, []) - else - Process.send(self(), :fetch, []) - end - - {:noreply, - %__MODULE__{ - start_block: start_block, - from_block: start_block, - safe_block: safe_block, - optimism_portal: optimism_portal, - json_rpc_named_arguments: json_rpc_named_arguments, - batch_size: parse_integer(env[:batch_size]) || @batch_size, - transaction_type: env[:transaction_type] - }} - else - {:start_block_l1_valid, false} -> - Logger.error("Invalid L1 Start Block value. Please, check the value and op_deposits table.") - {:stop, :normal, state} - - {:rpc_l1_undefined, true} -> - Logger.error("L1 RPC URL is not defined.") - {:stop, :normal, state} - - {:system_config_valid, false} -> - Logger.error("SystemConfig contract address is invalid or undefined.") - {:stop, :normal, state} - - {:error, error_data} -> - Logger.error("Cannot get last L1 transaction from RPC by its hash due to the RPC error: #{inspect(error_data)}") - - {:stop, :normal, state} - - {:l1_transaction_not_found, true} -> - Logger.error( - "Cannot find last L1 transaction from RPC by its hash. Probably, there was a reorg on L1 chain. Please, check op_deposits table." - ) - - {:stop, :normal, state} - - nil -> - Logger.error("Cannot read SystemConfig contract.") - {:stop, :normal, state} - - _ -> - Logger.error("Optimism deposits L1 Start Block is invalid or zero.") - {:stop, :normal, state} - end + Optimism.init_continue(nil, __MODULE__) end @impl GenServer def handle_info( - :fetch, - %__MODULE__{ + :continue, + %{ + contract_address: optimism_portal, + block_check_interval: block_check_interval, start_block: start_block, - from_block: from_block, - safe_block: safe_block, - optimism_portal: optimism_portal, + end_block: end_block, json_rpc_named_arguments: json_rpc_named_arguments, - mode: :catch_up, - batch_size: batch_size, - transaction_type: transaction_type + eth_get_logs_range_size: eth_get_logs_range_size } = state ) do - to_block = min(from_block + batch_size, safe_block) + # credo:disable-for-next-line + time_before = Timex.now() + + transaction_type = Application.get_all_env(:indexer)[__MODULE__][:transaction_type] + + chunks_number = ceil((end_block - start_block + 1) / eth_get_logs_range_size) + chunk_range = Range.new(0, max(chunks_number - 1, 0), 1) + + last_written_block = + chunk_range + |> Enum.reduce_while(start_block - 1, fn current_chunk, _ -> + chunk_start = start_block + eth_get_logs_range_size * current_chunk + chunk_end = min(chunk_start + eth_get_logs_range_size - 1, end_block) + + if chunk_end >= chunk_start do + Helper.log_blocks_chunk_handling(chunk_start, chunk_end, start_block, end_block, nil, :L1) - with {:logs, {:ok, logs}} <- - {:logs, + {:ok, result} = Optimism.get_logs( - from_block, - to_block, + chunk_start, + chunk_end, optimism_portal, @transaction_deposited_event, json_rpc_named_arguments, - 3 - )}, - _ = Helper.log_blocks_chunk_handling(from_block, to_block, start_block, safe_block, nil, :L1), - deposits = events_to_deposits(logs, transaction_type, json_rpc_named_arguments), - {:import, {:ok, _imported}} <- - {:import, Chain.import(%{optimism_deposits: %{params: deposits}, timeout: :infinity})} do - Publisher.broadcast(%{optimism_deposits: deposits}, :realtime) - - Helper.log_blocks_chunk_handling( - from_block, - to_block, - start_block, - safe_block, - "#{Enum.count(deposits)} TransactionDeposited event(s)", - :L1 - ) + Helper.infinite_retries_number() + ) + + deposit_events = prepare_events(result, transaction_type, json_rpc_named_arguments) + + {:ok, _} = + Chain.import(%{ + optimism_deposits: %{params: deposit_events}, + timeout: :infinity + }) + + Publisher.broadcast(%{new_optimism_deposits: deposit_events}, :realtime) + + Helper.log_blocks_chunk_handling( + chunk_start, + chunk_end, + start_block, + end_block, + "#{Enum.count(deposit_events)} TransactionDeposited event(s)", + :L1 + ) + end + + reorg_block = RollupReorgMonitorQueue.reorg_block_pop(__MODULE__) - if to_block == safe_block do - Logger.info("Fetched all L1 blocks (#{start_block}..#{safe_block}), switching to realtime mode.") - Process.send(self(), :switch_to_realtime, []) - {:noreply, state} + if !is_nil(reorg_block) && reorg_block > 0 do + {deleted_count, _} = Repo.delete_all(from(d in Deposit, where: d.l1_block_number >= ^reorg_block)) + + log_deleted_rows_count(reorg_block, deleted_count) + + {:halt, if(reorg_block <= chunk_end, do: reorg_block - 1, else: chunk_end)} + else + {:cont, chunk_end} + end + end) + + new_start_block = last_written_block + 1 + + {:ok, new_end_block} = + Optimism.get_block_number_by_tag("latest", json_rpc_named_arguments, Helper.infinite_retries_number()) + + delay = + if new_end_block == last_written_block do + # there is no new block, so wait for some time to let the chain issue the new block + max(block_check_interval - Timex.diff(Timex.now(), time_before, :milliseconds), 0) else - Process.send(self(), :fetch, []) - {:noreply, %{state | from_block: to_block + 1}} + 0 end - else - {:logs, {:error, _error}} -> - Logger.error("Cannot fetch logs. Retrying in #{@retry_interval_minutes} minutes...") - Process.send_after(self(), :fetch, @retry_interval) - {:noreply, state} - - {:import, {:error, error}} -> - Logger.error("Cannot import logs due to #{inspect(error)}. Retrying in #{@retry_interval_minutes} minutes...") - Process.send_after(self(), :fetch, @retry_interval) - {:noreply, state} - - {:import, {:error, step, failed_value, _changes_so_far}} -> - Logger.error( - "Failed to import #{inspect(failed_value)} during #{step}. Retrying in #{@retry_interval_minutes} minutes..." - ) - - Process.send_after(self(), :fetch, @retry_interval) - {:noreply, state} - end - end - @impl GenServer - def handle_info( - :switch_to_realtime, - %__MODULE__{ - from_block: from_block, - safe_block: safe_block, - optimism_portal: optimism_portal, - json_rpc_named_arguments: json_rpc_named_arguments, - batch_size: batch_size, - mode: :catch_up, - transaction_type: transaction_type - } = state - ) do - with {:check_interval, {:ok, check_interval, new_safe}} <- - {:check_interval, Optimism.get_block_check_interval(json_rpc_named_arguments)}, - {:catch_up, _, false} <- {:catch_up, new_safe, new_safe - safe_block + 1 > batch_size}, - {:logs, {:ok, logs}} <- - {:logs, - Optimism.get_logs( - max(safe_block, from_block), - "latest", - optimism_portal, - @transaction_deposited_event, - json_rpc_named_arguments, - 3 - )}, - {:ok, filter_id} <- - get_new_filter( - max(safe_block, from_block), - "latest", - optimism_portal, - @transaction_deposited_event, - json_rpc_named_arguments - ) do - handle_new_logs(logs, transaction_type, json_rpc_named_arguments) - Process.send(self(), :fetch, []) - {:noreply, %{state | mode: :realtime, filter_id: filter_id, check_interval: check_interval}} - else - {:catch_up, new_safe, true} -> - Process.send(self(), :fetch, []) - {:noreply, %{state | safe_block: new_safe}} - - {:logs, {:error, error}} -> - Logger.error("Failed to get logs while switching to realtime mode, reason: #{inspect(error)}") - Process.send_after(self(), :switch_to_realtime, @retry_interval) - {:noreply, state} - - {:error, _error} -> - Logger.error("Failed to set logs filter. Retrying in #{@retry_interval_minutes} minutes...") - Process.send_after(self(), :switch_to_realtime, @retry_interval) - {:noreply, state} - - {:check_interval, {:error, _error}} -> - Logger.error("Failed to calculate check_interval. Retrying in #{@retry_interval_minutes} minutes...") - Process.send_after(self(), :switch_to_realtime, @retry_interval) - {:noreply, state} - end - end + Process.send_after(self(), :continue, delay) - @impl GenServer - def handle_info( - :fetch, - %__MODULE__{ - json_rpc_named_arguments: json_rpc_named_arguments, - mode: :realtime, - filter_id: filter_id, - check_interval: check_interval, - transaction_type: transaction_type - } = state - ) do - case get_filter_changes(filter_id, json_rpc_named_arguments) do - {:ok, logs} -> - handle_new_logs(logs, transaction_type, json_rpc_named_arguments) - Process.send_after(self(), :fetch, check_interval) - {:noreply, state} - - {:error, :filter_not_found} -> - Logger.error("The old filter not found on the node. Creating new filter...") - Process.send(self(), :update_filter, []) - {:noreply, state} - - {:error, _error} -> - Logger.error("Failed to set logs filter. Retrying in #{@retry_interval_minutes} minutes...") - Process.send_after(self(), :fetch, @retry_interval) - {:noreply, state} - end - end - - @impl GenServer - def handle_info( - :update_filter, - %__MODULE__{ - optimism_portal: optimism_portal, - json_rpc_named_arguments: json_rpc_named_arguments, - mode: :realtime - } = state - ) do - {last_l1_block_number, _} = get_last_l1_item() - - case get_new_filter( - last_l1_block_number + 1, - "latest", - optimism_portal, - @transaction_deposited_event, - json_rpc_named_arguments - ) do - {:ok, filter_id} -> - Process.send(self(), :fetch, []) - {:noreply, %{state | filter_id: filter_id}} - - {:error, _error} -> - Logger.error("Failed to set logs filter. Retrying in #{@retry_interval_minutes} minutes...") - Process.send_after(self(), :update_filter, @retry_interval) - {:noreply, state} - end + {:noreply, %{state | start_block: new_start_block, end_block: new_end_block}} end @impl GenServer @@ -331,82 +151,25 @@ defmodule Indexer.Fetcher.Optimism.Deposit do {:noreply, state} end - @impl GenServer - def terminate( - _reason, - %__MODULE__{ - json_rpc_named_arguments: json_rpc_named_arguments - } = state - ) do - if state.filter_id do - Logger.info("Optimism deposits fetcher is terminating, uninstalling filter") - uninstall_filter(state.filter_id, json_rpc_named_arguments) - end - end - - @impl GenServer - def terminate(:normal, _state) do - :ok - end - - defp handle_new_logs(logs, transaction_type, json_rpc_named_arguments) do - {reorgs, logs_to_parse, min_block, max_block, cnt} = - logs - |> Enum.reduce({MapSet.new(), [], nil, 0, 0}, fn - %{"removed" => true, "blockNumber" => block_number}, {reorgs, logs_to_parse, min_block, max_block, cnt} -> - {MapSet.put(reorgs, block_number), logs_to_parse, min_block, max_block, cnt} - - %{"blockNumber" => block_number} = log, {reorgs, logs_to_parse, min_block, max_block, cnt} -> - { - reorgs, - [log | logs_to_parse], - min(min_block, quantity_to_integer(block_number)), - max(max_block, quantity_to_integer(block_number)), - cnt + 1 - } - end) - - handle_reorgs(reorgs) - - unless Enum.empty?(logs_to_parse) do - deposits = events_to_deposits(logs_to_parse, transaction_type, json_rpc_named_arguments) - {:ok, _imported} = Chain.import(%{optimism_deposits: %{params: deposits}, timeout: :infinity}) - - Publisher.broadcast(%{optimism_deposits: deposits}, :realtime) - - Helper.log_blocks_chunk_handling( - min_block, - max_block, - min_block, - max_block, - "#{cnt} TransactionDeposited event(s)", - :L1 + defp log_deleted_rows_count(reorg_block, count) do + if count > 0 do + Logger.warning( + "As L1 reorg was detected, all rows with l1_block_number >= #{reorg_block} were removed from the op_deposits table. Number of removed rows: #{count}." ) end end - defp events_to_deposits(logs, transaction_type, json_rpc_named_arguments) do + defp prepare_events(events, transaction_type, json_rpc_named_arguments) do timestamps = - logs - |> Enum.reduce(MapSet.new(), fn %{"blockNumber" => block_number_quantity}, acc -> - block_number = quantity_to_integer(block_number_quantity) - MapSet.put(acc, block_number) + events + |> get_blocks_by_events(json_rpc_named_arguments, Helper.infinite_retries_number()) + |> Enum.reduce(%{}, fn block, acc -> + block_number = quantity_to_integer(Map.get(block, "number")) + {:ok, timestamp} = DateTime.from_unix(quantity_to_integer(Map.get(block, "timestamp"))) + Map.put(acc, block_number, timestamp) end) - |> MapSet.to_list() - |> get_block_timestamps_by_numbers(json_rpc_named_arguments) - |> case do - {:ok, timestamps} -> - timestamps - - {:error, error} -> - Logger.error( - "Failed to get L1 block timestamps for deposits due to #{inspect(error)}. Timestamps will be set to null." - ) - - %{} - end - Enum.map(logs, &event_to_deposit(&1, timestamps, transaction_type)) + Enum.map(events, &event_to_deposit(&1, timestamps, transaction_type)) end defp event_to_deposit( @@ -440,11 +203,13 @@ defmodule Indexer.Fetcher.Optimism.Deposit do msg_value::binary-size(32), value::binary-size(32), gas_limit::binary-size(8), - is_creation::binary-size(1), + _is_creation::binary-size(1), data::binary >> ] = decode_data(opaque_data, [:bytes]) + is_system = <<0>> + rlp_encoded = ExRLP.encode( [ @@ -454,7 +219,7 @@ defmodule Indexer.Fetcher.Optimism.Deposit do msg_value |> String.replace_leading(<<0>>, <<>>), value |> String.replace_leading(<<0>>, <<>>), gas_limit |> String.replace_leading(<<0>>, <<>>), - is_creation |> String.replace_leading(<<0>>, <<>>), + is_system |> String.replace_leading(<<0>>, <<>>), data ], encoding: :hex @@ -483,98 +248,70 @@ defmodule Indexer.Fetcher.Optimism.Deposit do } end - defp handle_reorgs(reorgs) do - if MapSet.size(reorgs) > 0 do - Logger.warning("L1 reorg detected. The following L1 blocks were removed: #{inspect(MapSet.to_list(reorgs))}") - - {deleted_count, _} = Repo.delete_all(from(d in Deposit, where: d.l1_block_number in ^reorgs)) - - if deleted_count > 0 do - Logger.warning( - "As L1 reorg was detected, all affected rows were removed from the op_deposits table. Number of removed rows: #{deleted_count}." - ) - end - end - end - - defp get_block_timestamps_by_numbers(numbers, json_rpc_named_arguments, retries \\ 3) do - id_to_params = - numbers - |> Stream.map(fn number -> %{number: number} end) - |> Stream.with_index() - |> Enum.into(%{}, fn {params, id} -> {id, params} end) - - request = Blocks.requests(id_to_params, &ByNumber.request(&1, false)) - error_message = &"Cannot fetch timestamps for blocks #{numbers}. Error: #{inspect(&1)}" + @doc """ + Determines the last saved L1 block number, the last saved transaction hash, and the transaction info for L1 Deposit events. - case Optimism.repeated_request(request, error_message, json_rpc_named_arguments, retries) do - {:ok, response} -> - %Blocks{blocks_params: blocks_params} = Blocks.from_responses(response, id_to_params) + Used by the `Indexer.Fetcher.Optimism` module to start fetching from a correct block number + after reorg has occurred. - {:ok, - blocks_params - |> Enum.reduce(%{}, fn %{number: number, timestamp: timestamp}, acc -> Map.put_new(acc, number, timestamp) end)} + ## Parameters + - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection. + Used to get transaction info by its hash from the RPC node. - err -> - err - end + ## Returns + - A tuple `{last_block_number, last_transaction_hash, last_transaction}` where + `last_block_number` is the last block number found in the corresponding table (0 if not found), + `last_transaction_hash` is the last transaction hash found in the corresponding table (nil if not found), + `last_transaction` is the transaction info got from the RPC (nil if not found). + - A tuple `{:error, message}` in case the `eth_getTransactionByHash` RPC request failed. + """ + @spec get_last_l1_item(EthereumJSONRPC.json_rpc_named_arguments()) :: + {non_neg_integer(), binary() | nil, map() | nil} | {:error, any()} + def get_last_l1_item(json_rpc_named_arguments) do + Optimism.get_last_item( + :L1, + &Deposit.last_deposit_l1_block_number_query/0, + &Deposit.remove_deposits_query/1, + json_rpc_named_arguments + ) end - defp get_new_filter(from_block, to_block, address, topic0, json_rpc_named_arguments, retries \\ 3) do - processed_from_block = if is_integer(from_block), do: integer_to_quantity(from_block), else: from_block - processed_to_block = if is_integer(to_block), do: integer_to_quantity(to_block), else: to_block - - req = - request(%{ - id: 0, - method: "eth_newFilter", - params: [ - %{ - fromBlock: processed_from_block, - toBlock: processed_to_block, - address: address, - topics: [topic0] - } - ] - }) - - error_message = &"Cannot create new log filter. Error: #{inspect(&1)}" - - Optimism.repeated_request(req, error_message, json_rpc_named_arguments, retries) + @doc """ + Returns L1 RPC URL for this module. + """ + @spec l1_rpc_url() :: binary() | nil + def l1_rpc_url do + Optimism.l1_rpc_url() end - defp get_filter_changes(filter_id, json_rpc_named_arguments, retries \\ 3) do - req = - request(%{ - id: 0, - method: "eth_getFilterChanges", - params: [filter_id] - }) - - error_message = &"Cannot fetch filter changes. Error: #{inspect(&1)}" + @doc """ + Determines if `Indexer.Fetcher.RollupL1ReorgMonitor` module must be up + before this fetcher starts. - case Optimism.repeated_request(req, error_message, json_rpc_named_arguments, retries) do - {:error, %{code: _, message: "filter not found"}} -> {:error, :filter_not_found} - response -> response - end + ## Returns + - `true` if the reorg monitor must be active, `false` otherwise. + """ + @spec requires_l1_reorg_monitor?() :: boolean() + def requires_l1_reorg_monitor? do + Optimism.requires_l1_reorg_monitor?() end - defp uninstall_filter(filter_id, json_rpc_named_arguments, retries \\ 1) do - req = - request(%{ - id: 0, - method: "eth_getFilterChanges", - params: [filter_id] - }) - - error_message = &"Cannot uninstall filter. Error: #{inspect(&1)}" + defp get_blocks_by_events(events, json_rpc_named_arguments, retries) do + request = + events + |> Enum.reduce(%{}, fn event, acc -> + Map.put(acc, event["blockNumber"], 0) + end) + |> Stream.map(fn {block_number, _} -> %{number: block_number} end) + |> Stream.with_index() + |> Enum.into(%{}, fn {params, id} -> {id, params} end) + |> Blocks.requests(&ByNumber.request(&1, false, false)) - Optimism.repeated_request(req, error_message, json_rpc_named_arguments, retries) - end + error_message = &"Cannot fetch blocks with batch request. Error: #{inspect(&1)}. Request: #{inspect(request)}" - defp get_last_l1_item do - Deposit.last_deposit_l1_block_number_query() - |> Repo.one() - |> Kernel.||({0, nil}) + case Optimism.repeated_request(request, error_message, json_rpc_named_arguments, retries) do + {:ok, results} -> Enum.map(results, fn %{result: result} -> result end) + {:error, _} -> [] + end end end diff --git a/apps/indexer/lib/indexer/fetcher/optimism/output_root.ex b/apps/indexer/lib/indexer/fetcher/optimism/output_root.ex index c032a878d2..6223d2f731 100644 --- a/apps/indexer/lib/indexer/fetcher/optimism/output_root.ex +++ b/apps/indexer/lib/indexer/fetcher/optimism/output_root.ex @@ -67,20 +67,21 @@ defmodule Indexer.Fetcher.Optimism.OutputRoot do start_block: start_block, end_block: end_block, json_rpc_named_arguments: json_rpc_named_arguments, + eth_get_logs_range_size: eth_get_logs_range_size, stop: stop } = state ) do # credo:disable-for-next-line time_before = Timex.now() - chunks_number = ceil((end_block - start_block + 1) / Optimism.get_logs_range_size()) + chunks_number = ceil((end_block - start_block + 1) / eth_get_logs_range_size) chunk_range = Range.new(0, max(chunks_number - 1, 0), 1) last_written_block = chunk_range |> Enum.reduce_while(start_block - 1, fn current_chunk, _ -> - chunk_start = start_block + Optimism.get_logs_range_size() * current_chunk - chunk_end = min(chunk_start + Optimism.get_logs_range_size() - 1, end_block) + chunk_start = start_block + eth_get_logs_range_size * current_chunk + chunk_end = min(chunk_start + eth_get_logs_range_size - 1, end_block) if chunk_end >= chunk_start do IndexerHelper.log_blocks_chunk_handling(chunk_start, chunk_end, start_block, end_block, nil, :L1) @@ -184,17 +185,32 @@ defmodule Indexer.Fetcher.Optimism.OutputRoot do end end - def get_last_l1_item do - query = - from(root in OutputRoot, - select: {root.l1_block_number, root.l1_transaction_hash}, - order_by: [desc: root.l2_output_index], - limit: 1 - ) + @doc """ + Determines the last saved L1 block number, the last saved transaction hash, and the transaction info for Output Roots. + + Used by the `Indexer.Fetcher.Optimism` module to start fetching from a correct block number + after reorg has occurred. - query - |> Repo.one() - |> Kernel.||({0, nil}) + ## Parameters + - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection. + Used to get transaction info by its hash from the RPC node. + + ## Returns + - A tuple `{last_block_number, last_transaction_hash, last_transaction}` where + `last_block_number` is the last block number found in the corresponding table (0 if not found), + `last_transaction_hash` is the last transaction hash found in the corresponding table (nil if not found), + `last_transaction` is the transaction info got from the RPC (nil if not found). + - A tuple `{:error, message}` in case the `eth_getTransactionByHash` RPC request failed. + """ + @spec get_last_l1_item(EthereumJSONRPC.json_rpc_named_arguments()) :: + {non_neg_integer(), binary() | nil, map() | nil} | {:error, any()} + def get_last_l1_item(json_rpc_named_arguments) do + Optimism.get_last_item( + :L1, + &OutputRoot.last_root_l1_block_number_query/0, + &OutputRoot.remove_roots_query/1, + json_rpc_named_arguments + ) end @doc """ diff --git a/apps/indexer/lib/indexer/fetcher/optimism/transaction_batch.ex b/apps/indexer/lib/indexer/fetcher/optimism/transaction_batch.ex index 8185dec52f..aa0623e059 100644 --- a/apps/indexer/lib/indexer/fetcher/optimism/transaction_batch.ex +++ b/apps/indexer/lib/indexer/fetcher/optimism/transaction_batch.ex @@ -28,11 +28,13 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do import Explorer.Helper, only: [parse_integer: 1] + alias Ecto.Multi alias EthereumJSONRPC.Block.ByHash alias EthereumJSONRPC.{Blocks, Contract} alias Explorer.{Chain, Repo} alias Explorer.Chain.Beacon.Blob, as: BeaconBlob alias Explorer.Chain.{Block, Hash, RollupReorgMonitorQueue} + alias Explorer.Chain.Events.Publisher alias Explorer.Chain.Optimism.{FrameSequence, FrameSequenceBlob} alias Explorer.Chain.Optimism.TransactionBatch, as: OptimismTransactionBatch alias HTTPoison.Response @@ -42,10 +44,18 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do alias Indexer.Helper alias Varint.LEB128 - @fetcher_name :optimism_transaction_batches + @beacon_blob_fetcher_reference_slot_eth 8_500_000 + @beacon_blob_fetcher_reference_timestamp_eth 1_708_824_023 + @beacon_blob_fetcher_reference_slot_sepolia 4_400_000 + @beacon_blob_fetcher_reference_timestamp_sepolia 1_708_533_600 + @beacon_blob_fetcher_reference_slot_holesky 1_000_000 + @beacon_blob_fetcher_reference_timestamp_holesky 1_707_902_400 + @beacon_blob_fetcher_slot_duration 12 + @chain_id_eth 1 + @chain_id_sepolia 11_155_111 + @chain_id_holesky 17000 - # Optimism chain block time is a constant (2 seconds) - @op_chain_block_time 2 + @fetcher_name :optimism_transaction_batches @compressor_brotli 1 @@ -101,7 +111,6 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do {:batch_inbox_valid, true} <- {:batch_inbox_valid, Helper.address_correct?(batch_inbox)}, {:batch_submitter_valid, true} <- {:batch_submitter_valid, Helper.address_correct?(batch_submitter)}, - false <- is_nil(start_block_l1), true <- start_block_l1 > 0, chunk_size = parse_integer(env[:blocks_chunk_size]), {:chunk_size_valid, true} <- {:chunk_size_valid, !is_nil(chunk_size) && chunk_size > 0}, @@ -115,6 +124,14 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do Optimism.get_block_check_interval(json_rpc_named_arguments) do start_block = max(start_block_l1, last_l1_block_number) + chain_id_l1 = fetch_chain_id(json_rpc_named_arguments) + + if is_nil(chain_id_l1) do + Logger.warning( + "Cannot get Chain ID from the L1 RPC. The module will use fallback values from INDEXER_BEACON_BLOB_FETCHER_* env variables." + ) + end + Process.send(self(), :continue, []) {:noreply, @@ -129,8 +146,10 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do chunk_size: chunk_size, incomplete_channels: %{}, genesis_block_l2: env[:genesis_block_l2], + block_duration: optimism_env[:block_duration], json_rpc_named_arguments: json_rpc_named_arguments, - json_rpc_named_arguments_l2: json_rpc_named_arguments_l2 + json_rpc_named_arguments_l2: json_rpc_named_arguments_l2, + chain_id_l1: chain_id_l1 }} else {:system_config_valid, false} -> @@ -182,7 +201,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do {:stop, :normal, state} {:system_config_read, nil} -> - Logger.error("Cannot read SystemConfig contract.") + Logger.error("Cannot read SystemConfig contract and fallback envs are not correctly defined.") {:stop, :normal, state} _ -> @@ -213,8 +232,10 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do # - `chunk_size`: max number of L1 blocks in one chunk # - `incomplete_channels`: intermediate map of channels (incomplete frame sequences) in memory # - `genesis_block_l2`: Optimism BedRock upgrade L2 block number (used when parsing span batches) + # - `block_duration`: L2 block duration in seconds (used when parsing span batches) # - `json_rpc_named_arguments`: data to connect to L1 RPC server # - `json_rpc_named_arguments_l2`: data to connect to L2 RPC server + # - `chain_id_l1`: chain ID of L1 layer. @impl GenServer def handle_info( :continue, @@ -229,8 +250,10 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do chunk_size: chunk_size, incomplete_channels: incomplete_channels, genesis_block_l2: genesis_block_l2, + block_duration: block_duration, json_rpc_named_arguments: json_rpc_named_arguments, - json_rpc_named_arguments_l2: json_rpc_named_arguments_l2 + json_rpc_named_arguments_l2: json_rpc_named_arguments_l2, + chain_id_l1: chain_id_l1 } = state ) do time_before = Timex.now() @@ -260,10 +283,10 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do Range.new(chunk_start, chunk_end), batch_inbox, batch_submitter, - genesis_block_l2, + {genesis_block_l2, block_duration}, incomplete_channels_acc, {json_rpc_named_arguments, json_rpc_named_arguments_l2}, - {eip4844_blobs_api_url, celestia_blobs_api_url}, + {eip4844_blobs_api_url, celestia_blobs_api_url, chain_id_l1}, Helper.infinite_retries_number() ) @@ -285,6 +308,14 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do remove_prev_frame_sequences(inserted) set_frame_sequences_view_ready(sequences) + Publisher.broadcast( + %{ + new_optimism_batches: + Enum.map(sequences, &FrameSequence.batch_by_internal_id(&1.id, include_blobs?: false)) + }, + :realtime + ) + Helper.log_blocks_chunk_handling( chunk_start, chunk_end, @@ -402,28 +433,62 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do block.timestamp end + # Determines the last saved L1 block number, the last saved transaction hash, and the transaction info for batches. + # + # Utilized to start fetching from a correct block number after reorg has occurred. + # + # ## Parameters + # - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection. + # Used to get transaction info by its hash from the RPC node. + # + # ## Returns + # - A tuple `{last_block_number, last_transaction_hash, last_transaction}` where + # `last_block_number` is the last block number found in the corresponding table (0 if not found), + # `last_transaction_hash` is the last transaction hash found in the corresponding table (nil if not found), + # `last_transaction` is the transaction info got from the RPC (nil if not found). + @spec get_last_l1_item(EthereumJSONRPC.json_rpc_named_arguments()) :: {non_neg_integer(), binary() | nil, map() | nil} defp get_last_l1_item(json_rpc_named_arguments) do - l1_transaction_hashes = + result = Repo.one( from( tb in OptimismTransactionBatch, inner_join: fs in FrameSequence, on: fs.id == tb.frame_sequence_id, - select: fs.l1_transaction_hashes, + select: {fs.id, fs.l1_transaction_hashes}, order_by: [desc: tb.l2_block_number], limit: 1 ) ) - if is_nil(l1_transaction_hashes) do - {0, nil, nil} + with {:empty_hashes, false} <- {:empty_hashes, is_nil(result)}, + l1_transaction_hashes = elem(result, 1), + last_l1_transaction_hash = List.last(l1_transaction_hashes), + {:ok, last_l1_transaction} = + Optimism.get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments), + {:empty_transaction, false, last_l1_transaction_hash} <- + {:empty_transaction, is_nil(last_l1_transaction), last_l1_transaction_hash} do + last_l1_block_number = quantity_to_integer(Map.get(last_l1_transaction, "blockNumber", 0)) + {last_l1_block_number, last_l1_transaction_hash, last_l1_transaction} else - last_l1_transaction_hash = List.last(l1_transaction_hashes) + {:empty_hashes, true} -> + {0, nil, nil} - {:ok, last_l1_transaction} = Optimism.get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments) + {:empty_transaction, true, last_l1_transaction_hash} -> + Logger.error( + "Cannot find last L1 transaction from RPC by its hash (#{last_l1_transaction_hash}). Probably, there was a reorg on L1 chain. Trying to check preceding frame sequence..." + ) - last_l1_block_number = quantity_to_integer(Map.get(last_l1_transaction || %{}, "blockNumber", 0)) - {last_l1_block_number, last_l1_transaction_hash, last_l1_transaction} + id = elem(result, 0) + + Multi.new() + |> Multi.delete_all( + :delete_transaction_batches, + from(tb in OptimismTransactionBatch, where: tb.frame_sequence_id == ^id) + ) + |> Multi.delete_all(:delete_frame_sequence, from(fs in FrameSequence, where: fs.id == ^id)) + |> Repo.transaction() + + get_last_l1_item(json_rpc_named_arguments) end end @@ -431,7 +496,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do block_range, batch_inbox, batch_submitter, - genesis_block_l2, + {genesis_block_l2, block_duration}, incomplete_channels, {json_rpc_named_arguments, json_rpc_named_arguments_l2}, blobs_api_url, @@ -443,7 +508,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do |> transactions_filter(batch_submitter, batch_inbox) |> get_transaction_batches_inner( blocks_params, - genesis_block_l2, + {genesis_block_l2, block_duration}, incomplete_channels, json_rpc_named_arguments_l2, blobs_api_url @@ -471,7 +536,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do block_range, batch_inbox, batch_submitter, - genesis_block_l2, + {genesis_block_l2, block_duration}, incomplete_channels, {json_rpc_named_arguments, json_rpc_named_arguments_l2}, blobs_api_url, @@ -481,7 +546,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do end end - defp eip4844_blobs_to_inputs(_transaction_hash, _blob_versioned_hashes, _block_timestamp, "") do + defp eip4844_blobs_to_inputs(_transaction_hash, _blob_versioned_hashes, _block_timestamp, "", _chain_id_l1) do Logger.error( "Cannot read EIP-4844 blobs from the Blockscout Blobs API as the API URL is not defined. Please, check INDEXER_OPTIMISM_L1_BATCH_BLOCKSCOUT_BLOBS_API_URL env variable." ) @@ -493,7 +558,8 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do transaction_hash, blob_versioned_hashes, block_timestamp, - blobs_api_url + blobs_api_url, + chain_id_l1 ) do blob_versioned_hashes |> Enum.reduce([], fn blob_hash, inputs_acc -> @@ -530,7 +596,8 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do transaction_hash, blob_hash, block_timestamp, - inputs_acc + inputs_acc, + chain_id_l1 ) end end) @@ -541,13 +608,38 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do transaction_hash, blob_hash, block_timestamp, - inputs_acc + inputs_acc, + chain_id_l1 ) do beacon_config = - :indexer - |> Application.get_env(Blob) - |> Keyword.take([:reference_slot, :reference_timestamp, :slot_duration]) - |> Enum.into(%{}) + case chain_id_l1 do + @chain_id_eth -> + %{ + reference_slot: @beacon_blob_fetcher_reference_slot_eth, + reference_timestamp: @beacon_blob_fetcher_reference_timestamp_eth, + slot_duration: @beacon_blob_fetcher_slot_duration + } + + @chain_id_sepolia -> + %{ + reference_slot: @beacon_blob_fetcher_reference_slot_sepolia, + reference_timestamp: @beacon_blob_fetcher_reference_timestamp_sepolia, + slot_duration: @beacon_blob_fetcher_slot_duration + } + + @chain_id_holesky -> + %{ + reference_slot: @beacon_blob_fetcher_reference_slot_holesky, + reference_timestamp: @beacon_blob_fetcher_reference_timestamp_holesky, + slot_duration: @beacon_blob_fetcher_slot_duration + } + + _ -> + :indexer + |> Application.get_env(Blob) + |> Keyword.take([:reference_slot, :reference_timestamp, :slot_duration]) + |> Enum.into(%{}) + end {:ok, fetched_blobs} = block_timestamp @@ -665,10 +757,10 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do defp get_transaction_batches_inner( transactions_filtered, blocks_params, - genesis_block_l2, + {genesis_block_l2, block_duration}, incomplete_channels, json_rpc_named_arguments_l2, - {eip4844_blobs_api_url, celestia_blobs_api_url} + {eip4844_blobs_api_url, celestia_blobs_api_url, chain_id_l1} ) do transactions_filtered |> Enum.reduce({:ok, incomplete_channels, [], [], []}, fn transaction, @@ -684,7 +776,8 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do transaction.hash, transaction.blob_versioned_hashes, block_timestamp, - eip4844_blobs_api_url + eip4844_blobs_api_url, + chain_id_l1 ) first_byte(transaction.input) == 0xCE -> @@ -706,7 +799,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do blocks_params, new_incomplete_channels_acc, {new_batches_acc, new_sequences_acc, new_blobs_acc}, - genesis_block_l2, + {genesis_block_l2, block_duration}, json_rpc_named_arguments_l2 ) end @@ -720,7 +813,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do blocks_params, incomplete_channels_acc, {batches_acc, sequences_acc, blobs_acc}, - genesis_block_l2, + {genesis_block_l2, block_duration}, json_rpc_named_arguments_l2 ) do frame = input_to_frame(input.bytes) @@ -766,7 +859,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do batches_acc, sequences_acc, blobs_acc, - genesis_block_l2, + {genesis_block_l2, block_duration}, json_rpc_named_arguments_l2 ) else @@ -782,7 +875,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do batches_acc, sequences_acc, blobs_acc, - genesis_block_l2, + {genesis_block_l2, block_duration}, json_rpc_named_arguments_l2 ) do frame_sequence_last = List.first(sequences_acc) @@ -849,7 +942,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do bytes, frame_sequence_id, channel.l1_timestamp, - genesis_block_l2, + {genesis_block_l2, block_duration}, json_rpc_named_arguments_l2 ) @@ -1119,7 +1212,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do bytes, id, l1_timestamp, - genesis_block_l2, + {genesis_block_l2, block_duration}, json_rpc_named_arguments_l2 ) do uncompressed_bytes = @@ -1144,7 +1237,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do version <= 2 -> # parsing the span batch - handle_v1_batch(content, id, l1_timestamp, genesis_block_l2, batch_acc) + handle_v1_batch(content, id, l1_timestamp, genesis_block_l2, block_duration, batch_acc) true -> Logger.error("Unsupported batch version ##{version}") @@ -1188,7 +1281,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do [batch | batch_acc] end - defp handle_v1_batch(content, frame_sequence_id, l1_timestamp, genesis_block_l2, batch_acc) do + defp handle_v1_batch(content, frame_sequence_id, l1_timestamp, genesis_block_l2, block_duration, batch_acc) do {rel_timestamp, content_remainder} = LEB128.decode(content) # skip l1_origin_num @@ -1202,12 +1295,12 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do |> LEB128.decode() # the first and last L2 blocks in the span - span_start = div(rel_timestamp, @op_chain_block_time) + genesis_block_l2 + span_start = div(rel_timestamp, block_duration) + genesis_block_l2 span_end = span_start + block_count - 1 cond do - rem(rel_timestamp, @op_chain_block_time) != 0 -> - Logger.error("rel_timestamp is not divisible by #{@op_chain_block_time}. We ignore the span batch.") + rem(rel_timestamp, block_duration) != 0 -> + Logger.error("rel_timestamp is not divisible by #{block_duration}. We ignore the span batch.") batch_acc @@ -1326,6 +1419,22 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do end) end + # Reads some public getters of SystemConfig contract and returns retrieved values. + # Gets the number of a start block (from which this fetcher should start), + # the inbox address, and the batcher (batch submitter) address. + # + # If SystemConfig has obsolete implementation, the values are fallen back from the corresponding + # env variables (INDEXER_OPTIMISM_L1_START_BLOCK, INDEXER_OPTIMISM_L1_BATCH_INBOX, INDEXER_OPTIMISM_L1_BATCH_SUBMITTER). + # + # ## Parameters + # - `contract_address`: An address of SystemConfig contract. + # - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection. + # + # ## Returns + # - A tuple: {start_block, inbox, submitter}. + # - `nil` in case of error. + @spec read_system_config(String.t(), EthereumJSONRPC.json_rpc_named_arguments()) :: + {non_neg_integer(), String.t(), String.t()} | nil defp read_system_config(contract_address, json_rpc_named_arguments) do requests = [ # startBlock() public getter @@ -1342,7 +1451,7 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do &json_rpc/2, [requests, json_rpc_named_arguments], error_message, - Helper.infinite_retries_number() + Helper.finite_retries_number() ) do {:ok, responses} -> start_block = quantity_to_integer(Enum.at(responses, 0).result) @@ -1350,6 +1459,39 @@ defmodule Indexer.Fetcher.Optimism.TransactionBatch do "0x000000000000000000000000" <> batch_submitter = Enum.at(responses, 2).result {start_block, String.downcase("0x" <> batch_inbox), String.downcase("0x" <> batch_submitter)} + _ -> + start_block = Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism][:start_block_l1] + env = Application.get_all_env(:indexer)[__MODULE__] + + if not is_nil(start_block) and Helper.address_correct?(env[:inbox]) and Helper.address_correct?(env[:submitter]) do + {start_block, String.downcase(env[:inbox]), String.downcase(env[:submitter])} + end + end + end + + # Fetches the chain id from the RPC. + # + # ## Parameters + # - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection. + # + # ## Returns + # - The chain id as unsigned integer. + # - `nil` if the request failed. + @spec fetch_chain_id(EthereumJSONRPC.json_rpc_named_arguments()) :: non_neg_integer() | nil + defp fetch_chain_id(json_rpc_named_arguments) do + error_message = &"Cannot read `eth_chainId`. Error: #{inspect(&1)}" + + request = EthereumJSONRPC.request(%{id: 0, method: "eth_chainId", params: []}) + + case Helper.repeated_call( + &json_rpc/2, + [request, json_rpc_named_arguments], + error_message, + Helper.infinite_retries_number() + ) do + {:ok, response} -> + quantity_to_integer(response) + _ -> nil end diff --git a/apps/indexer/lib/indexer/fetcher/optimism/withdrawal.ex b/apps/indexer/lib/indexer/fetcher/optimism/withdrawal.ex index 113a56fbce..0c64506ba4 100644 --- a/apps/indexer/lib/indexer/fetcher/optimism/withdrawal.ex +++ b/apps/indexer/lib/indexer/fetcher/optimism/withdrawal.ex @@ -56,13 +56,12 @@ defmodule Indexer.Fetcher.Optimism.Withdrawal do start_block_l2 = parse_integer(env[:start_block_l2]), false <- is_nil(start_block_l2), true <- start_block_l2 > 0, - {last_l2_block_number, last_l2_transaction_hash} <- get_last_l2_item(), + {last_l2_block_number, last_l2_transaction_hash, last_l2_transaction} <- + get_last_l2_item(json_rpc_named_arguments), {safe_block, safe_block_is_latest} = Helper.get_safe_block(json_rpc_named_arguments), {:start_block_l2_valid, true} <- {:start_block_l2_valid, (start_block_l2 <= last_l2_block_number || last_l2_block_number == 0) && start_block_l2 <= safe_block}, - {:ok, last_l2_transaction} <- - Optimism.get_transaction_by_hash(last_l2_transaction_hash, json_rpc_named_arguments), {:l2_transaction_not_found, false} <- {:l2_transaction_not_found, !is_nil(last_l2_transaction_hash) && is_nil(last_l2_transaction)} do Process.send(self(), :continue, []) @@ -74,7 +73,9 @@ defmodule Indexer.Fetcher.Optimism.Withdrawal do safe_block: safe_block, safe_block_is_latest: safe_block_is_latest, message_passer: env[:message_passer], - json_rpc_named_arguments: json_rpc_named_arguments + json_rpc_named_arguments: json_rpc_named_arguments, + eth_get_logs_range_size: + Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism][:l2_eth_get_logs_range_size] }} else {:start_block_l2_undefined, true} -> @@ -113,10 +114,11 @@ defmodule Indexer.Fetcher.Optimism.Withdrawal do %{ start_block_l2: start_block_l2, message_passer: message_passer, - json_rpc_named_arguments: json_rpc_named_arguments + json_rpc_named_arguments: json_rpc_named_arguments, + eth_get_logs_range_size: eth_get_logs_range_size } = state ) do - fill_msg_nonce_gaps(start_block_l2, message_passer, json_rpc_named_arguments) + fill_msg_nonce_gaps(start_block_l2, message_passer, json_rpc_named_arguments, eth_get_logs_range_size) Process.send(self(), :find_new_events, []) {:noreply, state} end @@ -129,17 +131,18 @@ defmodule Indexer.Fetcher.Optimism.Withdrawal do safe_block: safe_block, safe_block_is_latest: safe_block_is_latest, message_passer: message_passer, - json_rpc_named_arguments: json_rpc_named_arguments + json_rpc_named_arguments: json_rpc_named_arguments, + eth_get_logs_range_size: eth_get_logs_range_size } = state ) do # find and fill all events between start_block and "safe" block # the "safe" block can be "latest" (when safe_block_is_latest == true) - fill_block_range(start_block, safe_block, message_passer, json_rpc_named_arguments) + fill_block_range(start_block, safe_block, message_passer, json_rpc_named_arguments, eth_get_logs_range_size) if not safe_block_is_latest do # find and fill all events between "safe" and "latest" block (excluding "safe") {:ok, latest_block} = Optimism.get_block_number_by_tag("latest", json_rpc_named_arguments) - fill_block_range(safe_block + 1, latest_block, message_passer, json_rpc_named_arguments) + fill_block_range(safe_block + 1, latest_block, message_passer, json_rpc_named_arguments, eth_get_logs_range_size) end {:stop, :normal, state} @@ -183,7 +186,8 @@ defmodule Indexer.Fetcher.Optimism.Withdrawal do w.msg_nonce, ^nonce_max ) - ) + ), + timeout: :infinity ) end @@ -198,7 +202,8 @@ defmodule Indexer.Fetcher.Optimism.Withdrawal do w.msg_nonce, ^nonce_min ) - ) + ), + timeout: :infinity ) end @@ -254,24 +259,31 @@ defmodule Indexer.Fetcher.Optimism.Withdrawal do Enum.count(withdrawals) end - defp fill_block_range(l2_block_start, l2_block_end, message_passer, json_rpc_named_arguments, scan_db) do + defp fill_block_range( + l2_block_start, + l2_block_end, + message_passer, + json_rpc_named_arguments, + eth_get_logs_range_size, + scan_db + ) do chunks_number = if scan_db do 1 else - ceil((l2_block_end - l2_block_start + 1) / Optimism.get_logs_range_size()) + ceil((l2_block_end - l2_block_start + 1) / eth_get_logs_range_size) end chunk_range = Range.new(0, max(chunks_number - 1, 0), 1) Enum.reduce(chunk_range, 0, fn current_chunk, withdrawals_count_acc -> - chunk_start = l2_block_start + Optimism.get_logs_range_size() * current_chunk + chunk_start = l2_block_start + eth_get_logs_range_size * current_chunk chunk_end = if scan_db do l2_block_end else - min(chunk_start + Optimism.get_logs_range_size() - 1, l2_block_end) + min(chunk_start + eth_get_logs_range_size - 1, l2_block_end) end Helper.log_blocks_chunk_handling(chunk_start, chunk_end, l2_block_start, l2_block_end, nil, :L2) @@ -298,23 +310,30 @@ defmodule Indexer.Fetcher.Optimism.Withdrawal do end) end - defp fill_block_range(start_block, end_block, message_passer, json_rpc_named_arguments) do + defp fill_block_range(start_block, end_block, message_passer, json_rpc_named_arguments, eth_get_logs_range_size) do if start_block <= end_block do - fill_block_range(start_block, end_block, message_passer, json_rpc_named_arguments, true) - fill_msg_nonce_gaps(start_block, message_passer, json_rpc_named_arguments, false) - {last_l2_block_number, _} = get_last_l2_item() + fill_block_range(start_block, end_block, message_passer, json_rpc_named_arguments, eth_get_logs_range_size, true) + fill_msg_nonce_gaps(start_block, message_passer, json_rpc_named_arguments, eth_get_logs_range_size, false) + {last_l2_block_number, _, _} = get_last_l2_item() fill_block_range( max(start_block, last_l2_block_number), end_block, message_passer, json_rpc_named_arguments, + eth_get_logs_range_size, false ) end end - defp fill_msg_nonce_gaps(start_block_l2, message_passer, json_rpc_named_arguments, scan_db \\ true) do + defp fill_msg_nonce_gaps( + start_block_l2, + message_passer, + json_rpc_named_arguments, + eth_get_logs_range_size, + scan_db \\ true + ) do nonce_min = Repo.aggregate(OptimismWithdrawal, :min, :msg_nonce) nonce_max = Repo.aggregate(OptimismWithdrawal, :max, :msg_nonce) @@ -332,7 +351,14 @@ defmodule Indexer.Fetcher.Optimism.Withdrawal do |> Enum.zip(new_ends) |> Enum.each(fn {l2_block_start, l2_block_end} -> withdrawals_count = - fill_block_range(l2_block_start, l2_block_end, message_passer, json_rpc_named_arguments, scan_db) + fill_block_range( + l2_block_start, + l2_block_end, + message_passer, + json_rpc_named_arguments, + eth_get_logs_range_size, + scan_db + ) if withdrawals_count > 0 do log_fill_msg_nonce_gaps(scan_db, l2_block_start, l2_block_end, withdrawals_count) @@ -340,22 +366,35 @@ defmodule Indexer.Fetcher.Optimism.Withdrawal do end) if scan_db do - fill_msg_nonce_gaps(start_block_l2, message_passer, json_rpc_named_arguments, false) + fill_msg_nonce_gaps(start_block_l2, message_passer, json_rpc_named_arguments, eth_get_logs_range_size, false) end end end - defp get_last_l2_item do - query = - from(w in OptimismWithdrawal, - select: {w.l2_block_number, w.l2_transaction_hash}, - order_by: [desc: w.msg_nonce], - limit: 1 - ) - - query - |> Repo.one() - |> Kernel.||({0, nil}) + # Determines the last saved L2 block number, the last saved transaction hash, and the transaction info for withdrawals. + # + # Utilized to start fetching from a correct block number after reorg has occurred. + # + # ## Parameters + # - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection. + # Used to get transaction info by its hash from the RPC node. + # Can be `nil` if the transaction info is not needed. + # + # ## Returns + # - A tuple `{last_block_number, last_transaction_hash, last_transaction}` where + # `last_block_number` is the last block number found in the corresponding table (0 if not found), + # `last_transaction_hash` is the last transaction hash found in the corresponding table (nil if not found), + # `last_transaction` is the transaction info got from the RPC (nil if not found or not needed). + # - A tuple `{:error, message}` in case the `eth_getTransactionByHash` RPC request failed. + @spec get_last_l2_item(EthereumJSONRPC.json_rpc_named_arguments() | nil) :: + {non_neg_integer(), binary() | nil, map() | nil} | {:error, any()} + defp get_last_l2_item(json_rpc_named_arguments \\ nil) do + Optimism.get_last_item( + :L2, + &OptimismWithdrawal.last_withdrawal_l2_block_number_query/0, + &OptimismWithdrawal.remove_withdrawals_query/1, + json_rpc_named_arguments + ) end defp log_fill_msg_nonce_gaps(scan_db, l2_block_start, l2_block_end, withdrawals_count) do diff --git a/apps/indexer/lib/indexer/fetcher/optimism/withdrawal_event.ex b/apps/indexer/lib/indexer/fetcher/optimism/withdrawal_event.ex index 3cb2a55d4a..5efb323906 100644 --- a/apps/indexer/lib/indexer/fetcher/optimism/withdrawal_event.ex +++ b/apps/indexer/lib/indexer/fetcher/optimism/withdrawal_event.ex @@ -69,20 +69,21 @@ defmodule Indexer.Fetcher.Optimism.WithdrawalEvent do block_check_interval: block_check_interval, start_block: start_block, end_block: end_block, - json_rpc_named_arguments: json_rpc_named_arguments + json_rpc_named_arguments: json_rpc_named_arguments, + eth_get_logs_range_size: eth_get_logs_range_size } = state ) do # credo:disable-for-next-line time_before = Timex.now() - chunks_number = ceil((end_block - start_block + 1) / Optimism.get_logs_range_size()) + chunks_number = ceil((end_block - start_block + 1) / eth_get_logs_range_size) chunk_range = Range.new(0, max(chunks_number - 1, 0), 1) last_written_block = chunk_range |> Enum.reduce_while(start_block - 1, fn current_chunk, _ -> - chunk_start = start_block + Optimism.get_logs_range_size() * current_chunk - chunk_end = min(chunk_start + Optimism.get_logs_range_size() - 1, end_block) + chunk_start = start_block + eth_get_logs_range_size * current_chunk + chunk_end = min(chunk_start + eth_get_logs_range_size - 1, end_block) if chunk_end >= chunk_start do Helper.log_blocks_chunk_handling(chunk_start, chunk_end, start_block, end_block, nil, :L1) @@ -245,17 +246,32 @@ defmodule Indexer.Fetcher.Optimism.WithdrawalEvent do |> Map.values() end - def get_last_l1_item do - query = - from(we in WithdrawalEvent, - select: {we.l1_block_number, we.l1_transaction_hash}, - order_by: [desc: we.l1_timestamp], - limit: 1 - ) + @doc """ + Determines the last saved L1 block number, the last saved transaction hash, and the transaction info for L1 Withdrawal events. + + Used by the `Indexer.Fetcher.Optimism` module to start fetching from a correct block number + after reorg has occurred. - query - |> Repo.one() - |> Kernel.||({0, nil}) + ## Parameters + - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection. + Used to get transaction info by its hash from the RPC node. + + ## Returns + - A tuple `{last_block_number, last_transaction_hash, last_transaction}` where + `last_block_number` is the last block number found in the corresponding table (0 if not found), + `last_transaction_hash` is the last transaction hash found in the corresponding table (nil if not found), + `last_transaction` is the transaction info got from the RPC (nil if not found). + - A tuple `{:error, message}` in case the `eth_getTransactionByHash` RPC request failed. + """ + @spec get_last_l1_item(EthereumJSONRPC.json_rpc_named_arguments()) :: + {non_neg_integer(), binary() | nil, map() | nil} | {:error, any()} + def get_last_l1_item(json_rpc_named_arguments) do + Optimism.get_last_item( + :L1, + &WithdrawalEvent.last_event_l1_block_number_query/0, + &WithdrawalEvent.remove_events_query/1, + json_rpc_named_arguments + ) end @doc """ diff --git a/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex b/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex index 825dec00da..b2f9541b2b 100644 --- a/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex +++ b/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex @@ -20,6 +20,7 @@ defmodule Indexer.Fetcher.RollupL1ReorgMonitor do @modules_can_use_reorg_monitor (case Application.compile_env(:explorer, :chain_type) do :optimism -> [ + Indexer.Fetcher.Optimism.Deposit, Indexer.Fetcher.Optimism.OutputRoot, Indexer.Fetcher.Optimism.TransactionBatch, Indexer.Fetcher.Optimism.WithdrawalEvent diff --git a/apps/indexer/lib/indexer/helper.ex b/apps/indexer/lib/indexer/helper.ex index d01e030d99..60c634ad7d 100644 --- a/apps/indexer/lib/indexer/helper.ex +++ b/apps/indexer/lib/indexer/helper.ex @@ -178,6 +178,21 @@ defmodule Indexer.Helper do repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries) end + @doc """ + Returns a number of attempts for RPC requests sending by indexer modules. + The number is defined by @finite_retries_number attribute. + """ + @spec finite_retries_number() :: non_neg_integer() + def finite_retries_number do + @finite_retries_number + end + + @doc """ + Returns a big number of attempts for RPC requests sending by indexer modules + (simulating an infinite number of attempts). The number is defined by + @infinite_retries_number attribute. + """ + @spec infinite_retries_number() :: non_neg_integer() def infinite_retries_number do @infinite_retries_number end diff --git a/config/runtime.exs b/config/runtime.exs index b440a741fc..1c69447732 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -884,10 +884,14 @@ config :indexer, Indexer.Fetcher.Optimism.WithdrawalEvent.Supervisor, enabled: C config :indexer, Indexer.Fetcher.Optimism, optimism_l1_rpc: System.get_env("INDEXER_OPTIMISM_L1_RPC"), - optimism_l1_system_config: System.get_env("INDEXER_OPTIMISM_L1_SYSTEM_CONFIG_CONTRACT") + optimism_l1_system_config: System.get_env("INDEXER_OPTIMISM_L1_SYSTEM_CONFIG_CONTRACT"), + l1_eth_get_logs_range_size: ConfigHelper.parse_integer_env_var("INDEXER_OPTIMISM_L1_ETH_GET_LOGS_RANGE_SIZE", 250), + l2_eth_get_logs_range_size: ConfigHelper.parse_integer_env_var("INDEXER_OPTIMISM_L2_ETH_GET_LOGS_RANGE_SIZE", 250), + block_duration: ConfigHelper.parse_integer_env_var("INDEXER_OPTIMISM_BLOCK_DURATION", 2), + start_block_l1: ConfigHelper.parse_integer_or_nil_env_var("INDEXER_OPTIMISM_L1_START_BLOCK"), + portal: System.get_env("INDEXER_OPTIMISM_L1_PORTAL_CONTRACT") config :indexer, Indexer.Fetcher.Optimism.Deposit, - batch_size: System.get_env("INDEXER_OPTIMISM_L1_DEPOSITS_BATCH_SIZE"), transaction_type: ConfigHelper.parse_integer_env_var("INDEXER_OPTIMISM_L1_DEPOSITS_TRANSACTION_TYPE", 126) config :indexer, Indexer.Fetcher.Optimism.OutputRoot, @@ -902,7 +906,9 @@ config :indexer, Indexer.Fetcher.Optimism.TransactionBatch, blocks_chunk_size: System.get_env("INDEXER_OPTIMISM_L1_BATCH_BLOCKS_CHUNK_SIZE", "4"), eip4844_blobs_api_url: System.get_env("INDEXER_OPTIMISM_L1_BATCH_BLOCKSCOUT_BLOBS_API_URL", ""), celestia_blobs_api_url: System.get_env("INDEXER_OPTIMISM_L1_BATCH_CELESTIA_BLOBS_API_URL", ""), - genesis_block_l2: ConfigHelper.parse_integer_or_nil_env_var("INDEXER_OPTIMISM_L2_BATCH_GENESIS_BLOCK_NUMBER") + genesis_block_l2: ConfigHelper.parse_integer_or_nil_env_var("INDEXER_OPTIMISM_L2_BATCH_GENESIS_BLOCK_NUMBER"), + inbox: System.get_env("INDEXER_OPTIMISM_L1_BATCH_INBOX"), + submitter: System.get_env("INDEXER_OPTIMISM_L1_BATCH_SUBMITTER") config :indexer, Indexer.Fetcher.Withdrawal.Supervisor, disabled?: System.get_env("INDEXER_DISABLE_WITHDRAWALS_FETCHER", "true") == "true" diff --git a/cspell.json b/cspell.json index e939b1a694..0d89d80635 100644 --- a/cspell.json +++ b/cspell.json @@ -261,6 +261,7 @@ "histoday", "hljs", "Hodl", + "holesky", "HOPR", "httpoison", "hyperledger", diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index f31d65b6ff..c3608980e2 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -284,13 +284,19 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false # WITHDRAWALS_FIRST_BLOCK= # INDEXER_OPTIMISM_L1_RPC= # INDEXER_OPTIMISM_L1_SYSTEM_CONFIG_CONTRACT= +# INDEXER_OPTIMISM_L1_PORTAL_CONTRACT= +# INDEXER_OPTIMISM_L1_START_BLOCK= +# INDEXER_OPTIMISM_L1_BATCH_INBOX= +# INDEXER_OPTIMISM_L1_BATCH_SUBMITTER= # INDEXER_OPTIMISM_L1_BATCH_BLOCKS_CHUNK_SIZE= # INDEXER_OPTIMISM_L2_BATCH_GENESIS_BLOCK_NUMBER= +# INDEXER_OPTIMISM_BLOCK_DURATION= # INDEXER_OPTIMISM_L1_OUTPUT_ORACLE_CONTRACT= # INDEXER_OPTIMISM_L2_WITHDRAWALS_START_BLOCK= # INDEXER_OPTIMISM_L2_MESSAGE_PASSER_CONTRACT= -# INDEXER_OPTIMISM_L1_DEPOSITS_BATCH_SIZE= # INDEXER_OPTIMISM_L1_DEPOSITS_TRANSACTION_TYPE= +# INDEXER_OPTIMISM_L1_ETH_GET_LOGS_RANGE_SIZE= +# INDEXER_OPTIMISM_L2_ETH_GET_LOGS_RANGE_SIZE= # INDEXER_SCROLL_L1_RPC= # INDEXER_SCROLL_L1_MESSENGER_CONTRACT= # INDEXER_SCROLL_L1_MESSENGER_START_BLOCK=