Add Indexer.Fetcher.RollupL1ReorgMonitor module and remove duplicated code

pull/9098/head
POA 10 months ago
parent 2949f02189
commit ce06032aba
  1. 2
      apps/explorer/lib/explorer/chain/events/publisher.ex
  2. 2
      apps/explorer/lib/explorer/chain/events/subscriber.ex
  3. 154
      apps/indexer/lib/indexer/fetcher/polygon_edge.ex
  4. 13
      apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex
  5. 2
      apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex
  6. 2
      apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex
  7. 13
      apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex
  8. 141
      apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex
  9. 77
      apps/indexer/lib/indexer/fetcher/shibarium/l1.ex
  10. 19
      apps/indexer/lib/indexer/fetcher/zkevm/bridge.ex
  11. 113
      apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1.ex
  12. 4
      apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1_tokens.ex
  13. 4
      apps/indexer/lib/indexer/fetcher/zkevm/bridge_l2.ex
  14. 60
      apps/indexer/lib/indexer/helper.ex
  15. 3
      apps/indexer/lib/indexer/supervisor.ex
  16. 4
      apps/indexer/lib/indexer/transform/zkevm/bridge.ex
  17. 2
      config/runtime.exs

@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Publisher do
Publishes events related to the Chain context.
"""
@allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number polygon_edge_reorg_block token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
@allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
def broadcast(_data, false), do: :ok

@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Subscriber do
Subscribes to events related to the Chain context.
"""
@allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number polygon_edge_reorg_block token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
@allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
@allowed_broadcast_types ~w(catchup realtime on_demand contract_verification_result)a

@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge do
Contains common functions for PolygonEdge.* fetchers.
"""
# todo: this module is deprecated and should be removed
use GenServer
use Indexer.Fetcher
@ -15,13 +17,11 @@ defmodule Indexer.Fetcher.PolygonEdge do
import Explorer.Helper, only: [parse_integer: 1]
alias Explorer.Chain.Events.Publisher
alias Explorer.{Chain, Repo}
alias Indexer.{BoundQueue, Helper}
alias Indexer.Helper
alias Indexer.Fetcher.PolygonEdge.{Deposit, DepositExecute, Withdrawal, WithdrawalExit}
@fetcher_name :polygon_edge
@block_check_interval_range_size 100
def child_spec(start_link_arguments) do
spec = %{
@ -41,29 +41,7 @@ defmodule Indexer.Fetcher.PolygonEdge do
@impl GenServer
def init(_args) do
Logger.metadata(fetcher: @fetcher_name)
modules_using_reorg_monitor = [Deposit, WithdrawalExit]
reorg_monitor_not_needed =
modules_using_reorg_monitor
|> Enum.all?(fn module ->
is_nil(Application.get_all_env(:indexer)[module][:start_block_l1])
end)
if reorg_monitor_not_needed do
:ignore
else
polygon_edge_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc]
json_rpc_named_arguments = json_rpc_named_arguments(polygon_edge_l1_rpc)
{:ok, block_check_interval, _} = get_block_check_interval(json_rpc_named_arguments)
Process.send(self(), :reorg_monitor, [])
{:ok,
%{block_check_interval: block_check_interval, json_rpc_named_arguments: json_rpc_named_arguments, prev_latest: 0}}
end
end
@spec init_l1(
@ -78,8 +56,6 @@ defmodule Indexer.Fetcher.PolygonEdge do
def init_l1(table, env, pid, contract_address, contract_name, table_name, entity_name)
when table in [Explorer.Chain.PolygonEdge.Deposit, Explorer.Chain.PolygonEdge.WithdrawalExit] do
with {:start_block_l1_undefined, false} <- {:start_block_l1_undefined, is_nil(env[:start_block_l1])},
{:reorg_monitor_started, true} <-
{:reorg_monitor_started, !is_nil(Process.whereis(Indexer.Fetcher.PolygonEdge))},
polygon_edge_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc],
{:rpc_l1_undefined, false} <- {:rpc_l1_undefined, is_nil(polygon_edge_l1_rpc)},
{:contract_is_valid, true} <- {:contract_is_valid, Helper.address_correct?(contract_address)},
@ -94,7 +70,7 @@ defmodule Indexer.Fetcher.PolygonEdge do
Helper.get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments, 100_000_000),
{:l1_tx_not_found, false} <- {:l1_tx_not_found, !is_nil(last_l1_transaction_hash) && is_nil(last_l1_tx)},
{:ok, block_check_interval, last_safe_block} <-
get_block_check_interval(json_rpc_named_arguments) do
Helper.get_block_check_interval(json_rpc_named_arguments) do
start_block = max(start_block_l1, last_l1_block_number)
Process.send(pid, :continue, [])
@ -112,10 +88,6 @@ defmodule Indexer.Fetcher.PolygonEdge do
# the process shouldn't start if the start block is not defined
:ignore
{:reorg_monitor_started, false} ->
Logger.error("Cannot start this process as reorg monitor in Indexer.Fetcher.PolygonEdge is not started.")
:ignore
{:rpc_l1_undefined, true} ->
Logger.error("L1 RPC URL is not defined.")
:ignore
@ -217,29 +189,7 @@ defmodule Indexer.Fetcher.PolygonEdge do
end
end
@impl GenServer
def handle_info(
:reorg_monitor,
%{
block_check_interval: block_check_interval,
json_rpc_named_arguments: json_rpc_named_arguments,
prev_latest: prev_latest
} = state
) do
{:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
if latest < prev_latest do
Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.")
Publisher.broadcast([{:polygon_edge_reorg_block, latest}], :realtime)
end
Process.send_after(self(), :reorg_monitor, block_check_interval)
{:noreply, %{state | prev_latest: latest}}
end
@spec handle_continue(map(), binary(), Deposit | WithdrawalExit, atom()) :: {:noreply, map()}
@spec handle_continue(map(), binary(), Deposit | WithdrawalExit) :: {:noreply, map()}
def handle_continue(
%{
contract_address: contract_address,
@ -249,8 +199,7 @@ defmodule Indexer.Fetcher.PolygonEdge do
json_rpc_named_arguments: json_rpc_named_arguments
} = state,
event_signature,
calling_module,
fetcher_name
calling_module
)
when calling_module in [Deposit, WithdrawalExit] do
time_before = Timex.now()
@ -295,14 +244,7 @@ defmodule Indexer.Fetcher.PolygonEdge do
)
end
reorg_block = reorg_block_pop(fetcher_name)
if !is_nil(reorg_block) && reorg_block > 0 do
reorg_handle(reorg_block, calling_module)
{:halt, if(reorg_block <= chunk_end, do: reorg_block - 1, else: chunk_end)}
else
{:cont, chunk_end}
end
end)
new_start_block = last_written_block + 1
@ -540,26 +482,6 @@ defmodule Indexer.Fetcher.PolygonEdge do
Repo.all(query)
end
defp get_block_check_interval(json_rpc_named_arguments) do
{last_safe_block, _} = get_safe_block(json_rpc_named_arguments)
first_block = max(last_safe_block - @block_check_interval_range_size, 1)
with {:ok, first_block_timestamp} <-
Helper.get_block_timestamp_by_number(first_block, json_rpc_named_arguments, 100_000_000),
{:ok, last_safe_block_timestamp} <-
Helper.get_block_timestamp_by_number(last_safe_block, json_rpc_named_arguments, 100_000_000) do
block_check_interval =
ceil((last_safe_block_timestamp - first_block_timestamp) / (last_safe_block - first_block) * 1000 / 2)
Logger.info("Block check interval is calculated as #{block_check_interval} ms.")
{:ok, block_check_interval, last_safe_block}
else
{:error, error} ->
{:error, "Failed to calculate block check interval due to #{inspect(error)}"}
end
end
defp get_safe_block(json_rpc_named_arguments) do
case Helper.get_block_number_by_tag("safe", json_rpc_named_arguments) do
{:ok, safe_block} ->
@ -667,72 +589,8 @@ defmodule Indexer.Fetcher.PolygonEdge do
{events, event_name}
end
defp log_deleted_rows_count(reorg_block, count, table_name) do
if count > 0 do
Logger.warning(
"As L1 reorg was detected, all rows with l1_block_number >= #{reorg_block} were removed from the #{table_name} table. Number of removed rows: #{count}."
)
end
end
@spec repeated_request(list(), any(), list(), non_neg_integer()) :: {:ok, any()} | {:error, atom()}
def repeated_request(req, error_message, json_rpc_named_arguments, retries) do
Helper.repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries)
end
defp reorg_block_pop(fetcher_name) do
table_name = reorg_table_name(fetcher_name)
case BoundQueue.pop_front(reorg_queue_get(table_name)) do
{:ok, {block_number, updated_queue}} ->
:ets.insert(table_name, {:queue, updated_queue})
block_number
{:error, :empty} ->
nil
end
end
@spec reorg_block_push(atom(), non_neg_integer()) :: no_return()
def reorg_block_push(fetcher_name, block_number) do
table_name = reorg_table_name(fetcher_name)
{:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number)
:ets.insert(table_name, {:queue, updated_queue})
end
defp reorg_handle(reorg_block, calling_module) do
{table, table_name} =
if calling_module == Deposit do
{Explorer.Chain.PolygonEdge.Deposit, "polygon_edge_deposits"}
else
{Explorer.Chain.PolygonEdge.WithdrawalExit, "polygon_edge_withdrawal_exits"}
end
{deleted_count, _} = Repo.delete_all(from(item in table, where: item.l1_block_number >= ^reorg_block))
log_deleted_rows_count(reorg_block, deleted_count, table_name)
end
defp reorg_queue_get(table_name) do
if :ets.whereis(table_name) == :undefined do
:ets.new(table_name, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
with info when info != :undefined <- :ets.info(table_name),
[{_, value}] <- :ets.lookup(table_name, :queue) do
value
else
_ -> %BoundQueue{}
end
end
defp reorg_table_name(fetcher_name) do
:"#{fetcher_name}#{:_reorgs}"
end
end

@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do
Fills polygon_edge_deposits DB table.
"""
# todo: this module is deprecated and should be removed
use GenServer
use Indexer.Fetcher
@ -14,7 +16,6 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do
alias ABI.TypeDecoder
alias EthereumJSONRPC.Block.ByNumber
alias EthereumJSONRPC.Blocks
alias Explorer.Chain.Events.Subscriber
alias Explorer.Chain.PolygonEdge.Deposit
alias Indexer.Fetcher.PolygonEdge
@ -47,8 +48,6 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do
env = Application.get_all_env(:indexer)[__MODULE__]
Subscriber.to(:polygon_edge_reorg_block, :realtime)
PolygonEdge.init_l1(
Deposit,
env,
@ -62,13 +61,7 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do
@impl GenServer
def handle_info(:continue, state) do
PolygonEdge.handle_continue(state, @state_synced_event, __MODULE__, @fetcher_name)
end
@impl GenServer
def handle_info({:chain_event, :polygon_edge_reorg_block, :realtime, block_number}, state) do
PolygonEdge.reorg_block_push(@fetcher_name, block_number)
{:noreply, state}
PolygonEdge.handle_continue(state, @state_synced_event, __MODULE__)
end
@impl GenServer

@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.DepositExecute do
Fills polygon_edge_deposit_executes DB table.
"""
# todo: this module is deprecated and should be removed
use GenServer
use Indexer.Fetcher

@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.Withdrawal do
Fills polygon_edge_withdrawals DB table.
"""
# todo: this module is deprecated and should be removed
use GenServer
use Indexer.Fetcher

@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do
Fills polygon_edge_withdrawal_exits DB table.
"""
# todo: this module is deprecated and should be removed
use GenServer
use Indexer.Fetcher
@ -10,7 +12,6 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do
import EthereumJSONRPC, only: [quantity_to_integer: 1]
alias Explorer.Chain.Events.Subscriber
alias Explorer.Chain.PolygonEdge.WithdrawalExit
alias Indexer.Fetcher.PolygonEdge
@ -40,8 +41,6 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do
env = Application.get_all_env(:indexer)[__MODULE__]
Subscriber.to(:polygon_edge_reorg_block, :realtime)
PolygonEdge.init_l1(
WithdrawalExit,
env,
@ -55,13 +54,7 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do
@impl GenServer
def handle_info(:continue, state) do
PolygonEdge.handle_continue(state, @exit_processed_event, __MODULE__, @fetcher_name)
end
@impl GenServer
def handle_info({:chain_event, :polygon_edge_reorg_block, :realtime, block_number}, state) do
PolygonEdge.reorg_block_push(@fetcher_name, block_number)
{:noreply, state}
PolygonEdge.handle_continue(state, @exit_processed_event, __MODULE__)
end
@impl GenServer

@ -0,0 +1,141 @@
defmodule Indexer.Fetcher.RollupL1ReorgMonitor do
@moduledoc """
A module to catch L1 reorgs and notify a rollup module about it.
"""
use GenServer
use Indexer.Fetcher
require Logger
alias Indexer.{BoundQueue, Helper}
@fetcher_name :rollup_l1_reorg_monitor
def child_spec(start_link_arguments) do
spec = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
restart: :transient,
type: :worker
}
Supervisor.child_spec(spec, [])
end
def start_link(args, gen_server_options \\ []) do
GenServer.start_link(__MODULE__, args, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl GenServer
def init(_args) do
Logger.metadata(fetcher: @fetcher_name)
modules_can_use_reorg_monitor = [
Indexer.Fetcher.Shibarium.L1,
Indexer.Fetcher.Zkevm.BridgeL1
]
modules_using_reorg_monitor =
modules_can_use_reorg_monitor
|> Enum.reject(fn module ->
is_nil(Application.get_all_env(:indexer)[module][:start_block])
end)
cond do
Enum.count(modules_using_reorg_monitor) > 1 ->
Logger.error("#{__MODULE__} cannot work for more than one rollup module. Please, check config.")
:ignore
Enum.empty?(modules_using_reorg_monitor) ->
# don't start reorg monitor as there is no module which would use it
:ignore
true ->
module_using_reorg_monitor = Enum.at(modules_using_reorg_monitor, 0)
l1_rpc = Application.get_all_env(:indexer)[module_using_reorg_monitor][:rpc]
json_rpc_named_arguments = Helper.json_rpc_named_arguments(l1_rpc)
{:ok, block_check_interval, _} = Helper.get_block_check_interval(json_rpc_named_arguments)
Process.send(self(), :reorg_monitor, [])
{:ok,
%{
block_check_interval: block_check_interval,
json_rpc_named_arguments: json_rpc_named_arguments,
prev_latest: 0
}}
end
end
@impl GenServer
def handle_info(
:reorg_monitor,
%{
block_check_interval: block_check_interval,
json_rpc_named_arguments: json_rpc_named_arguments,
prev_latest: prev_latest
} = state
) do
{:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
if latest < prev_latest do
Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.")
reorg_block_push(latest)
end
Process.send_after(self(), :reorg_monitor, block_check_interval)
{:noreply, %{state | prev_latest: latest}}
end
@doc """
Pops the number of reorg block from the front of the queue.
Returns `nil` if the reorg queue is empty.
"""
@spec reorg_block_pop() :: non_neg_integer() | nil
def reorg_block_pop do
table_name = reorg_table_name(@fetcher_name)
case BoundQueue.pop_front(reorg_queue_get(table_name)) do
{:ok, {block_number, updated_queue}} ->
:ets.insert(table_name, {:queue, updated_queue})
block_number
{:error, :empty} ->
nil
end
end
defp reorg_block_push(block_number) do
table_name = reorg_table_name(@fetcher_name)
{:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number)
:ets.insert(table_name, {:queue, updated_queue})
end
defp reorg_queue_get(table_name) do
if :ets.whereis(table_name) == :undefined do
:ets.new(table_name, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
with info when info != :undefined <- :ets.info(table_name),
[{_, value}] <- :ets.lookup(table_name, :queue) do
value
else
_ -> %BoundQueue{}
end
end
defp reorg_table_name(fetcher_name) do
:"#{fetcher_name}#{:_reorgs}"
end
end

@ -25,7 +25,8 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
alias Explorer.Chain.Shibarium.Bridge
alias Explorer.{Chain, Repo}
alias Indexer.{BoundQueue, Helper}
alias Indexer.Fetcher.RollupL1ReorgMonitor
alias Indexer.Helper
@block_check_interval_range_size 100
@eth_get_logs_range_size 1000
@ -109,6 +110,7 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
env = Application.get_all_env(:indexer)[__MODULE__]
with {:start_block_undefined, false} <- {:start_block_undefined, is_nil(env[:start_block])},
{:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(RollupL1ReorgMonitor))},
rpc = env[:rpc],
{:rpc_undefined, false} <- {:rpc_undefined, is_nil(rpc)},
{:deposit_manager_address_is_valid, true} <-
@ -138,7 +140,6 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
{:start_block_valid, true} <- {:start_block_valid, start_block <= latest_block} do
recalculate_cached_count()
Process.send(self(), :reorg_monitor, [])
Process.send(self(), :continue, [])
{:noreply,
@ -152,14 +153,17 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
block_check_interval: block_check_interval,
start_block: max(start_block, last_l1_block_number),
end_block: latest_block,
json_rpc_named_arguments: json_rpc_named_arguments,
reorg_monitor_prev_latest: 0
json_rpc_named_arguments: json_rpc_named_arguments
}}
else
{:start_block_undefined, true} ->
# the process shouldn't start if the start block is not defined
{:stop, :normal, %{}}
{:reorg_monitor_started, false} ->
Logger.error("Cannot start this process as Indexer.Fetcher.RollupL1ReorgMonitor is not started.")
{:stop, :normal, %{}}
{:rpc_undefined, true} ->
Logger.error("L1 RPC URL is not defined.")
{:stop, :normal, %{}}
@ -212,27 +216,6 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
end
end
@impl GenServer
def handle_info(
:reorg_monitor,
%{
block_check_interval: block_check_interval,
json_rpc_named_arguments: json_rpc_named_arguments,
reorg_monitor_prev_latest: prev_latest
} = state
) do
{:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
if latest < prev_latest do
Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.")
reorg_block_push(latest)
end
Process.send_after(self(), :reorg_monitor, block_check_interval)
{:noreply, %{state | reorg_monitor_prev_latest: latest}}
end
@impl GenServer
def handle_info(
:continue,
@ -290,7 +273,7 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
)
end
reorg_block = reorg_block_pop()
reorg_block = RollupL1ReorgMonitor.reorg_block_pop()
if !is_nil(reorg_block) && reorg_block > 0 do
reorg_handle(reorg_block)
@ -626,25 +609,6 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
"0x#{truncated_hash}"
end
defp reorg_block_pop do
table_name = reorg_table_name(@fetcher_name)
case BoundQueue.pop_front(reorg_queue_get(table_name)) do
{:ok, {block_number, updated_queue}} ->
:ets.insert(table_name, {:queue, updated_queue})
block_number
{:error, :empty} ->
nil
end
end
defp reorg_block_push(block_number) do
table_name = reorg_table_name(@fetcher_name)
{:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number)
:ets.insert(table_name, {:queue, updated_queue})
end
defp reorg_handle(reorg_block) do
{deleted_count, _} =
Repo.delete_all(from(sb in Bridge, where: sb.l1_block_number >= ^reorg_block and is_nil(sb.l2_transaction_hash)))
@ -675,27 +639,4 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
)
end
end
defp reorg_queue_get(table_name) do
if :ets.whereis(table_name) == :undefined do
:ets.new(table_name, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
with info when info != :undefined <- :ets.info(table_name),
[{_, value}] <- :ets.lookup(table_name, :queue) do
value
else
_ -> %BoundQueue{}
end
end
defp reorg_table_name(fetcher_name) do
:"#{fetcher_name}#{:_reorgs}"
end
end

@ -129,25 +129,6 @@ defmodule Indexer.Fetcher.Zkevm.Bridge do
})
end
@doc """
Forms JSON RPC named arguments for the given RPC URL.
"""
@spec json_rpc_named_arguments(binary()) :: list()
def json_rpc_named_arguments(rpc_url) do
[
transport: EthereumJSONRPC.HTTP,
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: rpc_url,
http_options: [
recv_timeout: :timer.minutes(10),
timeout: :timer.minutes(10),
hackney: [pool: :ethereum_jsonrpc]
]
]
]
end
@doc """
Converts the list of zkEVM bridge events to the list of operations
preparing them for importing to the database.

@ -12,13 +12,13 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
import Explorer.Helper, only: [parse_integer: 1]
import Indexer.Fetcher.Zkevm.Bridge,
only: [get_logs_all: 3, import_operations: 1, json_rpc_named_arguments: 1, prepare_operations: 3]
only: [get_logs_all: 3, import_operations: 1, prepare_operations: 3]
alias Explorer.Chain.Zkevm.{Bridge, Reader}
alias Explorer.Repo
alias Indexer.{BoundQueue, Helper}
alias Indexer.Fetcher.RollupL1ReorgMonitor
alias Indexer.Helper
@block_check_interval_range_size 100
@eth_get_logs_range_size 1000
@fetcher_name :zkevm_bridge_l1
@ -55,6 +55,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
env = Application.get_all_env(:indexer)[__MODULE__]
with {:start_block_undefined, false} <- {:start_block_undefined, is_nil(env[:start_block])},
{:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(RollupL1ReorgMonitor))},
rpc = env[:rpc],
{:rpc_undefined, false} <- {:rpc_undefined, is_nil(rpc)},
{:bridge_contract_address_is_valid, true} <-
@ -63,15 +64,14 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
false <- is_nil(start_block),
true <- start_block > 0,
{last_l1_block_number, last_l1_transaction_hash} = Reader.last_l1_item(),
json_rpc_named_arguments = json_rpc_named_arguments(rpc),
{:ok, block_check_interval, safe_block} <- get_block_check_interval(json_rpc_named_arguments),
json_rpc_named_arguments = Helper.json_rpc_named_arguments(rpc),
{:ok, block_check_interval, safe_block} <- Helper.get_block_check_interval(json_rpc_named_arguments),
{:start_block_valid, true, _, _} <-
{:start_block_valid,
(start_block <= last_l1_block_number || last_l1_block_number == 0) && start_block <= safe_block,
last_l1_block_number, safe_block},
{:ok, last_l1_tx} <- Helper.get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments),
{:l1_tx_not_found, false} <- {:l1_tx_not_found, !is_nil(last_l1_transaction_hash) && is_nil(last_l1_tx)} do
Process.send(self(), :reorg_monitor, [])
Process.send(self(), :continue, [])
{:noreply,
@ -79,7 +79,6 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
block_check_interval: block_check_interval,
bridge_contract: env[:bridge_contract],
json_rpc_named_arguments: json_rpc_named_arguments,
reorg_monitor_prev_latest: 0,
end_block: safe_block,
start_block: max(start_block, last_l1_block_number)
}}
@ -88,6 +87,10 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
# the process shouldn't start if the start block is not defined
{:stop, :normal, %{}}
{:reorg_monitor_started, false} ->
Logger.error("Cannot start this process as Indexer.Fetcher.RollupL1ReorgMonitor is not started.")
{:stop, :normal, %{}}
{:rpc_undefined, true} ->
Logger.error("L1 RPC URL is not defined.")
{:stop, :normal, %{}}
@ -122,27 +125,6 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
end
end
@impl GenServer
def handle_info(
:reorg_monitor,
%{
block_check_interval: block_check_interval,
json_rpc_named_arguments: json_rpc_named_arguments,
reorg_monitor_prev_latest: prev_latest
} = state
) do
{:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
if latest < prev_latest do
Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.")
reorg_block_push(latest)
end
Process.send_after(self(), :reorg_monitor, block_check_interval)
{:noreply, %{state | reorg_monitor_prev_latest: latest}}
end
@impl GenServer
def handle_info(
:continue,
@ -183,7 +165,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
)
end
reorg_block = reorg_block_pop()
reorg_block = RollupL1ReorgMonitor.reorg_block_pop()
if !is_nil(reorg_block) && reorg_block > 0 do
reorg_handle(reorg_block)
@ -215,56 +197,6 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
{:noreply, state}
end
defp get_block_check_interval(json_rpc_named_arguments) do
{last_safe_block, _} = get_safe_block(json_rpc_named_arguments)
first_block = max(last_safe_block - @block_check_interval_range_size, 1)
with {:ok, first_block_timestamp} <-
Helper.get_block_timestamp_by_number(first_block, json_rpc_named_arguments, 100_000_000),
{:ok, last_safe_block_timestamp} <-
Helper.get_block_timestamp_by_number(last_safe_block, json_rpc_named_arguments, 100_000_000) do
block_check_interval =
ceil((last_safe_block_timestamp - first_block_timestamp) / (last_safe_block - first_block) * 1000 / 2)
Logger.info("Block check interval is calculated as #{block_check_interval} ms.")
{:ok, block_check_interval, last_safe_block}
else
{:error, error} ->
{:error, "Failed to calculate block check interval due to #{inspect(error)}"}
end
end
defp get_safe_block(json_rpc_named_arguments) do
case Helper.get_block_number_by_tag("safe", json_rpc_named_arguments) do
{:ok, safe_block} ->
{safe_block, false}
{:error, :not_found} ->
{:ok, latest_block} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
{latest_block, true}
end
end
defp reorg_block_pop do
table_name = reorg_table_name(@fetcher_name)
case BoundQueue.pop_front(reorg_queue_get(table_name)) do
{:ok, {block_number, updated_queue}} ->
:ets.insert(table_name, {:queue, updated_queue})
block_number
{:error, :empty} ->
nil
end
end
defp reorg_block_push(block_number) do
table_name = reorg_table_name(@fetcher_name)
{:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number)
:ets.insert(table_name, {:queue, updated_queue})
end
defp reorg_handle(reorg_block) do
{deleted_count, _} =
Repo.delete_all(from(b in Bridge, where: b.type == :deposit and b.block_number >= ^reorg_block))
@ -275,27 +207,4 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
)
end
end
defp reorg_queue_get(table_name) do
if :ets.whereis(table_name) == :undefined do
:ets.new(table_name, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
with info when info != :undefined <- :ets.info(table_name),
[{_, value}] <- :ets.lookup(table_name, :queue) do
value
else
_ -> %BoundQueue{}
end
end
defp reorg_table_name(fetcher_name) do
:"#{fetcher_name}#{:_reorgs}"
end
end

@ -9,7 +9,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1Tokens do
import Ecto.Query
alias Explorer.Repo
alias Indexer.BufferedTask
alias Indexer.{BufferedTask, Helper}
alias Indexer.Fetcher.Zkevm.{Bridge, BridgeL1}
@behaviour BufferedTask
@ -20,7 +20,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1Tokens do
@doc false
def child_spec([init_options, gen_server_options]) do
rpc = Application.get_all_env(:indexer)[BridgeL1][:rpc]
json_rpc_named_arguments = Bridge.json_rpc_named_arguments(rpc)
json_rpc_named_arguments = Helper.json_rpc_named_arguments(rpc)
merged_init_opts =
defaults()

@ -12,7 +12,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL2 do
import Explorer.Helper, only: [parse_integer: 1]
import Indexer.Fetcher.Zkevm.Bridge,
only: [get_logs_all: 3, import_operations: 1, json_rpc_named_arguments: 1, prepare_operations: 3]
only: [get_logs_all: 3, import_operations: 1, prepare_operations: 3]
alias Explorer.Chain.Zkevm.{Bridge, Reader}
alias Explorer.Repo
@ -75,7 +75,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL2 do
%{
bridge_contract: env[:bridge_contract],
json_rpc_named_arguments: json_rpc_named_arguments,
json_rpc_named_arguments_l1: json_rpc_named_arguments(rpc_l1),
json_rpc_named_arguments_l1: Helper.json_rpc_named_arguments(rpc_l1),
end_block: latest_block,
start_block: max(start_block, last_l2_block_number)
}}

@ -17,6 +17,8 @@ defmodule Indexer.Helper do
alias EthereumJSONRPC.Blocks
alias Explorer.Chain.Hash
@block_check_interval_range_size 100
@doc """
Checks whether the given Ethereum address looks correct.
The address should begin with 0x prefix and then contain 40 hexadecimal digits (can be in mixed case).
@ -55,6 +57,45 @@ defmodule Indexer.Helper do
end
end
@doc """
Calculates average block time in milliseconds (based on the latest 100 blocks) divided by 2.
Sends corresponding requests to the RPC node.
Returns a tuple {:ok, block_check_interval, last_safe_block}
where `last_safe_block` is the number of the recent `safe` or `latest` block (depending on which one is available).
Returns {:error, description} in case of error.
"""
@spec get_block_check_interval(list()) :: {:ok, non_neg_integer(), non_neg_integer()} | {:error, any()}
def get_block_check_interval(json_rpc_named_arguments) do
{last_safe_block, _} = get_safe_block(json_rpc_named_arguments)
first_block = max(last_safe_block - @block_check_interval_range_size, 1)
with {:ok, first_block_timestamp} <-
get_block_timestamp_by_number(first_block, json_rpc_named_arguments, 100_000_000),
{:ok, last_safe_block_timestamp} <-
get_block_timestamp_by_number(last_safe_block, json_rpc_named_arguments, 100_000_000) do
block_check_interval =
ceil((last_safe_block_timestamp - first_block_timestamp) / (last_safe_block - first_block) * 1000 / 2)
Logger.info("Block check interval is calculated as #{block_check_interval} ms.")
{:ok, block_check_interval, last_safe_block}
else
{:error, error} ->
{:error, "Failed to calculate block check interval due to #{inspect(error)}"}
end
end
defp get_safe_block(json_rpc_named_arguments) do
case get_block_number_by_tag("safe", json_rpc_named_arguments) do
{:ok, safe_block} ->
{safe_block, false}
{:error, :not_found} ->
{:ok, latest_block} = get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
{latest_block, true}
end
end
@doc """
Fetches block number by its tag (e.g. `latest` or `safe`) using RPC request.
Performs a specified number of retries (up to) if the first attempt returns error.
@ -87,6 +128,25 @@ defmodule Indexer.Helper do
repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries)
end
@doc """
Forms JSON RPC named arguments for the given RPC URL.
"""
@spec json_rpc_named_arguments(binary()) :: list()
def json_rpc_named_arguments(rpc_url) do
[
transport: EthereumJSONRPC.HTTP,
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: rpc_url,
http_options: [
recv_timeout: :timer.minutes(10),
timeout: :timer.minutes(10),
hackney: [pool: :ethereum_jsonrpc]
]
]
]
end
@doc """
Prints a log of progress when handling something splitted to block chunks.
"""

@ -32,7 +32,6 @@ defmodule Indexer.Supervisor do
InternalTransaction,
PendingBlockOperationsSanitizer,
PendingTransaction,
PolygonEdge,
ReplacedTransaction,
RootstockData,
Token,
@ -132,7 +131,7 @@ defmodule Indexer.Supervisor do
{TokenUpdater.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{ReplacedTransaction.Supervisor, [[memory_monitor: memory_monitor]]},
configure(PolygonEdge.Supervisor, [[memory_monitor: memory_monitor]]),
{Indexer.Fetcher.RollupL1ReorgMonitor.Supervisor, [[memory_monitor: memory_monitor]]},
configure(Indexer.Fetcher.PolygonEdge.Deposit.Supervisor, [[memory_monitor: memory_monitor]]),
configure(Indexer.Fetcher.PolygonEdge.DepositExecute.Supervisor, [
[memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments]

@ -6,7 +6,7 @@ defmodule Indexer.Transform.Zkevm.Bridge do
require Logger
import Indexer.Fetcher.Zkevm.Bridge,
only: [filter_bridge_events: 2, json_rpc_named_arguments: 1, prepare_operations: 4]
only: [filter_bridge_events: 2, prepare_operations: 4]
alias Indexer.Fetcher.Zkevm.{BridgeL1, BridgeL2}
alias Indexer.Helper
@ -35,7 +35,7 @@ defmodule Indexer.Transform.Zkevm.Bridge do
Helper.log_blocks_chunk_handling(start_block, end_block, start_block, end_block, nil, "L2")
json_rpc_named_arguments_l1 = json_rpc_named_arguments(rpc_l1)
json_rpc_named_arguments_l1 = Helper.json_rpc_named_arguments(rpc_l1)
block_to_timestamp = Enum.reduce(blocks, %{}, fn block, acc -> Map.put(acc, block.number, block.timestamp) end)

@ -649,8 +649,6 @@ config :indexer, Indexer.Fetcher.Withdrawal.Supervisor,
config :indexer, Indexer.Fetcher.Withdrawal, first_block: System.get_env("WITHDRAWALS_FIRST_BLOCK")
config :indexer, Indexer.Fetcher.PolygonEdge.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_edge"
config :indexer, Indexer.Fetcher.PolygonEdge.Deposit.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_edge"
config :indexer, Indexer.Fetcher.PolygonEdge.DepositExecute.Supervisor,

Loading…
Cancel
Save