Support Optimism Ecotone upgrade by Indexer.Fetcher.Optimism.TxnBatch module (#9571)

* Draft for Ecotone support by Indexer.Fetcher.Optimism.TxnBatch module

* Small refactoring

* Use RollupL1ReorgMonitor for Optimism

* Move EIP-4844 decode function to Explorer.Chain.Optimism.TxnBatch

* Small refactoring

* Update changelog

* Update cspell.json

* Extend logs

* Fix init_continue in Indexer.Fetcher.Optimism

* mix format

---------

Co-authored-by: POA <33550681+poa@users.noreply.github.com>
varasev committed 9 months ago (committed by GitHub)
commit ce5e2f54e4, parent 3ca1455ffd
14 changed files (changed-line counts in parentheses):

1. CHANGELOG.md (1)
2. apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex (4)
3. apps/explorer/lib/explorer/chain/events/publisher.ex (2)
4. apps/explorer/lib/explorer/chain/events/subscriber.ex (2)
5. apps/explorer/lib/explorer/chain/optimism/txn_batch.ex (95)
6. apps/indexer/lib/indexer/fetcher/beacon/blob.ex (14)
7. apps/indexer/lib/indexer/fetcher/optimism.ex (104)
8. apps/indexer/lib/indexer/fetcher/optimism/output_root.ex (10)
9. apps/indexer/lib/indexer/fetcher/optimism/txn_batch.ex (252)
10. apps/indexer/lib/indexer/fetcher/optimism/withdrawal_event.ex (10)
11. apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex (32)
12. apps/indexer/lib/indexer/supervisor.ex (3)
13. config/runtime.exs (2)
14. cspell.json (2)

@ -38,6 +38,7 @@
### Chore
+- [#9571](https://github.com/blockscout/blockscout/pull/9571) - Support Optimism Ecotone upgrade by Indexer.Fetcher.Optimism.TxnBatch module
- [#9562](https://github.com/blockscout/blockscout/pull/9562) - Add cancun evm version
- [#9260](https://github.com/blockscout/blockscout/pull/9260) - Optimism Delta upgrade support by Indexer.Fetcher.OptimismTxnBatch module
- [#8740](https://github.com/blockscout/blockscout/pull/8740) - Add delay to Indexer.Fetcher.OptimismTxnBatch module initialization

@ -489,9 +489,11 @@ defmodule EthereumJSONRPC.Transaction do
])
"optimism" ->
# we need to put blobVersionedHashes for Indexer.Fetcher.Optimism.TxnBatch module
put_if_present(elixir, params, [
{"l1TxOrigin", :l1_tx_origin},
{"l1BlockNumber", :l1_block_number}
{"l1BlockNumber", :l1_block_number},
{"blobVersionedHashes", :blob_versioned_hashes}
])
"suave" ->

@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Publisher do
Publishes events related to the Chain context.
"""
-@allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits optimism_reorg_block token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
+@allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
def broadcast(_data, false), do: :ok

@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Subscriber do
Subscribes to events related to the Chain context.
"""
-@allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits optimism_reorg_block token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
+@allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
@allowed_broadcast_types ~w(catchup realtime on_demand contract_verification_result)a

@ -12,6 +12,11 @@ defmodule Explorer.Chain.Optimism.TxnBatch do
@required_attrs ~w(l2_block_number frame_sequence_id)a
+@blob_size 4096 * 32
+@encoding_version 0
+@max_blob_data_size (4 * 31 + 3) * 1024 - 4
+@rounds 1024
@type t :: %__MODULE__{
l2_block_number: non_neg_integer(),
frame_sequence_id: non_neg_integer(),
@ -53,6 +58,96 @@ defmodule Explorer.Chain.Optimism.TxnBatch do
|> select_repo(options).all()
end
@doc """
Decodes EIP-4844 blob to the raw data. Returns `nil` if the blob is invalid.
"""
@spec decode_eip4844_blob(binary()) :: binary() | nil
def decode_eip4844_blob(b) do
<<encoded_byte0::size(8), version::size(8), output_len::size(24), first_output::binary-size(27), _::binary>> = b
if version != @encoding_version or output_len > @max_blob_data_size do
raise "Blob version or data size is incorrect"
end
output = first_output <> :binary.copy(<<0>>, @max_blob_data_size - 27)
opos = 28
ipos = 32
{encoded_byte1, opos, ipos, output} = decode_eip4844_field_element(b, opos, ipos, output)
{encoded_byte2, opos, ipos, output} = decode_eip4844_field_element(b, opos, ipos, output)
{encoded_byte3, opos, ipos, output} = decode_eip4844_field_element(b, opos, ipos, output)
{opos, output} = reassemble_eip4844_bytes(opos, encoded_byte0, encoded_byte1, encoded_byte2, encoded_byte3, output)
{_opos, ipos, output} =
Enum.reduce_while(Range.new(1, @rounds - 1), {opos, ipos, output}, fn _i, {opos_acc, ipos_acc, output_acc} ->
if opos_acc >= output_len do
{:halt, {opos_acc, ipos_acc, output_acc}}
else
{encoded_byte0, opos_acc, ipos_acc, output_acc} =
decode_eip4844_field_element(b, opos_acc, ipos_acc, output_acc)
{encoded_byte1, opos_acc, ipos_acc, output_acc} =
decode_eip4844_field_element(b, opos_acc, ipos_acc, output_acc)
{encoded_byte2, opos_acc, ipos_acc, output_acc} =
decode_eip4844_field_element(b, opos_acc, ipos_acc, output_acc)
{encoded_byte3, opos_acc, ipos_acc, output_acc} =
decode_eip4844_field_element(b, opos_acc, ipos_acc, output_acc)
{opos_acc, output_acc} =
reassemble_eip4844_bytes(opos_acc, encoded_byte0, encoded_byte1, encoded_byte2, encoded_byte3, output_acc)
{:cont, {opos_acc, ipos_acc, output_acc}}
end
end)
Enum.each(Range.new(output_len, byte_size(output) - 1), fn i ->
<<0>> = binary_part(output, i, 1)
end)
output = binary_part(output, 0, output_len)
Enum.each(Range.new(ipos, @blob_size - 1), fn i ->
<<0>> = binary_part(b, i, 1)
end)
output
rescue
_ -> nil
end
defp decode_eip4844_field_element(b, opos, ipos, output) do
<<_::binary-size(ipos), ipos_byte::size(8), insert::binary-size(32), _::binary>> = b
if Bitwise.band(ipos_byte, 0b11000000) == 0 do
<<output_before_opos::binary-size(opos), _::binary-size(32), rest::binary>> = output
{ipos_byte, opos + 32, ipos + 32, output_before_opos <> insert <> rest}
end
end
defp reassemble_eip4844_bytes(opos, encoded_byte0, encoded_byte1, encoded_byte2, encoded_byte3, output) do
opos = opos - 1
x = Bitwise.bor(Bitwise.band(encoded_byte0, 0b00111111), Bitwise.bsl(Bitwise.band(encoded_byte1, 0b00110000), 2))
y = Bitwise.bor(Bitwise.band(encoded_byte1, 0b00001111), Bitwise.bsl(Bitwise.band(encoded_byte3, 0b00001111), 4))
z = Bitwise.bor(Bitwise.band(encoded_byte2, 0b00111111), Bitwise.bsl(Bitwise.band(encoded_byte3, 0b00110000), 2))
new_output =
output
|> replace_byte(z, opos - 32)
|> replace_byte(y, opos - 32 * 2)
|> replace_byte(x, opos - 32 * 3)
{opos, new_output}
end
defp replace_byte(bytes, byte, pos) do
<<bytes_before::binary-size(pos), _::size(8), bytes_after::binary>> = bytes
bytes_before <> <<byte>> <> bytes_after
end
defp page_txn_batches(query, %PagingOptions{key: nil}), do: query
defp page_txn_batches(query, %PagingOptions{key: {block_number}}) do
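
For reference, the constants above encode the Ecotone blob layout: a blob holds 4096 field elements of 32 bytes each (@blob_size); each round of 4 field elements yields 4 × 31 directly copied bytes plus 3 bytes reassembled from the 6-bit fragments, i.e. 127 bytes, so @rounds = 1024 rounds minus the 4-byte version/length header give (4 * 31 + 3) * 1024 - 4 = 130044 payload bytes (@max_blob_data_size). A minimal usage sketch, assuming blob_data is a 0x-prefixed hex string such as the Blobs API returns:

```elixir
alias Explorer.Chain.Optimism.TxnBatch

"0x" <> hex = blob_data
blob = Base.decode16!(hex, case: :lower)

case TxnBatch.decode_eip4844_blob(blob) do
  # wrong version byte, oversized length, or stray non-zero padding
  nil -> :invalid_blob
  # raw channel-frame bytes, at most 130_044
  payload -> {:ok, payload}
end
```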

@ -105,11 +105,15 @@ defmodule Indexer.Fetcher.Beacon.Blob do
DateTime.to_unix(block_timestamp)
end
-defp timestamp_to_slot(block_timestamp, %{
-reference_timestamp: reference_timestamp,
-reference_slot: reference_slot,
-slot_duration: slot_duration
-}) do
+@doc """
+Converts block timestamp to the slot number.
+"""
+@spec timestamp_to_slot(non_neg_integer(), map()) :: non_neg_integer()
+def timestamp_to_slot(block_timestamp, %{
+reference_timestamp: reference_timestamp,
+reference_slot: reference_slot,
+slot_duration: slot_duration
+}) do
((block_timestamp - reference_timestamp) |> div(slot_duration)) + reference_slot
end
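
Since timestamp_to_slot/2 is now public, other fetchers can map an execution-layer timestamp to a beacon slot. A worked sketch (the reference values match Ethereum mainnet, where beacon genesis is at Unix time 1606824023 with 12-second slots, but treat them as illustrative):

```elixir
config = %{reference_timestamp: 1_606_824_023, reference_slot: 0, slot_duration: 12}

Indexer.Fetcher.Beacon.Blob.timestamp_to_slot(1_708_000_000, config)
# => div(1_708_000_000 - 1_606_824_023, 12) + 0 = 8_431_331
```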

@ -20,8 +20,7 @@ defmodule Indexer.Fetcher.Optimism do
import Explorer.Helper, only: [parse_integer: 1]
alias EthereumJSONRPC.Block.ByNumber
alias Explorer.Chain.Events.{Publisher, Subscriber}
-alias Indexer.{BoundQueue, Helper}
+alias Indexer.Helper
@fetcher_name :optimism
@block_check_interval_range_size 100
@ -46,59 +45,7 @@ defmodule Indexer.Fetcher.Optimism do
@impl GenServer
def init(_args) do
Logger.metadata(fetcher: @fetcher_name)
-modules_using_reorg_monitor = [
-Indexer.Fetcher.Optimism.TxnBatch,
-Indexer.Fetcher.Optimism.OutputRoot,
-Indexer.Fetcher.Optimism.WithdrawalEvent
-]
-reorg_monitor_not_needed =
-modules_using_reorg_monitor
-|> Enum.all?(fn module ->
-is_nil(Application.get_all_env(:indexer)[module][:start_block_l1])
-end)
-if reorg_monitor_not_needed do
-:ignore
-else
-optimism_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism][:optimism_l1_rpc]
-json_rpc_named_arguments = json_rpc_named_arguments(optimism_l1_rpc)
-{:ok, %{}, {:continue, json_rpc_named_arguments}}
-end
-end
-@impl GenServer
-def handle_continue(json_rpc_named_arguments, _state) do
-{:ok, block_check_interval, _} = get_block_check_interval(json_rpc_named_arguments)
-Process.send(self(), :reorg_monitor, [])
-{:noreply,
-%{block_check_interval: block_check_interval, json_rpc_named_arguments: json_rpc_named_arguments, prev_latest: 0}}
-end
-@impl GenServer
-def handle_info(
-:reorg_monitor,
-%{
-block_check_interval: block_check_interval,
-json_rpc_named_arguments: json_rpc_named_arguments,
-prev_latest: prev_latest
-} = state
-) do
-{:ok, latest} = get_block_number_by_tag("latest", json_rpc_named_arguments, Helper.infinite_retries_number())
-if latest < prev_latest do
-Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.")
-Publisher.broadcast([{:optimism_reorg_block, latest}], :realtime)
-end
-Process.send_after(self(), :reorg_monitor, block_check_interval)
-{:noreply, %{state | prev_latest: latest}}
+:ignore
end
@doc """
@ -289,7 +236,8 @@ defmodule Indexer.Fetcher.Optimism do
end
with {:start_block_l1_undefined, false} <- {:start_block_l1_undefined, is_nil(env[:start_block_l1])},
-{:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(Indexer.Fetcher.Optimism))},
+{:reorg_monitor_started, true} <-
+{:reorg_monitor_started, !is_nil(Process.whereis(Indexer.Fetcher.RollupL1ReorgMonitor))},
optimism_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism][:optimism_l1_rpc],
{:rpc_l1_undefined, false} <- {:rpc_l1_undefined, is_nil(optimism_l1_rpc)},
{:contract_is_valid, true} <- {:contract_is_valid, Helper.address_correct?(contract_address)},
@ -305,8 +253,6 @@ defmodule Indexer.Fetcher.Optimism do
{:ok, block_check_interval, last_safe_block} <- get_block_check_interval(json_rpc_named_arguments) do
start_block = max(start_block_l1, last_l1_block_number)
-Subscriber.to(:optimism_reorg_block, :realtime)
Process.send(self(), :continue, [])
{:noreply,
@ -361,46 +307,4 @@ defmodule Indexer.Fetcher.Optimism do
def repeated_request(req, error_message, json_rpc_named_arguments, retries) do
Helper.repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries)
end
-def reorg_block_pop(fetcher_name) do
-table_name = reorg_table_name(fetcher_name)
-case BoundQueue.pop_front(reorg_queue_get(table_name)) do
-{:ok, {block_number, updated_queue}} ->
-:ets.insert(table_name, {:queue, updated_queue})
-block_number
-{:error, :empty} ->
-nil
-end
-end
-def reorg_block_push(fetcher_name, block_number) do
-table_name = reorg_table_name(fetcher_name)
-{:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number)
-:ets.insert(table_name, {:queue, updated_queue})
-end
-defp reorg_queue_get(table_name) do
-if :ets.whereis(table_name) == :undefined do
-:ets.new(table_name, [
-:set,
-:named_table,
-:public,
-read_concurrency: true,
-write_concurrency: true
-])
-end
-with info when info != :undefined <- :ets.info(table_name),
-[{_, value}] <- :ets.lookup(table_name, :queue) do
-value
-else
-_ -> %BoundQueue{}
-end
-end
-defp reorg_table_name(fetcher_name) do
-:"#{fetcher_name}#{:_reorgs}"
-end
end
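
With the per-fetcher ETS queues removed here, these functions live in Indexer.Fetcher.RollupL1ReorgMonitor and are keyed by the calling module. A sketch of the consumer side, as the call sites below use it (handle_reorg/1 stands in for module-specific cleanup):

```elixir
case Indexer.Fetcher.RollupL1ReorgMonitor.reorg_block_pop(__MODULE__) do
  nil ->
    # no reorg queued since the last loop iteration
    :ok

  reorg_block when reorg_block > 0 ->
    # e.g. delete rows with l1_block_number >= reorg_block
    handle_reorg(reorg_block)
end
```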

@ -14,7 +14,7 @@ defmodule Indexer.Fetcher.Optimism.OutputRoot do
alias Explorer.{Chain, Helper, Repo}
alias Explorer.Chain.Optimism.OutputRoot
-alias Indexer.Fetcher.Optimism
+alias Indexer.Fetcher.{Optimism, RollupL1ReorgMonitor}
alias Indexer.Helper, as: IndexerHelper
@fetcher_name :optimism_output_roots
@ -105,7 +105,7 @@ defmodule Indexer.Fetcher.Optimism.OutputRoot do
)
end
-reorg_block = Optimism.reorg_block_pop(@fetcher_name)
+reorg_block = RollupL1ReorgMonitor.reorg_block_pop(__MODULE__)
if !is_nil(reorg_block) && reorg_block > 0 do
{deleted_count, _} = Repo.delete_all(from(r in OutputRoot, where: r.l1_block_number >= ^reorg_block))
@ -136,12 +136,6 @@ defmodule Indexer.Fetcher.Optimism.OutputRoot do
{:noreply, %{state | start_block: new_start_block, end_block: new_end_block}}
end
-@impl GenServer
-def handle_info({:chain_event, :optimism_reorg_block, :realtime, block_number}, state) do
-Optimism.reorg_block_push(@fetcher_name, block_number)
-{:noreply, state}
-end
@impl GenServer
def handle_info({ref, _result}, state) do
Process.demonitor(ref, [:flush])

@ -17,11 +17,14 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
alias EthereumJSONRPC.Block.ByHash
alias EthereumJSONRPC.Blocks
alias Explorer.{Chain, Repo}
-alias Explorer.Chain.Block
-alias Explorer.Chain.Events.Subscriber
+alias Explorer.Chain.Beacon.Blob, as: BeaconBlob
+alias Explorer.Chain.{Block, Hash}
alias Explorer.Chain.Optimism.FrameSequence
alias Explorer.Chain.Optimism.TxnBatch, as: OptimismTxnBatch
-alias Indexer.Fetcher.Optimism
+alias HTTPoison.Response
+alias Indexer.Fetcher.Beacon.Blob
+alias Indexer.Fetcher.Beacon.Client, as: BeaconClient
+alias Indexer.Fetcher.{Optimism, RollupL1ReorgMonitor}
alias Indexer.Helper
alias Varint.LEB128
@ -65,9 +68,10 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
with {:start_block_l1_undefined, false} <- {:start_block_l1_undefined, is_nil(env[:start_block_l1])},
{:genesis_block_l2_invalid, false} <-
{:genesis_block_l2_invalid, is_nil(env[:genesis_block_l2]) or env[:genesis_block_l2] < 0},
-{:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(Indexer.Fetcher.Optimism))},
+{:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(RollupL1ReorgMonitor))},
optimism_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism][:optimism_l1_rpc],
{:rpc_l1_undefined, false} <- {:rpc_l1_undefined, is_nil(optimism_l1_rpc)},
+{:blobs_api_url_undefined, false} <- {:blobs_api_url_undefined, is_nil(env[:blobs_api_url])},
{:batch_inbox_valid, true} <- {:batch_inbox_valid, Helper.address_correct?(env[:batch_inbox])},
{:batch_submitter_valid, true} <- {:batch_submitter_valid, Helper.address_correct?(env[:batch_submitter])},
start_block_l1 = parse_integer(env[:start_block_l1]),
@ -83,14 +87,13 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
{:ok, block_check_interval, last_safe_block} <- Optimism.get_block_check_interval(json_rpc_named_arguments) do
start_block = max(start_block_l1, last_l1_block_number)
-Subscriber.to(:optimism_reorg_block, :realtime)
Process.send(self(), :continue, [])
{:noreply,
%{
batch_inbox: String.downcase(env[:batch_inbox]),
batch_submitter: String.downcase(env[:batch_submitter]),
+blobs_api_url: String.trim_trailing(env[:blobs_api_url], "/"),
block_check_interval: block_check_interval,
start_block: start_block,
end_block: last_safe_block,
@ -110,13 +113,20 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
{:stop, :normal, state}
{:reorg_monitor_started, false} ->
Logger.error("Cannot start this process as reorg monitor in Indexer.Fetcher.Optimism is not started.")
Logger.error(
"Cannot start this process as reorg monitor in Indexer.Fetcher.RollupL1ReorgMonitor is not started."
)
{:stop, :normal, state}
{:rpc_l1_undefined, true} ->
Logger.error("L1 RPC URL is not defined.")
{:stop, :normal, state}
{:blobs_api_url_undefined, true} ->
Logger.error("L1 Blockscout Blobs API URL is not defined.")
{:stop, :normal, state}
{:batch_inbox_valid, false} ->
Logger.error("Batch Inbox address is invalid or not defined.")
{:stop, :normal, state}
@ -157,6 +167,7 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
%{
batch_inbox: batch_inbox,
batch_submitter: batch_submitter,
+blobs_api_url: blobs_api_url,
block_check_interval: block_check_interval,
start_block: start_block,
end_block: end_block,
@ -189,8 +200,8 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
batch_submitter,
genesis_block_l2,
incomplete_channels_acc,
-json_rpc_named_arguments,
-json_rpc_named_arguments_l2,
+{json_rpc_named_arguments, json_rpc_named_arguments_l2},
+blobs_api_url,
Helper.infinite_retries_number()
)
@ -217,7 +228,7 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
incomplete_channels_acc
end
-reorg_block = Optimism.reorg_block_pop(@fetcher_name)
+reorg_block = RollupL1ReorgMonitor.reorg_block_pop(__MODULE__)
if !is_nil(reorg_block) && reorg_block > 0 do
new_incomplete_channels = handle_l1_reorg(reorg_block, new_incomplete_channels)
@ -251,12 +262,6 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
}}
end
-@impl GenServer
-def handle_info({:chain_event, :optimism_reorg_block, :realtime, block_number}, state) do
-Optimism.reorg_block_push(@fetcher_name, block_number)
-{:noreply, state}
-end
@impl GenServer
def handle_info({ref, _result}, state) do
Process.demonitor(ref, [:flush])
@ -356,8 +361,8 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
batch_submitter,
genesis_block_l2,
incomplete_channels,
-json_rpc_named_arguments,
-json_rpc_named_arguments_l2,
+{json_rpc_named_arguments, json_rpc_named_arguments_l2},
+blobs_api_url,
retries_left
) do
case fetch_blocks_by_range(block_range, json_rpc_named_arguments) do
@ -368,7 +373,8 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
blocks_params,
genesis_block_l2,
incomplete_channels,
-json_rpc_named_arguments_l2
+json_rpc_named_arguments_l2,
+blobs_api_url
)
{_, message_or_errors} ->
@ -395,64 +401,178 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
batch_submitter,
genesis_block_l2,
incomplete_channels,
-json_rpc_named_arguments,
-json_rpc_named_arguments_l2,
+{json_rpc_named_arguments, json_rpc_named_arguments_l2},
+blobs_api_url,
retries_left
)
end
end
end
+defp blobs_to_input(transaction_hash, blob_versioned_hashes, block_timestamp, blobs_api_url) do
+Enum.reduce(blob_versioned_hashes, <<>>, fn blob_hash, acc ->
+with {:ok, response} <- http_get_request(blobs_api_url <> "/" <> blob_hash),
+blob_data = Map.get(response, "blob_data"),
+false <- is_nil(blob_data) do
+# read the data from Blockscout API
+decoded =
+blob_data
+|> String.trim_leading("0x")
+|> Base.decode16!(case: :lower)
+|> OptimismTxnBatch.decode_eip4844_blob()
+if is_nil(decoded) do
+Logger.warning("Cannot decode the blob #{blob_hash} taken from the Blockscout Blobs API.")
+acc
+else
+Logger.info("The input for transaction #{transaction_hash} is taken from the Blockscout Blobs API.")
+acc <> decoded
+end
+else
+_ ->
+# read the data from the fallback source (beacon node)
+beacon_config =
+:indexer
+|> Application.get_env(Blob)
+|> Keyword.take([:reference_slot, :reference_timestamp, :slot_duration])
+|> Enum.into(%{})
+try do
+{:ok, fetched_blobs} =
+block_timestamp
+|> DateTime.to_unix()
+|> Blob.timestamp_to_slot(beacon_config)
+|> BeaconClient.get_blob_sidecars()
+blobs = Map.get(fetched_blobs, "data", [])
+if Enum.empty?(blobs) do
+raise "Empty data"
+end
+decoded_blob_data =
+blobs
+|> Enum.find(fn b ->
+b
+|> Map.get("kzg_commitment", "0x")
+|> String.trim_leading("0x")
+|> Base.decode16!(case: :lower)
+|> BeaconBlob.hash()
+|> Hash.to_string()
+|> Kernel.==(blob_hash)
+end)
+|> Map.get("blob")
+|> String.trim_leading("0x")
+|> Base.decode16!(case: :lower)
+|> OptimismTxnBatch.decode_eip4844_blob()
+if is_nil(decoded_blob_data) do
+raise "Invalid blob"
+else
+Logger.info("The input for transaction #{transaction_hash} is taken from the Beacon Node.")
+acc <> decoded_blob_data
+end
+rescue
+reason ->
+Logger.warning(
+"Cannot decode the blob #{blob_hash} taken from the Beacon Node. Reason: #{inspect(reason)}"
+)
+acc
+end
+end
+end)
+end
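
The happy path above assumes the Blockscout Blobs API serves JSON with a blob_data hex field per versioned hash; a sketch of the exchange (the request and response shape are read off the code above; whether the hex carries a 0x prefix may vary, which is why the code uses String.trim_leading/2):

```elixir
# GET <blobs_api_url>/<blob_versioned_hash>
{:ok, response} = http_get_request(blobs_api_url <> "/" <> blob_hash)
blob_data = Map.get(response, "blob_data")
# on any miss, the function falls back to the beacon node's blob sidecars,
# matching the wanted blob by recomputing the hash of its kzg_commitment
```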
defp get_txn_batches_inner(
transactions_filtered,
blocks_params,
genesis_block_l2,
incomplete_channels,
-json_rpc_named_arguments_l2
+json_rpc_named_arguments_l2,
+blobs_api_url
) do
transactions_filtered
-|> Enum.reduce({:ok, incomplete_channels, [], []}, fn t, {_, incomplete_channels_acc, batches_acc, sequences_acc} ->
-frame = input_to_frame(t.input)
-channel = Map.get(incomplete_channels_acc, frame.channel_id, %{frames: %{}})
-channel_frames =
-Map.put(channel.frames, frame.number, %{
-data: frame.data,
-is_last: frame.is_last,
-block_number: t.block_number,
-tx_hash: t.hash
-})
-l1_timestamp =
-if frame.is_last do
-get_block_timestamp_by_number(t.block_number, blocks_params)
-else
-Map.get(channel, :l1_timestamp)
-end
-channel =
-channel
-|> Map.put_new(:id, frame.channel_id)
-|> Map.put(:frames, channel_frames)
-|> Map.put(:timestamp, DateTime.utc_now())
-|> Map.put(:l1_timestamp, l1_timestamp)
-if channel_complete?(channel) do
-handle_channel(
-channel,
+|> Enum.reduce({:ok, incomplete_channels, [], []}, fn tx,
+{_, incomplete_channels_acc, batches_acc, sequences_acc} ->
+input =
+if tx.type == 3 do
+# this is EIP-4844 transaction, so we get the input from the blobs
+block_timestamp = get_block_timestamp_by_number(tx.block_number, blocks_params)
+blobs_to_input(tx.hash, tx.blob_versioned_hashes, block_timestamp, blobs_api_url)
+else
+tx.input
+end
+if tx.type == 3 and input == <<>> do
+# skip this transaction as we cannot find or read its blobs
+{:ok, incomplete_channels_acc, batches_acc, sequences_acc}
+else
+handle_input(
+input,
+tx,
+blocks_params,
incomplete_channels_acc,
batches_acc,
sequences_acc,
genesis_block_l2,
json_rpc_named_arguments_l2
)
-else
-{:ok, Map.put(incomplete_channels_acc, frame.channel_id, channel), batches_acc, sequences_acc}
end
end)
end
+defp handle_input(
+input,
+tx,
+blocks_params,
+incomplete_channels_acc,
+batches_acc,
+sequences_acc,
+genesis_block_l2,
+json_rpc_named_arguments_l2
+) do
+frame = input_to_frame(input)
+channel = Map.get(incomplete_channels_acc, frame.channel_id, %{frames: %{}})
+channel_frames =
+Map.put(channel.frames, frame.number, %{
+data: frame.data,
+is_last: frame.is_last,
+block_number: tx.block_number,
+tx_hash: tx.hash
+})
+l1_timestamp =
+if frame.is_last do
+get_block_timestamp_by_number(tx.block_number, blocks_params)
+else
+Map.get(channel, :l1_timestamp)
+end
+channel_updated =
+channel
+|> Map.put_new(:id, frame.channel_id)
+|> Map.put(:frames, channel_frames)
+|> Map.put(:timestamp, DateTime.utc_now())
+|> Map.put(:l1_timestamp, l1_timestamp)
+if channel_complete?(channel_updated) do
+handle_channel(
+channel_updated,
+incomplete_channels_acc,
+batches_acc,
+sequences_acc,
+genesis_block_l2,
+json_rpc_named_arguments_l2
+)
+else
+{:ok, Map.put(incomplete_channels_acc, frame.channel_id, channel_updated), batches_acc, sequences_acc}
+end
+end
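
For orientation, the channel accumulator built by handle_input/8 has the shape below (reconstructed from the Map.put calls above; the 16-byte channel id follows the OP Stack frame format, and all values are illustrative):

```elixir
tx_hash = <<0::256>>  # placeholder for the L1 transaction hash

channel = %{
  id: :crypto.strong_rand_bytes(16),
  # set when the frame with is_last: true arrives
  l1_timestamp: ~U[2024-03-14 12:00:00Z],
  # refreshed every time a frame is added
  timestamp: DateTime.utc_now(),
  frames: %{
    0 => %{data: <<1, 2, 3>>, is_last: false, block_number: 19_000_000, tx_hash: tx_hash},
    1 => %{data: <<4, 5, 6>>, is_last: true, block_number: 19_000_001, tx_hash: tx_hash}
  }
}
```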
defp handle_channel(
channel,
incomplete_channels_acc,
@ -546,6 +666,30 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
end
end
+defp http_get_request(url) do
+case Application.get_env(:explorer, :http_adapter).get(url) do
+{:ok, %Response{body: body, status_code: 200}} ->
+Jason.decode(body)
+{:ok, %Response{body: body, status_code: _}} ->
+{:error, body}
+{:error, error} ->
+old_truncate = Application.get_env(:logger, :truncate)
+Logger.configure(truncate: :infinity)
+Logger.error(fn ->
+[
+"Error while sending request to Blockscout Blobs API: #{url}: ",
+inspect(error, limit: :infinity, printable_limit: :infinity)
+]
+end)
+Logger.configure(truncate: old_truncate)
+{:error, "Error while sending request to Blockscout Blobs API"}
+end
+end
defp channel_complete?(channel) do
last_frame_number =
channel.frames
@ -568,8 +712,12 @@ defmodule Indexer.Fetcher.Optimism.TxnBatch do
end
defp input_to_frame("0x" <> input) do
-input_binary = Base.decode16!(input, case: :mixed)
+input
+|> Base.decode16!(case: :mixed)
+|> input_to_frame()
+end
+defp input_to_frame(input_binary) do
# the structure of the input is as follows:
#
# input = derivation_version ++ channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last
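
A sketch of a single-frame input in that layout (field widths assumed from the OP Stack frame format: 1-byte derivation version, 16-byte channel id, 2-byte big-endian frame number, 4-byte big-endian data length, 1-byte is_last flag; all values illustrative):

```elixir
derivation_version = <<0>>
channel_id = :crypto.strong_rand_bytes(16)
frame_number = <<0::16>>
frame_data = <<0xDE, 0xAD, 0xBE, 0xEF>>
frame_data_length = <<byte_size(frame_data)::32>>
is_last = <<1>>

input =
  derivation_version <> channel_id <> frame_number <> frame_data_length <> frame_data <> is_last
```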

@ -16,7 +16,7 @@ defmodule Indexer.Fetcher.Optimism.WithdrawalEvent do
alias EthereumJSONRPC.Blocks
alias Explorer.{Chain, Repo}
alias Explorer.Chain.Optimism.WithdrawalEvent
-alias Indexer.Fetcher.Optimism
+alias Indexer.Fetcher.{Optimism, RollupL1ReorgMonitor}
alias Indexer.Helper
@fetcher_name :optimism_withdrawal_events
@ -111,7 +111,7 @@ defmodule Indexer.Fetcher.Optimism.WithdrawalEvent do
)
end
-reorg_block = Optimism.reorg_block_pop(@fetcher_name)
+reorg_block = RollupL1ReorgMonitor.reorg_block_pop(__MODULE__)
if !is_nil(reorg_block) && reorg_block > 0 do
{deleted_count, _} = Repo.delete_all(from(we in WithdrawalEvent, where: we.l1_block_number >= ^reorg_block))
@ -142,12 +142,6 @@ defmodule Indexer.Fetcher.Optimism.WithdrawalEvent do
{:noreply, %{state | start_block: new_start_block, end_block: new_end_block}}
end
-@impl GenServer
-def handle_info({:chain_event, :optimism_reorg_block, :realtime, block_number}, state) do
-Optimism.reorg_block_push(@fetcher_name, block_number)
-{:noreply, state}
-end
@impl GenServer
def handle_info({ref, _result}, state) do
Process.demonitor(ref, [:flush])

@ -32,6 +32,9 @@ defmodule Indexer.Fetcher.RollupL1ReorgMonitor do
Logger.metadata(fetcher: @fetcher_name)
modules_can_use_reorg_monitor = [
+Indexer.Fetcher.Optimism.OutputRoot,
+Indexer.Fetcher.Optimism.TxnBatch,
+Indexer.Fetcher.Optimism.WithdrawalEvent,
Indexer.Fetcher.PolygonEdge.Deposit,
Indexer.Fetcher.PolygonEdge.WithdrawalExit,
Indexer.Fetcher.PolygonZkevm.BridgeL1,
@ -56,14 +59,27 @@ defmodule Indexer.Fetcher.RollupL1ReorgMonitor do
module_using_reorg_monitor = Enum.at(modules_using_reorg_monitor, 0)
l1_rpc =
-if Enum.member?(
-[Indexer.Fetcher.PolygonEdge.Deposit, Indexer.Fetcher.PolygonEdge.WithdrawalExit],
-module_using_reorg_monitor
-) do
-# there can be more than one PolygonEdge.* modules, so we get the common L1 RPC URL for them from Indexer.Fetcher.PolygonEdge
-Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc]
-else
-Application.get_all_env(:indexer)[module_using_reorg_monitor][:rpc]
+cond do
+Enum.member?(
+[Indexer.Fetcher.PolygonEdge.Deposit, Indexer.Fetcher.PolygonEdge.WithdrawalExit],
+module_using_reorg_monitor
+) ->
+# there can be more than one PolygonEdge.* modules, so we get the common L1 RPC URL for them from Indexer.Fetcher.PolygonEdge
+Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc]
+Enum.member?(
+[
+Indexer.Fetcher.Optimism.OutputRoot,
+Indexer.Fetcher.Optimism.TxnBatch,
+Indexer.Fetcher.Optimism.WithdrawalEvent
+],
+module_using_reorg_monitor
+) ->
+# there can be more than one Optimism.* modules, so we get the common L1 RPC URL for them from Indexer.Fetcher.Optimism
+Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism][:optimism_l1_rpc]
+true ->
+Application.get_all_env(:indexer)[module_using_reorg_monitor][:rpc]
end
json_rpc_named_arguments = Helper.json_rpc_named_arguments(l1_rpc)

@ -138,7 +138,7 @@ defmodule Indexer.Supervisor do
{TokenUpdater.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{ReplacedTransaction.Supervisor, [[memory_monitor: memory_monitor]]},
-configure(Indexer.Fetcher.Optimism.Supervisor, [[memory_monitor: memory_monitor]]),
+{Indexer.Fetcher.RollupL1ReorgMonitor.Supervisor, [[memory_monitor: memory_monitor]]},
configure(
Indexer.Fetcher.Optimism.TxnBatch.Supervisor,
[[memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments]]
@ -150,7 +150,6 @@ defmodule Indexer.Supervisor do
[[memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments]]
),
configure(Indexer.Fetcher.Optimism.WithdrawalEvent.Supervisor, [[memory_monitor: memory_monitor]]),
-{Indexer.Fetcher.RollupL1ReorgMonitor.Supervisor, [[memory_monitor: memory_monitor]]},
configure(Indexer.Fetcher.PolygonEdge.Deposit.Supervisor, [[memory_monitor: memory_monitor]]),
configure(Indexer.Fetcher.PolygonEdge.DepositExecute.Supervisor, [
[memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments]

@ -682,7 +682,6 @@ config :indexer, Indexer.Fetcher.CoinBalance.Realtime,
batch_size: coin_balances_batch_size,
concurrency: coin_balances_concurrency
-config :indexer, Indexer.Fetcher.Optimism.Supervisor, enabled: ConfigHelper.chain_type() == "optimism"
config :indexer, Indexer.Fetcher.Optimism.TxnBatch.Supervisor, enabled: ConfigHelper.chain_type() == "optimism"
config :indexer, Indexer.Fetcher.Optimism.OutputRoot.Supervisor, enabled: ConfigHelper.chain_type() == "optimism"
config :indexer, Indexer.Fetcher.Optimism.Deposit.Supervisor, enabled: ConfigHelper.chain_type() == "optimism"
@ -713,6 +712,7 @@ config :indexer, Indexer.Fetcher.Optimism.TxnBatch,
batch_inbox: System.get_env("INDEXER_OPTIMISM_L1_BATCH_INBOX"),
batch_submitter: System.get_env("INDEXER_OPTIMISM_L1_BATCH_SUBMITTER"),
blocks_chunk_size: System.get_env("INDEXER_OPTIMISM_L1_BATCH_BLOCKS_CHUNK_SIZE", "4"),
+blobs_api_url: System.get_env("INDEXER_OPTIMISM_L1_BATCH_BLOCKSCOUT_BLOBS_API_URL"),
genesis_block_l2: ConfigHelper.parse_integer_or_nil_env_var("INDEXER_OPTIMISM_L2_BATCH_GENESIS_BLOCK_NUMBER")
config :indexer, Indexer.Fetcher.Withdrawal.Supervisor,
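
Operators point the fetcher at an L1 Blockscout instance via the new variable; at runtime it is read back as sketched below (the example URL is illustrative, not a documented endpoint):

```elixir
# INDEXER_OPTIMISM_L1_BATCH_BLOCKSCOUT_BLOBS_API_URL=https://eth.blockscout.com/api/v2/blobs
Application.get_all_env(:indexer)[Indexer.Fetcher.Optimism.TxnBatch][:blobs_api_url]
```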

@ -233,6 +233,7 @@
"invalidstart",
"inversed",
"ipfs",
"ipos",
"itxs",
"johnnny",
"jsons",
@ -332,6 +333,7 @@
"onclick",
"onconnect",
"ondisconnect",
"opos",
"outcoming",
"overengineering",
"pawesome",
