From ce5e2f54e4976e421cdd55de44e4c6d293b58a9a Mon Sep 17 00:00:00 2001 From: varasev <33550681+varasev@users.noreply.github.com> Date: Mon, 11 Mar 2024 14:45:41 +0300 Subject: [PATCH] Support Optimism Ecotone upgrade by Indexer.Fetcher.Optimism.TxnBatch module (#9571) * Draft for Ecotone support by Indexer.Fetcher.Optimism.TxnBatch module * Small refactoring * Use RollupL1ReorgMonitor for Optimism * Move EIP-4844 decode function to Explorer.Chain.Optimism.TxnBatch * Small refactoring * Update changelog * Update cspell.json * Extend logs * Fix init_continue in Indexer.Fetcher.Optimism * mix format --------- Co-authored-by: POA <33550681+poa@users.noreply.github.com> --- CHANGELOG.md | 1 + .../lib/ethereum_jsonrpc/transaction.ex | 4 +- .../lib/explorer/chain/events/publisher.ex | 2 +- .../lib/explorer/chain/events/subscriber.ex | 2 +- .../lib/explorer/chain/optimism/txn_batch.ex | 95 +++++++ .../lib/indexer/fetcher/beacon/blob.ex | 14 +- apps/indexer/lib/indexer/fetcher/optimism.ex | 104 +------- .../indexer/fetcher/optimism/output_root.ex | 10 +- .../lib/indexer/fetcher/optimism/txn_batch.ex | 252 ++++++++++++++---- .../fetcher/optimism/withdrawal_event.ex | 10 +- .../fetcher/rollup_l1_reorg_monitor.ex | 32 ++- apps/indexer/lib/indexer/supervisor.ex | 3 +- config/runtime.exs | 2 +- cspell.json | 2 + 14 files changed, 346 insertions(+), 187 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89d3ef995c..98e77e1aa9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ ### Chore +- [#9571](https://github.com/blockscout/blockscout/pull/9571) - Support Optimism Ecotone upgrade by Indexer.Fetcher.Optimism.TxnBatch module - [#9562](https://github.com/blockscout/blockscout/pull/9562) - Add cancun evm version - [#9260](https://github.com/blockscout/blockscout/pull/9260) - Optimism Delta upgrade support by Indexer.Fetcher.OptimismTxnBatch module - [#8740](https://github.com/blockscout/blockscout/pull/8740) - Add delay to Indexer.Fetcher.OptimismTxnBatch module initialization diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex index 67f172c1a6..b3fb9c55ec 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex @@ -489,9 +489,11 @@ defmodule EthereumJSONRPC.Transaction do ]) "optimism" -> + # we need to put blobVersionedHashes for Indexer.Fetcher.Optimism.TxnBatch module put_if_present(elixir, params, [ {"l1TxOrigin", :l1_tx_origin}, - {"l1BlockNumber", :l1_block_number} + {"l1BlockNumber", :l1_block_number}, + {"blobVersionedHashes", :blob_versioned_hashes} ]) "suave" -> diff --git a/apps/explorer/lib/explorer/chain/events/publisher.ex b/apps/explorer/lib/explorer/chain/events/publisher.ex index 1b45c84c94..d6e4aa52b7 100644 --- a/apps/explorer/lib/explorer/chain/events/publisher.ex +++ b/apps/explorer/lib/explorer/chain/events/publisher.ex @@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Publisher do Publishes events related to the Chain context. """ - @allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits optimism_reorg_block token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a + @allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a def broadcast(_data, false), do: :ok diff --git a/apps/explorer/lib/explorer/chain/events/subscriber.ex b/apps/explorer/lib/explorer/chain/events/subscriber.ex index c161bb124a..0a285f73f5 100644 --- a/apps/explorer/lib/explorer/chain/events/subscriber.ex +++ b/apps/explorer/lib/explorer/chain/events/subscriber.ex @@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Subscriber do Subscribes to events related to the Chain context. """ - @allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits optimism_reorg_block token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a + @allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a @allowed_broadcast_types ~w(catchup realtime on_demand contract_verification_result)a diff --git a/apps/explorer/lib/explorer/chain/optimism/txn_batch.ex b/apps/explorer/lib/explorer/chain/optimism/txn_batch.ex index fea454afd3..7ba61b2f2e 100644 --- a/apps/explorer/lib/explorer/chain/optimism/txn_batch.ex +++ b/apps/explorer/lib/explorer/chain/optimism/txn_batch.ex @@ -12,6 +12,11 @@ defmodule Explorer.Chain.Optimism.TxnBatch do @required_attrs ~w(l2_block_number frame_sequence_id)a + @blob_size 4096 * 32 + @encoding_version 0 + @max_blob_data_size (4 * 31 + 3) * 1024 - 4 + @rounds 1024 + @type t :: %__MODULE__{ l2_block_number: non_neg_integer(), frame_sequence_id: non_neg_integer(), @@ -53,6 +58,96 @@ defmodule Explorer.Chain.Optimism.TxnBatch do |> select_repo(options).all() end + @doc """ + Decodes EIP-4844 blob to the raw data. Returns `nil` if the blob is invalid. + """ + @spec decode_eip4844_blob(binary()) :: binary() | nil + def decode_eip4844_blob(b) do + <> = b + + if version != @encoding_version or output_len > @max_blob_data_size do + raise "Blob version or data size is incorrect" + end + + output = first_output <> :binary.copy(<<0>>, @max_blob_data_size - 27) + + opos = 28 + ipos = 32 + {encoded_byte1, opos, ipos, output} = decode_eip4844_field_element(b, opos, ipos, output) + {encoded_byte2, opos, ipos, output} = decode_eip4844_field_element(b, opos, ipos, output) + {encoded_byte3, opos, ipos, output} = decode_eip4844_field_element(b, opos, ipos, output) + {opos, output} = reassemble_eip4844_bytes(opos, encoded_byte0, encoded_byte1, encoded_byte2, encoded_byte3, output) + + {_opos, ipos, output} = + Enum.reduce_while(Range.new(1, @rounds - 1), {opos, ipos, output}, fn _i, {opos_acc, ipos_acc, output_acc} -> + if opos_acc >= output_len do + {:halt, {opos_acc, ipos_acc, output_acc}} + else + {encoded_byte0, opos_acc, ipos_acc, output_acc} = + decode_eip4844_field_element(b, opos_acc, ipos_acc, output_acc) + + {encoded_byte1, opos_acc, ipos_acc, output_acc} = + decode_eip4844_field_element(b, opos_acc, ipos_acc, output_acc) + + {encoded_byte2, opos_acc, ipos_acc, output_acc} = + decode_eip4844_field_element(b, opos_acc, ipos_acc, output_acc) + + {encoded_byte3, opos_acc, ipos_acc, output_acc} = + decode_eip4844_field_element(b, opos_acc, ipos_acc, output_acc) + + {opos_acc, output_acc} = + reassemble_eip4844_bytes(opos_acc, encoded_byte0, encoded_byte1, encoded_byte2, encoded_byte3, output_acc) + + {:cont, {opos_acc, ipos_acc, output_acc}} + end + end) + + Enum.each(Range.new(output_len, byte_size(output) - 1), fn i -> + <<0>> = binary_part(output, i, 1) + end) + + output = binary_part(output, 0, output_len) + + Enum.each(Range.new(ipos, @blob_size - 1), fn i -> + <<0>> = binary_part(b, i, 1) + end) + + output + rescue + _ -> nil + end + + defp decode_eip4844_field_element(b, opos, ipos, output) do + <<_::binary-size(ipos), ipos_byte::size(8), insert::binary-size(32), _::binary>> = b + + if Bitwise.band(ipos_byte, 0b11000000) == 0 do + <> = output + + {ipos_byte, opos + 32, ipos + 32, output_before_opos <> insert <> rest} + end + end + + defp reassemble_eip4844_bytes(opos, encoded_byte0, encoded_byte1, encoded_byte2, encoded_byte3, output) do + opos = opos - 1 + + x = Bitwise.bor(Bitwise.band(encoded_byte0, 0b00111111), Bitwise.bsl(Bitwise.band(encoded_byte1, 0b00110000), 2)) + y = Bitwise.bor(Bitwise.band(encoded_byte1, 0b00001111), Bitwise.bsl(Bitwise.band(encoded_byte3, 0b00001111), 4)) + z = Bitwise.bor(Bitwise.band(encoded_byte2, 0b00111111), Bitwise.bsl(Bitwise.band(encoded_byte3, 0b00110000), 2)) + + new_output = + output + |> replace_byte(z, opos - 32) + |> replace_byte(y, opos - 32 * 2) + |> replace_byte(x, opos - 32 * 3) + + {opos, new_output} + end + + defp replace_byte(bytes, byte, pos) do + <> = bytes + bytes_before <> <> <> bytes_after + end + defp page_txn_batches(query, %PagingOptions{key: nil}), do: query defp page_txn_batches(query, %PagingOptions{key: {block_number}}) do diff --git a/apps/indexer/lib/indexer/fetcher/beacon/blob.ex b/apps/indexer/lib/indexer/fetcher/beacon/blob.ex index eaa1f7dd09..ba72821108 100644 --- a/apps/indexer/lib/indexer/fetcher/beacon/blob.ex +++ b/apps/indexer/lib/indexer/fetcher/beacon/blob.ex @@ -105,11 +105,15 @@ defmodule Indexer.Fetcher.Beacon.Blob do DateTime.to_unix(block_timestamp) end - defp timestamp_to_slot(block_timestamp, %{ - reference_timestamp: reference_timestamp, - reference_slot: reference_slot, - slot_duration: slot_duration - }) do + @doc """ + Converts block timestamp to the slot number. + """ + @spec timestamp_to_slot(non_neg_integer(), map()) :: non_neg_integer() + def timestamp_to_slot(block_timestamp, %{ + reference_timestamp: reference_timestamp, + reference_slot: reference_slot, + slot_duration: slot_duration + }) do ((block_timestamp - reference_timestamp) |> div(slot_duration)) + reference_slot end diff --git a/apps/indexer/lib/indexer/fetcher/optimism.ex b/apps/indexer/lib/indexer/fetcher/optimism.ex index 62d5fe1cd0..cd367d59f4 100644 --- a/apps/indexer/lib/indexer/fetcher/optimism.ex +++ b/apps/indexer/lib/indexer/fetcher/optimism.ex @@ -20,8 +20,7 @@ defmodule Indexer.Fetcher.Optimism do import Explorer.Helper, only: [parse_integer: 1] alias EthereumJSONRPC.Block.ByNumber - alias Explorer.Chain.Events.{Publisher, Subscriber} - alias Indexer.{BoundQueue, Helper} + alias Indexer.Helper @fetcher_name :optimism @block_check_interval_range_size 100 @@ -46,59 +45,7 @@ defmodule Indexer.Fetcher.Optimism do @impl GenServer def init(_args) do Logger.metadata(fetcher: @fetcher_name) - - modules_using_reorg_monitor = [ - Indexer.Fetcher.Optimism.TxnBatch, - Indexer.Fetcher.Optimism.OutputRoot, - Indexer.Fetcher.Optimism.WithdrawalEvent - ] - - reorg_monitor_not_needed = - modules_using_reorg_monitor - |> Enum.all?(fn module -> - is_nil(Application.get_all_env(:indexer)[module][:start_block_l1]) - end) - - if reorg_monitor_not_needed do - :ignore - else - optimism_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism][:optimism_l1_rpc] - - json_rpc_named_arguments = json_rpc_named_arguments(optimism_l1_rpc) - - {:ok, %{}, {:continue, json_rpc_named_arguments}} - end - end - - @impl GenServer - def handle_continue(json_rpc_named_arguments, _state) do - {:ok, block_check_interval, _} = get_block_check_interval(json_rpc_named_arguments) - Process.send(self(), :reorg_monitor, []) - - {:noreply, - %{block_check_interval: block_check_interval, json_rpc_named_arguments: json_rpc_named_arguments, prev_latest: 0}} - end - - @impl GenServer - def handle_info( - :reorg_monitor, - %{ - block_check_interval: block_check_interval, - json_rpc_named_arguments: json_rpc_named_arguments, - prev_latest: prev_latest - } = state - ) do - {:ok, latest} = get_block_number_by_tag("latest", json_rpc_named_arguments, Helper.infinite_retries_number()) - - if latest < prev_latest do - Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.") - - Publisher.broadcast([{:optimism_reorg_block, latest}], :realtime) - end - - Process.send_after(self(), :reorg_monitor, block_check_interval) - - {:noreply, %{state | prev_latest: latest}} + :ignore end @doc """ @@ -289,7 +236,8 @@ defmodule Indexer.Fetcher.Optimism do end with {:start_block_l1_undefined, false} <- {:start_block_l1_undefined, is_nil(env[:start_block_l1])}, - {:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(Indexer.Fetcher.Optimism))}, + {:reorg_monitor_started, true} <- + {:reorg_monitor_started, !is_nil(Process.whereis(Indexer.Fetcher.RollupL1ReorgMonitor))}, optimism_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism][:optimism_l1_rpc], {:rpc_l1_undefined, false} <- {:rpc_l1_undefined, is_nil(optimism_l1_rpc)}, {:contract_is_valid, true} <- {:contract_is_valid, Helper.address_correct?(contract_address)}, @@ -305,8 +253,6 @@ defmodule Indexer.Fetcher.Optimism do {:ok, block_check_interval, last_safe_block} <- get_block_check_interval(json_rpc_named_arguments) do start_block = max(start_block_l1, last_l1_block_number) - Subscriber.to(:optimism_reorg_block, :realtime) - Process.send(self(), :continue, []) {:noreply, @@ -361,46 +307,4 @@ defmodule Indexer.Fetcher.Optimism do def repeated_request(req, error_message, json_rpc_named_arguments, retries) do Helper.repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries) end - - def reorg_block_pop(fetcher_name) do - table_name = reorg_table_name(fetcher_name) - - case BoundQueue.pop_front(reorg_queue_get(table_name)) do - {:ok, {block_number, updated_queue}} -> - :ets.insert(table_name, {:queue, updated_queue}) - block_number - - {:error, :empty} -> - nil - end - end - - def reorg_block_push(fetcher_name, block_number) do - table_name = reorg_table_name(fetcher_name) - {:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number) - :ets.insert(table_name, {:queue, updated_queue}) - end - - defp reorg_queue_get(table_name) do - if :ets.whereis(table_name) == :undefined do - :ets.new(table_name, [ - :set, - :named_table, - :public, - read_concurrency: true, - write_concurrency: true - ]) - end - - with info when info != :undefined <- :ets.info(table_name), - [{_, value}] <- :ets.lookup(table_name, :queue) do - value - else - _ -> %BoundQueue{} - end - end - - defp reorg_table_name(fetcher_name) do - :"#{fetcher_name}#{:_reorgs}" - end end diff --git a/apps/indexer/lib/indexer/fetcher/optimism/output_root.ex b/apps/indexer/lib/indexer/fetcher/optimism/output_root.ex index ca35b71a83..cbc21a8ddb 100644 --- a/apps/indexer/lib/indexer/fetcher/optimism/output_root.ex +++ b/apps/indexer/lib/indexer/fetcher/optimism/output_root.ex @@ -14,7 +14,7 @@ defmodule Indexer.Fetcher.Optimism.OutputRoot do alias Explorer.{Chain, Helper, Repo} alias Explorer.Chain.Optimism.OutputRoot - alias Indexer.Fetcher.Optimism + alias Indexer.Fetcher.{Optimism, RollupL1ReorgMonitor} alias Indexer.Helper, as: IndexerHelper @fetcher_name :optimism_output_roots @@ -105,7 +105,7 @@ defmodule Indexer.Fetcher.Optimism.OutputRoot do ) end - reorg_block = Optimism.reorg_block_pop(@fetcher_name) + reorg_block = RollupL1ReorgMonitor.reorg_block_pop(__MODULE__) if !is_nil(reorg_block) && reorg_block > 0 do {deleted_count, _} = Repo.delete_all(from(r in OutputRoot, where: r.l1_block_number >= ^reorg_block)) @@ -136,12 +136,6 @@ defmodule Indexer.Fetcher.Optimism.OutputRoot do {:noreply, %{state | start_block: new_start_block, end_block: new_end_block}} end - @impl GenServer - def handle_info({:chain_event, :optimism_reorg_block, :realtime, block_number}, state) do - Optimism.reorg_block_push(@fetcher_name, block_number) - {:noreply, state} - end - @impl GenServer def handle_info({ref, _result}, state) do Process.demonitor(ref, [:flush]) diff --git a/apps/indexer/lib/indexer/fetcher/optimism/txn_batch.ex b/apps/indexer/lib/indexer/fetcher/optimism/txn_batch.ex index 01bc603cae..8010861c2d 100644 --- a/apps/indexer/lib/indexer/fetcher/optimism/txn_batch.ex +++ b/apps/indexer/lib/indexer/fetcher/optimism/txn_batch.ex @@ -17,11 +17,14 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do alias EthereumJSONRPC.Block.ByHash alias EthereumJSONRPC.Blocks alias Explorer.{Chain, Repo} - alias Explorer.Chain.Block - alias Explorer.Chain.Events.Subscriber + alias Explorer.Chain.Beacon.Blob, as: BeaconBlob + alias Explorer.Chain.{Block, Hash} alias Explorer.Chain.Optimism.FrameSequence alias Explorer.Chain.Optimism.TxnBatch, as: OptimismTxnBatch - alias Indexer.Fetcher.Optimism + alias HTTPoison.Response + alias Indexer.Fetcher.Beacon.Blob + alias Indexer.Fetcher.Beacon.Client, as: BeaconClient + alias Indexer.Fetcher.{Optimism, RollupL1ReorgMonitor} alias Indexer.Helper alias Varint.LEB128 @@ -65,9 +68,10 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do with {:start_block_l1_undefined, false} <- {:start_block_l1_undefined, is_nil(env[:start_block_l1])}, {:genesis_block_l2_invalid, false} <- {:genesis_block_l2_invalid, is_nil(env[:genesis_block_l2]) or env[:genesis_block_l2] < 0}, - {:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(Indexer.Fetcher.Optimism))}, + {:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(RollupL1ReorgMonitor))}, optimism_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism][:optimism_l1_rpc], {:rpc_l1_undefined, false} <- {:rpc_l1_undefined, is_nil(optimism_l1_rpc)}, + {:blobs_api_url_undefined, false} <- {:blobs_api_url_undefined, is_nil(env[:blobs_api_url])}, {:batch_inbox_valid, true} <- {:batch_inbox_valid, Helper.address_correct?(env[:batch_inbox])}, {:batch_submitter_valid, true} <- {:batch_submitter_valid, Helper.address_correct?(env[:batch_submitter])}, start_block_l1 = parse_integer(env[:start_block_l1]), @@ -83,14 +87,13 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do {:ok, block_check_interval, last_safe_block} <- Optimism.get_block_check_interval(json_rpc_named_arguments) do start_block = max(start_block_l1, last_l1_block_number) - Subscriber.to(:optimism_reorg_block, :realtime) - Process.send(self(), :continue, []) {:noreply, %{ batch_inbox: String.downcase(env[:batch_inbox]), batch_submitter: String.downcase(env[:batch_submitter]), + blobs_api_url: String.trim_trailing(env[:blobs_api_url], "/"), block_check_interval: block_check_interval, start_block: start_block, end_block: last_safe_block, @@ -110,13 +113,20 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do {:stop, :normal, state} {:reorg_monitor_started, false} -> - Logger.error("Cannot start this process as reorg monitor in Indexer.Fetcher.Optimism is not started.") + Logger.error( + "Cannot start this process as reorg monitor in Indexer.Fetcher.RollupL1ReorgMonitor is not started." + ) + {:stop, :normal, state} {:rpc_l1_undefined, true} -> Logger.error("L1 RPC URL is not defined.") {:stop, :normal, state} + {:blobs_api_url_undefined, true} -> + Logger.error("L1 Blockscout Blobs API URL is not defined.") + {:stop, :normal, state} + {:batch_inbox_valid, false} -> Logger.error("Batch Inbox address is invalid or not defined.") {:stop, :normal, state} @@ -157,6 +167,7 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do %{ batch_inbox: batch_inbox, batch_submitter: batch_submitter, + blobs_api_url: blobs_api_url, block_check_interval: block_check_interval, start_block: start_block, end_block: end_block, @@ -189,8 +200,8 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do batch_submitter, genesis_block_l2, incomplete_channels_acc, - json_rpc_named_arguments, - json_rpc_named_arguments_l2, + {json_rpc_named_arguments, json_rpc_named_arguments_l2}, + blobs_api_url, Helper.infinite_retries_number() ) @@ -217,7 +228,7 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do incomplete_channels_acc end - reorg_block = Optimism.reorg_block_pop(@fetcher_name) + reorg_block = RollupL1ReorgMonitor.reorg_block_pop(__MODULE__) if !is_nil(reorg_block) && reorg_block > 0 do new_incomplete_channels = handle_l1_reorg(reorg_block, new_incomplete_channels) @@ -251,12 +262,6 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do }} end - @impl GenServer - def handle_info({:chain_event, :optimism_reorg_block, :realtime, block_number}, state) do - Optimism.reorg_block_push(@fetcher_name, block_number) - {:noreply, state} - end - @impl GenServer def handle_info({ref, _result}, state) do Process.demonitor(ref, [:flush]) @@ -356,8 +361,8 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do batch_submitter, genesis_block_l2, incomplete_channels, - json_rpc_named_arguments, - json_rpc_named_arguments_l2, + {json_rpc_named_arguments, json_rpc_named_arguments_l2}, + blobs_api_url, retries_left ) do case fetch_blocks_by_range(block_range, json_rpc_named_arguments) do @@ -368,7 +373,8 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do blocks_params, genesis_block_l2, incomplete_channels, - json_rpc_named_arguments_l2 + json_rpc_named_arguments_l2, + blobs_api_url ) {_, message_or_errors} -> @@ -395,64 +401,178 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do batch_submitter, genesis_block_l2, incomplete_channels, - json_rpc_named_arguments, - json_rpc_named_arguments_l2, + {json_rpc_named_arguments, json_rpc_named_arguments_l2}, + blobs_api_url, retries_left ) end end end + defp blobs_to_input(transaction_hash, blob_versioned_hashes, block_timestamp, blobs_api_url) do + Enum.reduce(blob_versioned_hashes, <<>>, fn blob_hash, acc -> + with {:ok, response} <- http_get_request(blobs_api_url <> "/" <> blob_hash), + blob_data = Map.get(response, "blob_data"), + false <- is_nil(blob_data) do + # read the data from Blockscout API + decoded = + blob_data + |> String.trim_leading("0x") + |> Base.decode16!(case: :lower) + |> OptimismTxnBatch.decode_eip4844_blob() + + if is_nil(decoded) do + Logger.warning("Cannot decode the blob #{blob_hash} taken from the Blockscout Blobs API.") + acc + else + Logger.info("The input for transaction #{transaction_hash} is taken from the Blockscout Blobs API.") + acc <> decoded + end + else + _ -> + # read the data from the fallback source (beacon node) + + beacon_config = + :indexer + |> Application.get_env(Blob) + |> Keyword.take([:reference_slot, :reference_timestamp, :slot_duration]) + |> Enum.into(%{}) + + try do + {:ok, fetched_blobs} = + block_timestamp + |> DateTime.to_unix() + |> Blob.timestamp_to_slot(beacon_config) + |> BeaconClient.get_blob_sidecars() + + blobs = Map.get(fetched_blobs, "data", []) + + if Enum.empty?(blobs) do + raise "Empty data" + end + + decoded_blob_data = + blobs + |> Enum.find(fn b -> + b + |> Map.get("kzg_commitment", "0x") + |> String.trim_leading("0x") + |> Base.decode16!(case: :lower) + |> BeaconBlob.hash() + |> Hash.to_string() + |> Kernel.==(blob_hash) + end) + |> Map.get("blob") + |> String.trim_leading("0x") + |> Base.decode16!(case: :lower) + |> OptimismTxnBatch.decode_eip4844_blob() + + if is_nil(decoded_blob_data) do + raise "Invalid blob" + else + Logger.info("The input for transaction #{transaction_hash} is taken from the Beacon Node.") + acc <> decoded_blob_data + end + rescue + reason -> + Logger.warning( + "Cannot decode the blob #{blob_hash} taken from the Beacon Node. Reason: #{inspect(reason)}" + ) + + acc + end + end + end) + end + defp get_txn_batches_inner( transactions_filtered, blocks_params, genesis_block_l2, incomplete_channels, - json_rpc_named_arguments_l2 + json_rpc_named_arguments_l2, + blobs_api_url ) do transactions_filtered - |> Enum.reduce({:ok, incomplete_channels, [], []}, fn t, {_, incomplete_channels_acc, batches_acc, sequences_acc} -> - frame = input_to_frame(t.input) - - channel = Map.get(incomplete_channels_acc, frame.channel_id, %{frames: %{}}) - - channel_frames = - Map.put(channel.frames, frame.number, %{ - data: frame.data, - is_last: frame.is_last, - block_number: t.block_number, - tx_hash: t.hash - }) - - l1_timestamp = - if frame.is_last do - get_block_timestamp_by_number(t.block_number, blocks_params) + |> Enum.reduce({:ok, incomplete_channels, [], []}, fn tx, + {_, incomplete_channels_acc, batches_acc, sequences_acc} -> + input = + if tx.type == 3 do + # this is EIP-4844 transaction, so we get the input from the blobs + block_timestamp = get_block_timestamp_by_number(tx.block_number, blocks_params) + blobs_to_input(tx.hash, tx.blob_versioned_hashes, block_timestamp, blobs_api_url) else - Map.get(channel, :l1_timestamp) + tx.input end - channel = - channel - |> Map.put_new(:id, frame.channel_id) - |> Map.put(:frames, channel_frames) - |> Map.put(:timestamp, DateTime.utc_now()) - |> Map.put(:l1_timestamp, l1_timestamp) - - if channel_complete?(channel) do - handle_channel( - channel, + if tx.type == 3 and input == <<>> do + # skip this transaction as we cannot find or read its blobs + {:ok, incomplete_channels_acc, batches_acc, sequences_acc} + else + handle_input( + input, + tx, + blocks_params, incomplete_channels_acc, batches_acc, sequences_acc, genesis_block_l2, json_rpc_named_arguments_l2 ) - else - {:ok, Map.put(incomplete_channels_acc, frame.channel_id, channel), batches_acc, sequences_acc} end end) end + defp handle_input( + input, + tx, + blocks_params, + incomplete_channels_acc, + batches_acc, + sequences_acc, + genesis_block_l2, + json_rpc_named_arguments_l2 + ) do + frame = input_to_frame(input) + + channel = Map.get(incomplete_channels_acc, frame.channel_id, %{frames: %{}}) + + channel_frames = + Map.put(channel.frames, frame.number, %{ + data: frame.data, + is_last: frame.is_last, + block_number: tx.block_number, + tx_hash: tx.hash + }) + + l1_timestamp = + if frame.is_last do + get_block_timestamp_by_number(tx.block_number, blocks_params) + else + Map.get(channel, :l1_timestamp) + end + + channel_updated = + channel + |> Map.put_new(:id, frame.channel_id) + |> Map.put(:frames, channel_frames) + |> Map.put(:timestamp, DateTime.utc_now()) + |> Map.put(:l1_timestamp, l1_timestamp) + + if channel_complete?(channel_updated) do + handle_channel( + channel_updated, + incomplete_channels_acc, + batches_acc, + sequences_acc, + genesis_block_l2, + json_rpc_named_arguments_l2 + ) + else + {:ok, Map.put(incomplete_channels_acc, frame.channel_id, channel_updated), batches_acc, sequences_acc} + end + end + defp handle_channel( channel, incomplete_channels_acc, @@ -546,6 +666,30 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do end end + defp http_get_request(url) do + case Application.get_env(:explorer, :http_adapter).get(url) do + {:ok, %Response{body: body, status_code: 200}} -> + Jason.decode(body) + + {:ok, %Response{body: body, status_code: _}} -> + {:error, body} + + {:error, error} -> + old_truncate = Application.get_env(:logger, :truncate) + Logger.configure(truncate: :infinity) + + Logger.error(fn -> + [ + "Error while sending request to Blockscout Blobs API: #{url}: ", + inspect(error, limit: :infinity, printable_limit: :infinity) + ] + end) + + Logger.configure(truncate: old_truncate) + {:error, "Error while sending request to Blockscout Blobs API"} + end + end + defp channel_complete?(channel) do last_frame_number = channel.frames @@ -568,8 +712,12 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do end defp input_to_frame("0x" <> input) do - input_binary = Base.decode16!(input, case: :mixed) + input + |> Base.decode16!(case: :mixed) + |> input_to_frame() + end + defp input_to_frame(input_binary) do # the structure of the input is as follows: # # input = derivation_version ++ channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last diff --git a/apps/indexer/lib/indexer/fetcher/optimism/withdrawal_event.ex b/apps/indexer/lib/indexer/fetcher/optimism/withdrawal_event.ex index 8854f868ba..fdd8bf04d8 100644 --- a/apps/indexer/lib/indexer/fetcher/optimism/withdrawal_event.ex +++ b/apps/indexer/lib/indexer/fetcher/optimism/withdrawal_event.ex @@ -16,7 +16,7 @@ defmodule Indexer.Fetcher.Optimism.WithdrawalEvent do alias EthereumJSONRPC.Blocks alias Explorer.{Chain, Repo} alias Explorer.Chain.Optimism.WithdrawalEvent - alias Indexer.Fetcher.Optimism + alias Indexer.Fetcher.{Optimism, RollupL1ReorgMonitor} alias Indexer.Helper @fetcher_name :optimism_withdrawal_events @@ -111,7 +111,7 @@ defmodule Indexer.Fetcher.Optimism.WithdrawalEvent do ) end - reorg_block = Optimism.reorg_block_pop(@fetcher_name) + reorg_block = RollupL1ReorgMonitor.reorg_block_pop(__MODULE__) if !is_nil(reorg_block) && reorg_block > 0 do {deleted_count, _} = Repo.delete_all(from(we in WithdrawalEvent, where: we.l1_block_number >= ^reorg_block)) @@ -142,12 +142,6 @@ defmodule Indexer.Fetcher.Optimism.WithdrawalEvent do {:noreply, %{state | start_block: new_start_block, end_block: new_end_block}} end - @impl GenServer - def handle_info({:chain_event, :optimism_reorg_block, :realtime, block_number}, state) do - Optimism.reorg_block_push(@fetcher_name, block_number) - {:noreply, state} - end - @impl GenServer def handle_info({ref, _result}, state) do Process.demonitor(ref, [:flush]) diff --git a/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex b/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex index b9a5e8e4ec..5da08bf1d7 100644 --- a/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex +++ b/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex @@ -32,6 +32,9 @@ defmodule Indexer.Fetcher.RollupL1ReorgMonitor do Logger.metadata(fetcher: @fetcher_name) modules_can_use_reorg_monitor = [ + Indexer.Fetcher.Optimism.OutputRoot, + Indexer.Fetcher.Optimism.TxnBatch, + Indexer.Fetcher.Optimism.WithdrawalEvent, Indexer.Fetcher.PolygonEdge.Deposit, Indexer.Fetcher.PolygonEdge.WithdrawalExit, Indexer.Fetcher.PolygonZkevm.BridgeL1, @@ -56,14 +59,27 @@ defmodule Indexer.Fetcher.RollupL1ReorgMonitor do module_using_reorg_monitor = Enum.at(modules_using_reorg_monitor, 0) l1_rpc = - if Enum.member?( - [Indexer.Fetcher.PolygonEdge.Deposit, Indexer.Fetcher.PolygonEdge.WithdrawalExit], - module_using_reorg_monitor - ) do - # there can be more than one PolygonEdge.* modules, so we get the common L1 RPC URL for them from Indexer.Fetcher.PolygonEdge - Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc] - else - Application.get_all_env(:indexer)[module_using_reorg_monitor][:rpc] + cond do + Enum.member?( + [Indexer.Fetcher.PolygonEdge.Deposit, Indexer.Fetcher.PolygonEdge.WithdrawalExit], + module_using_reorg_monitor + ) -> + # there can be more than one PolygonEdge.* modules, so we get the common L1 RPC URL for them from Indexer.Fetcher.PolygonEdge + Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc] + + Enum.member?( + [ + Indexer.Fetcher.Optimism.OutputRoot, + Indexer.Fetcher.Optimism.TxnBatch, + Indexer.Fetcher.Optimism.WithdrawalEvent + ], + module_using_reorg_monitor + ) -> + # there can be more than one Optimism.* modules, so we get the common L1 RPC URL for them from Indexer.Fetcher.Optimism + Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism][:optimism_l1_rpc] + + true -> + Application.get_all_env(:indexer)[module_using_reorg_monitor][:rpc] end json_rpc_named_arguments = Helper.json_rpc_named_arguments(l1_rpc) diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index 6d585a2f74..e46927a0e0 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -138,7 +138,7 @@ defmodule Indexer.Supervisor do {TokenUpdater.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {ReplacedTransaction.Supervisor, [[memory_monitor: memory_monitor]]}, - configure(Indexer.Fetcher.Optimism.Supervisor, [[memory_monitor: memory_monitor]]), + {Indexer.Fetcher.RollupL1ReorgMonitor.Supervisor, [[memory_monitor: memory_monitor]]}, configure( Indexer.Fetcher.Optimism.TxnBatch.Supervisor, [[memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments]] @@ -150,7 +150,6 @@ defmodule Indexer.Supervisor do [[memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments]] ), configure(Indexer.Fetcher.Optimism.WithdrawalEvent.Supervisor, [[memory_monitor: memory_monitor]]), - {Indexer.Fetcher.RollupL1ReorgMonitor.Supervisor, [[memory_monitor: memory_monitor]]}, configure(Indexer.Fetcher.PolygonEdge.Deposit.Supervisor, [[memory_monitor: memory_monitor]]), configure(Indexer.Fetcher.PolygonEdge.DepositExecute.Supervisor, [ [memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments] diff --git a/config/runtime.exs b/config/runtime.exs index cefab56efc..9760b03005 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -682,7 +682,6 @@ config :indexer, Indexer.Fetcher.CoinBalance.Realtime, batch_size: coin_balances_batch_size, concurrency: coin_balances_concurrency -config :indexer, Indexer.Fetcher.Optimism.Supervisor, enabled: ConfigHelper.chain_type() == "optimism" config :indexer, Indexer.Fetcher.Optimism.TxnBatch.Supervisor, enabled: ConfigHelper.chain_type() == "optimism" config :indexer, Indexer.Fetcher.Optimism.OutputRoot.Supervisor, enabled: ConfigHelper.chain_type() == "optimism" config :indexer, Indexer.Fetcher.Optimism.Deposit.Supervisor, enabled: ConfigHelper.chain_type() == "optimism" @@ -713,6 +712,7 @@ config :indexer, Indexer.Fetcher.Optimism.TxnBatch, batch_inbox: System.get_env("INDEXER_OPTIMISM_L1_BATCH_INBOX"), batch_submitter: System.get_env("INDEXER_OPTIMISM_L1_BATCH_SUBMITTER"), blocks_chunk_size: System.get_env("INDEXER_OPTIMISM_L1_BATCH_BLOCKS_CHUNK_SIZE", "4"), + blobs_api_url: System.get_env("INDEXER_OPTIMISM_L1_BATCH_BLOCKSCOUT_BLOBS_API_URL"), genesis_block_l2: ConfigHelper.parse_integer_or_nil_env_var("INDEXER_OPTIMISM_L2_BATCH_GENESIS_BLOCK_NUMBER") config :indexer, Indexer.Fetcher.Withdrawal.Supervisor, diff --git a/cspell.json b/cspell.json index fa02ef75e0..d4a8e9fd81 100644 --- a/cspell.json +++ b/cspell.json @@ -233,6 +233,7 @@ "invalidstart", "inversed", "ipfs", + "ipos", "itxs", "johnnny", "jsons", @@ -332,6 +333,7 @@ "onclick", "onconnect", "ondisconnect", + "opos", "outcoming", "overengineering", "pawesome",