diff --git a/apps/explorer/lib/explorer/chain/events/publisher.ex b/apps/explorer/lib/explorer/chain/events/publisher.ex index 3dca04f31f..87da3c8e91 100644 --- a/apps/explorer/lib/explorer/chain/events/publisher.ex +++ b/apps/explorer/lib/explorer/chain/events/publisher.ex @@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Publisher do Publishes events related to the Chain context. """ - @allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number polygon_edge_reorg_block token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a + @allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a def broadcast(_data, false), do: :ok diff --git a/apps/explorer/lib/explorer/chain/events/subscriber.ex b/apps/explorer/lib/explorer/chain/events/subscriber.ex index f2aa49f61f..f073afb8eb 100644 --- a/apps/explorer/lib/explorer/chain/events/subscriber.ex +++ b/apps/explorer/lib/explorer/chain/events/subscriber.ex @@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Subscriber do Subscribes to events related to the Chain context. """ - @allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number polygon_edge_reorg_block token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a + @allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a @allowed_broadcast_types ~w(catchup realtime on_demand contract_verification_result)a diff --git a/apps/indexer/lib/indexer/fetcher/polygon_edge.ex b/apps/indexer/lib/indexer/fetcher/polygon_edge.ex index 3878948259..ee1fe71ddb 100644 --- a/apps/indexer/lib/indexer/fetcher/polygon_edge.ex +++ b/apps/indexer/lib/indexer/fetcher/polygon_edge.ex @@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge do Contains common functions for PolygonEdge.* fetchers. 
""" + # todo: this module is deprecated and should be removed + use GenServer use Indexer.Fetcher @@ -15,13 +17,11 @@ defmodule Indexer.Fetcher.PolygonEdge do import Explorer.Helper, only: [parse_integer: 1] - alias Explorer.Chain.Events.Publisher alias Explorer.{Chain, Repo} - alias Indexer.{BoundQueue, Helper} + alias Indexer.Helper alias Indexer.Fetcher.PolygonEdge.{Deposit, DepositExecute, Withdrawal, WithdrawalExit} @fetcher_name :polygon_edge - @block_check_interval_range_size 100 def child_spec(start_link_arguments) do spec = %{ @@ -41,29 +41,7 @@ defmodule Indexer.Fetcher.PolygonEdge do @impl GenServer def init(_args) do Logger.metadata(fetcher: @fetcher_name) - - modules_using_reorg_monitor = [Deposit, WithdrawalExit] - - reorg_monitor_not_needed = - modules_using_reorg_monitor - |> Enum.all?(fn module -> - is_nil(Application.get_all_env(:indexer)[module][:start_block_l1]) - end) - - if reorg_monitor_not_needed do - :ignore - else - polygon_edge_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc] - - json_rpc_named_arguments = json_rpc_named_arguments(polygon_edge_l1_rpc) - - {:ok, block_check_interval, _} = get_block_check_interval(json_rpc_named_arguments) - - Process.send(self(), :reorg_monitor, []) - - {:ok, - %{block_check_interval: block_check_interval, json_rpc_named_arguments: json_rpc_named_arguments, prev_latest: 0}} - end + :ignore end @spec init_l1( @@ -78,8 +56,6 @@ defmodule Indexer.Fetcher.PolygonEdge do def init_l1(table, env, pid, contract_address, contract_name, table_name, entity_name) when table in [Explorer.Chain.PolygonEdge.Deposit, Explorer.Chain.PolygonEdge.WithdrawalExit] do with {:start_block_l1_undefined, false} <- {:start_block_l1_undefined, is_nil(env[:start_block_l1])}, - {:reorg_monitor_started, true} <- - {:reorg_monitor_started, !is_nil(Process.whereis(Indexer.Fetcher.PolygonEdge))}, polygon_edge_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc], {:rpc_l1_undefined, false} <- {:rpc_l1_undefined, is_nil(polygon_edge_l1_rpc)}, {:contract_is_valid, true} <- {:contract_is_valid, Helper.address_correct?(contract_address)}, @@ -94,7 +70,7 @@ defmodule Indexer.Fetcher.PolygonEdge do Helper.get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments, 100_000_000), {:l1_tx_not_found, false} <- {:l1_tx_not_found, !is_nil(last_l1_transaction_hash) && is_nil(last_l1_tx)}, {:ok, block_check_interval, last_safe_block} <- - get_block_check_interval(json_rpc_named_arguments) do + Helper.get_block_check_interval(json_rpc_named_arguments) do start_block = max(start_block_l1, last_l1_block_number) Process.send(pid, :continue, []) @@ -112,10 +88,6 @@ defmodule Indexer.Fetcher.PolygonEdge do # the process shouldn't start if the start block is not defined :ignore - {:reorg_monitor_started, false} -> - Logger.error("Cannot start this process as reorg monitor in Indexer.Fetcher.PolygonEdge is not started.") - :ignore - {:rpc_l1_undefined, true} -> Logger.error("L1 RPC URL is not defined.") :ignore @@ -217,29 +189,7 @@ defmodule Indexer.Fetcher.PolygonEdge do end end - @impl GenServer - def handle_info( - :reorg_monitor, - %{ - block_check_interval: block_check_interval, - json_rpc_named_arguments: json_rpc_named_arguments, - prev_latest: prev_latest - } = state - ) do - {:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000) - - if latest < prev_latest do - Logger.warning("Reorg detected: previous latest block ##{prev_latest}, 
current latest block ##{latest}.") - - Publisher.broadcast([{:polygon_edge_reorg_block, latest}], :realtime) - end - - Process.send_after(self(), :reorg_monitor, block_check_interval) - - {:noreply, %{state | prev_latest: latest}} - end - - @spec handle_continue(map(), binary(), Deposit | WithdrawalExit, atom()) :: {:noreply, map()} + @spec handle_continue(map(), binary(), Deposit | WithdrawalExit) :: {:noreply, map()} def handle_continue( %{ contract_address: contract_address, @@ -249,8 +199,7 @@ defmodule Indexer.Fetcher.PolygonEdge do json_rpc_named_arguments: json_rpc_named_arguments } = state, event_signature, - calling_module, - fetcher_name + calling_module ) when calling_module in [Deposit, WithdrawalExit] do time_before = Timex.now() @@ -295,14 +244,7 @@ defmodule Indexer.Fetcher.PolygonEdge do ) end - reorg_block = reorg_block_pop(fetcher_name) - - if !is_nil(reorg_block) && reorg_block > 0 do - reorg_handle(reorg_block, calling_module) - {:halt, if(reorg_block <= chunk_end, do: reorg_block - 1, else: chunk_end)} - else - {:cont, chunk_end} - end + {:cont, chunk_end} end) new_start_block = last_written_block + 1 @@ -540,26 +482,6 @@ defmodule Indexer.Fetcher.PolygonEdge do Repo.all(query) end - defp get_block_check_interval(json_rpc_named_arguments) do - {last_safe_block, _} = get_safe_block(json_rpc_named_arguments) - - first_block = max(last_safe_block - @block_check_interval_range_size, 1) - - with {:ok, first_block_timestamp} <- - Helper.get_block_timestamp_by_number(first_block, json_rpc_named_arguments, 100_000_000), - {:ok, last_safe_block_timestamp} <- - Helper.get_block_timestamp_by_number(last_safe_block, json_rpc_named_arguments, 100_000_000) do - block_check_interval = - ceil((last_safe_block_timestamp - first_block_timestamp) / (last_safe_block - first_block) * 1000 / 2) - - Logger.info("Block check interval is calculated as #{block_check_interval} ms.") - {:ok, block_check_interval, last_safe_block} - else - {:error, error} -> - {:error, "Failed to calculate block check interval due to #{inspect(error)}"} - end - end - defp get_safe_block(json_rpc_named_arguments) do case Helper.get_block_number_by_tag("safe", json_rpc_named_arguments) do {:ok, safe_block} -> @@ -667,72 +589,8 @@ defmodule Indexer.Fetcher.PolygonEdge do {events, event_name} end - defp log_deleted_rows_count(reorg_block, count, table_name) do - if count > 0 do - Logger.warning( - "As L1 reorg was detected, all rows with l1_block_number >= #{reorg_block} were removed from the #{table_name} table. Number of removed rows: #{count}." 
- ) - end - end - @spec repeated_request(list(), any(), list(), non_neg_integer()) :: {:ok, any()} | {:error, atom()} def repeated_request(req, error_message, json_rpc_named_arguments, retries) do Helper.repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries) end - - defp reorg_block_pop(fetcher_name) do - table_name = reorg_table_name(fetcher_name) - - case BoundQueue.pop_front(reorg_queue_get(table_name)) do - {:ok, {block_number, updated_queue}} -> - :ets.insert(table_name, {:queue, updated_queue}) - block_number - - {:error, :empty} -> - nil - end - end - - @spec reorg_block_push(atom(), non_neg_integer()) :: no_return() - def reorg_block_push(fetcher_name, block_number) do - table_name = reorg_table_name(fetcher_name) - {:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number) - :ets.insert(table_name, {:queue, updated_queue}) - end - - defp reorg_handle(reorg_block, calling_module) do - {table, table_name} = - if calling_module == Deposit do - {Explorer.Chain.PolygonEdge.Deposit, "polygon_edge_deposits"} - else - {Explorer.Chain.PolygonEdge.WithdrawalExit, "polygon_edge_withdrawal_exits"} - end - - {deleted_count, _} = Repo.delete_all(from(item in table, where: item.l1_block_number >= ^reorg_block)) - - log_deleted_rows_count(reorg_block, deleted_count, table_name) - end - - defp reorg_queue_get(table_name) do - if :ets.whereis(table_name) == :undefined do - :ets.new(table_name, [ - :set, - :named_table, - :public, - read_concurrency: true, - write_concurrency: true - ]) - end - - with info when info != :undefined <- :ets.info(table_name), - [{_, value}] <- :ets.lookup(table_name, :queue) do - value - else - _ -> %BoundQueue{} - end - end - - defp reorg_table_name(fetcher_name) do - :"#{fetcher_name}#{:_reorgs}" - end end diff --git a/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex b/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex index ca11c30e08..642e1951ca 100644 --- a/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex +++ b/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex @@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do Fills polygon_edge_deposits DB table. 
""" + # todo: this module is deprecated and should be removed + use GenServer use Indexer.Fetcher @@ -14,7 +16,6 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do alias ABI.TypeDecoder alias EthereumJSONRPC.Block.ByNumber alias EthereumJSONRPC.Blocks - alias Explorer.Chain.Events.Subscriber alias Explorer.Chain.PolygonEdge.Deposit alias Indexer.Fetcher.PolygonEdge @@ -47,8 +48,6 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do env = Application.get_all_env(:indexer)[__MODULE__] - Subscriber.to(:polygon_edge_reorg_block, :realtime) - PolygonEdge.init_l1( Deposit, env, @@ -62,13 +61,7 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do @impl GenServer def handle_info(:continue, state) do - PolygonEdge.handle_continue(state, @state_synced_event, __MODULE__, @fetcher_name) - end - - @impl GenServer - def handle_info({:chain_event, :polygon_edge_reorg_block, :realtime, block_number}, state) do - PolygonEdge.reorg_block_push(@fetcher_name, block_number) - {:noreply, state} + PolygonEdge.handle_continue(state, @state_synced_event, __MODULE__) end @impl GenServer diff --git a/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex b/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex index 8367883ee1..82a8a2d7ed 100644 --- a/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex +++ b/apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex @@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.DepositExecute do Fills polygon_edge_deposit_executes DB table. """ + # todo: this module is deprecated and should be removed + use GenServer use Indexer.Fetcher diff --git a/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex b/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex index 4a8ae47d22..c952d51618 100644 --- a/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex +++ b/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex @@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.Withdrawal do Fills polygon_edge_withdrawals DB table. """ + # todo: this module is deprecated and should be removed + use GenServer use Indexer.Fetcher diff --git a/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex b/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex index 5b41e122dd..e19ea6517c 100644 --- a/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex +++ b/apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex @@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do Fills polygon_edge_withdrawal_exits DB table. 
""" + # todo: this module is deprecated and should be removed + use GenServer use Indexer.Fetcher @@ -10,7 +12,6 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do import EthereumJSONRPC, only: [quantity_to_integer: 1] - alias Explorer.Chain.Events.Subscriber alias Explorer.Chain.PolygonEdge.WithdrawalExit alias Indexer.Fetcher.PolygonEdge @@ -40,8 +41,6 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do env = Application.get_all_env(:indexer)[__MODULE__] - Subscriber.to(:polygon_edge_reorg_block, :realtime) - PolygonEdge.init_l1( WithdrawalExit, env, @@ -55,13 +54,7 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do @impl GenServer def handle_info(:continue, state) do - PolygonEdge.handle_continue(state, @exit_processed_event, __MODULE__, @fetcher_name) - end - - @impl GenServer - def handle_info({:chain_event, :polygon_edge_reorg_block, :realtime, block_number}, state) do - PolygonEdge.reorg_block_push(@fetcher_name, block_number) - {:noreply, state} + PolygonEdge.handle_continue(state, @exit_processed_event, __MODULE__) end @impl GenServer diff --git a/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex b/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex new file mode 100644 index 0000000000..bb1cdc7b6e --- /dev/null +++ b/apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex @@ -0,0 +1,141 @@ +defmodule Indexer.Fetcher.RollupL1ReorgMonitor do + @moduledoc """ + A module to catch L1 reorgs and notify a rollup module about it. + """ + + use GenServer + use Indexer.Fetcher + + require Logger + + alias Indexer.{BoundQueue, Helper} + + @fetcher_name :rollup_l1_reorg_monitor + + def child_spec(start_link_arguments) do + spec = %{ + id: __MODULE__, + start: {__MODULE__, :start_link, start_link_arguments}, + restart: :transient, + type: :worker + } + + Supervisor.child_spec(spec, []) + end + + def start_link(args, gen_server_options \\ []) do + GenServer.start_link(__MODULE__, args, Keyword.put_new(gen_server_options, :name, __MODULE__)) + end + + @impl GenServer + def init(_args) do + Logger.metadata(fetcher: @fetcher_name) + + modules_can_use_reorg_monitor = [ + Indexer.Fetcher.Shibarium.L1, + Indexer.Fetcher.Zkevm.BridgeL1 + ] + + modules_using_reorg_monitor = + modules_can_use_reorg_monitor + |> Enum.reject(fn module -> + is_nil(Application.get_all_env(:indexer)[module][:start_block]) + end) + + cond do + Enum.count(modules_using_reorg_monitor) > 1 -> + Logger.error("#{__MODULE__} cannot work for more than one rollup module. 
Please, check config.") + :ignore + + Enum.empty?(modules_using_reorg_monitor) -> + # don't start reorg monitor as there is no module which would use it + :ignore + + true -> + module_using_reorg_monitor = Enum.at(modules_using_reorg_monitor, 0) + + l1_rpc = Application.get_all_env(:indexer)[module_using_reorg_monitor][:rpc] + + json_rpc_named_arguments = Helper.json_rpc_named_arguments(l1_rpc) + + {:ok, block_check_interval, _} = Helper.get_block_check_interval(json_rpc_named_arguments) + + Process.send(self(), :reorg_monitor, []) + + {:ok, + %{ + block_check_interval: block_check_interval, + json_rpc_named_arguments: json_rpc_named_arguments, + prev_latest: 0 + }} + end + end + + @impl GenServer + def handle_info( + :reorg_monitor, + %{ + block_check_interval: block_check_interval, + json_rpc_named_arguments: json_rpc_named_arguments, + prev_latest: prev_latest + } = state + ) do + {:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000) + + if latest < prev_latest do + Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.") + reorg_block_push(latest) + end + + Process.send_after(self(), :reorg_monitor, block_check_interval) + + {:noreply, %{state | prev_latest: latest}} + end + + @doc """ + Pops the number of reorg block from the front of the queue. + Returns `nil` if the reorg queue is empty. + """ + @spec reorg_block_pop() :: non_neg_integer() | nil + def reorg_block_pop do + table_name = reorg_table_name(@fetcher_name) + + case BoundQueue.pop_front(reorg_queue_get(table_name)) do + {:ok, {block_number, updated_queue}} -> + :ets.insert(table_name, {:queue, updated_queue}) + block_number + + {:error, :empty} -> + nil + end + end + + defp reorg_block_push(block_number) do + table_name = reorg_table_name(@fetcher_name) + {:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number) + :ets.insert(table_name, {:queue, updated_queue}) + end + + defp reorg_queue_get(table_name) do + if :ets.whereis(table_name) == :undefined do + :ets.new(table_name, [ + :set, + :named_table, + :public, + read_concurrency: true, + write_concurrency: true + ]) + end + + with info when info != :undefined <- :ets.info(table_name), + [{_, value}] <- :ets.lookup(table_name, :queue) do + value + else + _ -> %BoundQueue{} + end + end + + defp reorg_table_name(fetcher_name) do + :"#{fetcher_name}#{:_reorgs}" + end +end diff --git a/apps/indexer/lib/indexer/fetcher/shibarium/l1.ex b/apps/indexer/lib/indexer/fetcher/shibarium/l1.ex index 906acd2689..841dfafd85 100644 --- a/apps/indexer/lib/indexer/fetcher/shibarium/l1.ex +++ b/apps/indexer/lib/indexer/fetcher/shibarium/l1.ex @@ -25,7 +25,8 @@ defmodule Indexer.Fetcher.Shibarium.L1 do alias Explorer.Chain.Shibarium.Bridge alias Explorer.{Chain, Repo} - alias Indexer.{BoundQueue, Helper} + alias Indexer.Fetcher.RollupL1ReorgMonitor + alias Indexer.Helper @block_check_interval_range_size 100 @eth_get_logs_range_size 1000 @@ -109,6 +110,7 @@ defmodule Indexer.Fetcher.Shibarium.L1 do env = Application.get_all_env(:indexer)[__MODULE__] with {:start_block_undefined, false} <- {:start_block_undefined, is_nil(env[:start_block])}, + {:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(RollupL1ReorgMonitor))}, rpc = env[:rpc], {:rpc_undefined, false} <- {:rpc_undefined, is_nil(rpc)}, {:deposit_manager_address_is_valid, true} <- @@ -138,7 +140,6 @@ defmodule Indexer.Fetcher.Shibarium.L1 do {:start_block_valid, true} <- {:start_block_valid, 
start_block <= latest_block} do recalculate_cached_count() - Process.send(self(), :reorg_monitor, []) Process.send(self(), :continue, []) {:noreply, @@ -152,14 +153,17 @@ defmodule Indexer.Fetcher.Shibarium.L1 do block_check_interval: block_check_interval, start_block: max(start_block, last_l1_block_number), end_block: latest_block, - json_rpc_named_arguments: json_rpc_named_arguments, - reorg_monitor_prev_latest: 0 + json_rpc_named_arguments: json_rpc_named_arguments }} else {:start_block_undefined, true} -> # the process shouldn't start if the start block is not defined {:stop, :normal, %{}} + {:reorg_monitor_started, false} -> + Logger.error("Cannot start this process as Indexer.Fetcher.RollupL1ReorgMonitor is not started.") + {:stop, :normal, %{}} + {:rpc_undefined, true} -> Logger.error("L1 RPC URL is not defined.") {:stop, :normal, %{}} @@ -212,27 +216,6 @@ defmodule Indexer.Fetcher.Shibarium.L1 do end end - @impl GenServer - def handle_info( - :reorg_monitor, - %{ - block_check_interval: block_check_interval, - json_rpc_named_arguments: json_rpc_named_arguments, - reorg_monitor_prev_latest: prev_latest - } = state - ) do - {:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000) - - if latest < prev_latest do - Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.") - reorg_block_push(latest) - end - - Process.send_after(self(), :reorg_monitor, block_check_interval) - - {:noreply, %{state | reorg_monitor_prev_latest: latest}} - end - @impl GenServer def handle_info( :continue, @@ -290,7 +273,7 @@ defmodule Indexer.Fetcher.Shibarium.L1 do ) end - reorg_block = reorg_block_pop() + reorg_block = RollupL1ReorgMonitor.reorg_block_pop() if !is_nil(reorg_block) && reorg_block > 0 do reorg_handle(reorg_block) @@ -626,25 +609,6 @@ defmodule Indexer.Fetcher.Shibarium.L1 do "0x#{truncated_hash}" end - defp reorg_block_pop do - table_name = reorg_table_name(@fetcher_name) - - case BoundQueue.pop_front(reorg_queue_get(table_name)) do - {:ok, {block_number, updated_queue}} -> - :ets.insert(table_name, {:queue, updated_queue}) - block_number - - {:error, :empty} -> - nil - end - end - - defp reorg_block_push(block_number) do - table_name = reorg_table_name(@fetcher_name) - {:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number) - :ets.insert(table_name, {:queue, updated_queue}) - end - defp reorg_handle(reorg_block) do {deleted_count, _} = Repo.delete_all(from(sb in Bridge, where: sb.l1_block_number >= ^reorg_block and is_nil(sb.l2_transaction_hash))) @@ -675,27 +639,4 @@ defmodule Indexer.Fetcher.Shibarium.L1 do ) end end - - defp reorg_queue_get(table_name) do - if :ets.whereis(table_name) == :undefined do - :ets.new(table_name, [ - :set, - :named_table, - :public, - read_concurrency: true, - write_concurrency: true - ]) - end - - with info when info != :undefined <- :ets.info(table_name), - [{_, value}] <- :ets.lookup(table_name, :queue) do - value - else - _ -> %BoundQueue{} - end - end - - defp reorg_table_name(fetcher_name) do - :"#{fetcher_name}#{:_reorgs}" - end end diff --git a/apps/indexer/lib/indexer/fetcher/zkevm/bridge.ex b/apps/indexer/lib/indexer/fetcher/zkevm/bridge.ex index 72ea31d67d..1180e0771c 100644 --- a/apps/indexer/lib/indexer/fetcher/zkevm/bridge.ex +++ b/apps/indexer/lib/indexer/fetcher/zkevm/bridge.ex @@ -129,25 +129,6 @@ defmodule Indexer.Fetcher.Zkevm.Bridge do }) end - @doc """ - Forms JSON RPC named arguments for the given RPC URL. 
- """ - @spec json_rpc_named_arguments(binary()) :: list() - def json_rpc_named_arguments(rpc_url) do - [ - transport: EthereumJSONRPC.HTTP, - transport_options: [ - http: EthereumJSONRPC.HTTP.HTTPoison, - url: rpc_url, - http_options: [ - recv_timeout: :timer.minutes(10), - timeout: :timer.minutes(10), - hackney: [pool: :ethereum_jsonrpc] - ] - ] - ] - end - @doc """ Converts the list of zkEVM bridge events to the list of operations preparing them for importing to the database. diff --git a/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1.ex b/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1.ex index 419bf6fb15..27fed061e7 100644 --- a/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1.ex +++ b/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1.ex @@ -12,13 +12,13 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do import Explorer.Helper, only: [parse_integer: 1] import Indexer.Fetcher.Zkevm.Bridge, - only: [get_logs_all: 3, import_operations: 1, json_rpc_named_arguments: 1, prepare_operations: 3] + only: [get_logs_all: 3, import_operations: 1, prepare_operations: 3] alias Explorer.Chain.Zkevm.{Bridge, Reader} alias Explorer.Repo - alias Indexer.{BoundQueue, Helper} + alias Indexer.Fetcher.RollupL1ReorgMonitor + alias Indexer.Helper - @block_check_interval_range_size 100 @eth_get_logs_range_size 1000 @fetcher_name :zkevm_bridge_l1 @@ -55,6 +55,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do env = Application.get_all_env(:indexer)[__MODULE__] with {:start_block_undefined, false} <- {:start_block_undefined, is_nil(env[:start_block])}, + {:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(RollupL1ReorgMonitor))}, rpc = env[:rpc], {:rpc_undefined, false} <- {:rpc_undefined, is_nil(rpc)}, {:bridge_contract_address_is_valid, true} <- @@ -63,15 +64,14 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do false <- is_nil(start_block), true <- start_block > 0, {last_l1_block_number, last_l1_transaction_hash} = Reader.last_l1_item(), - json_rpc_named_arguments = json_rpc_named_arguments(rpc), - {:ok, block_check_interval, safe_block} <- get_block_check_interval(json_rpc_named_arguments), + json_rpc_named_arguments = Helper.json_rpc_named_arguments(rpc), + {:ok, block_check_interval, safe_block} <- Helper.get_block_check_interval(json_rpc_named_arguments), {:start_block_valid, true, _, _} <- {:start_block_valid, (start_block <= last_l1_block_number || last_l1_block_number == 0) && start_block <= safe_block, last_l1_block_number, safe_block}, {:ok, last_l1_tx} <- Helper.get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments), {:l1_tx_not_found, false} <- {:l1_tx_not_found, !is_nil(last_l1_transaction_hash) && is_nil(last_l1_tx)} do - Process.send(self(), :reorg_monitor, []) Process.send(self(), :continue, []) {:noreply, @@ -79,7 +79,6 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do block_check_interval: block_check_interval, bridge_contract: env[:bridge_contract], json_rpc_named_arguments: json_rpc_named_arguments, - reorg_monitor_prev_latest: 0, end_block: safe_block, start_block: max(start_block, last_l1_block_number) }} @@ -88,6 +87,10 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do # the process shouldn't start if the start block is not defined {:stop, :normal, %{}} + {:reorg_monitor_started, false} -> + Logger.error("Cannot start this process as Indexer.Fetcher.RollupL1ReorgMonitor is not started.") + {:stop, :normal, %{}} + {:rpc_undefined, true} -> Logger.error("L1 RPC URL is not defined.") {:stop, :normal, %{}} @@ -122,27 +125,6 @@ defmodule 
Indexer.Fetcher.Zkevm.BridgeL1 do end end - @impl GenServer - def handle_info( - :reorg_monitor, - %{ - block_check_interval: block_check_interval, - json_rpc_named_arguments: json_rpc_named_arguments, - reorg_monitor_prev_latest: prev_latest - } = state - ) do - {:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000) - - if latest < prev_latest do - Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.") - reorg_block_push(latest) - end - - Process.send_after(self(), :reorg_monitor, block_check_interval) - - {:noreply, %{state | reorg_monitor_prev_latest: latest}} - end - @impl GenServer def handle_info( :continue, @@ -183,7 +165,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do ) end - reorg_block = reorg_block_pop() + reorg_block = RollupL1ReorgMonitor.reorg_block_pop() if !is_nil(reorg_block) && reorg_block > 0 do reorg_handle(reorg_block) @@ -215,56 +197,6 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do {:noreply, state} end - defp get_block_check_interval(json_rpc_named_arguments) do - {last_safe_block, _} = get_safe_block(json_rpc_named_arguments) - - first_block = max(last_safe_block - @block_check_interval_range_size, 1) - - with {:ok, first_block_timestamp} <- - Helper.get_block_timestamp_by_number(first_block, json_rpc_named_arguments, 100_000_000), - {:ok, last_safe_block_timestamp} <- - Helper.get_block_timestamp_by_number(last_safe_block, json_rpc_named_arguments, 100_000_000) do - block_check_interval = - ceil((last_safe_block_timestamp - first_block_timestamp) / (last_safe_block - first_block) * 1000 / 2) - - Logger.info("Block check interval is calculated as #{block_check_interval} ms.") - {:ok, block_check_interval, last_safe_block} - else - {:error, error} -> - {:error, "Failed to calculate block check interval due to #{inspect(error)}"} - end - end - - defp get_safe_block(json_rpc_named_arguments) do - case Helper.get_block_number_by_tag("safe", json_rpc_named_arguments) do - {:ok, safe_block} -> - {safe_block, false} - - {:error, :not_found} -> - {:ok, latest_block} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000) - {latest_block, true} - end - end - - defp reorg_block_pop do - table_name = reorg_table_name(@fetcher_name) - - case BoundQueue.pop_front(reorg_queue_get(table_name)) do - {:ok, {block_number, updated_queue}} -> - :ets.insert(table_name, {:queue, updated_queue}) - block_number - - {:error, :empty} -> - nil - end - end - - defp reorg_block_push(block_number) do - table_name = reorg_table_name(@fetcher_name) - {:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number) - :ets.insert(table_name, {:queue, updated_queue}) - end - defp reorg_handle(reorg_block) do {deleted_count, _} = Repo.delete_all(from(b in Bridge, where: b.type == :deposit and b.block_number >= ^reorg_block)) @@ -275,27 +207,4 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do ) end end - - defp reorg_queue_get(table_name) do - if :ets.whereis(table_name) == :undefined do - :ets.new(table_name, [ - :set, - :named_table, - :public, - read_concurrency: true, - write_concurrency: true - ]) - end - - with info when info != :undefined <- :ets.info(table_name), - [{_, value}] <- :ets.lookup(table_name, :queue) do - value - else - _ -> %BoundQueue{} - end - end - - defp reorg_table_name(fetcher_name) do - :"#{fetcher_name}#{:_reorgs}" - end end diff --git a/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1_tokens.ex 
b/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1_tokens.ex index e341babf09..59e6f98908 100644 --- a/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1_tokens.ex +++ b/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1_tokens.ex @@ -9,7 +9,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1Tokens do import Ecto.Query alias Explorer.Repo - alias Indexer.BufferedTask + alias Indexer.{BufferedTask, Helper} alias Indexer.Fetcher.Zkevm.{Bridge, BridgeL1} @behaviour BufferedTask @@ -20,7 +20,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1Tokens do @doc false def child_spec([init_options, gen_server_options]) do rpc = Application.get_all_env(:indexer)[BridgeL1][:rpc] - json_rpc_named_arguments = Bridge.json_rpc_named_arguments(rpc) + json_rpc_named_arguments = Helper.json_rpc_named_arguments(rpc) merged_init_opts = defaults() diff --git a/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l2.ex b/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l2.ex index aa1b55018c..c469602be1 100644 --- a/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l2.ex +++ b/apps/indexer/lib/indexer/fetcher/zkevm/bridge_l2.ex @@ -12,7 +12,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL2 do import Explorer.Helper, only: [parse_integer: 1] import Indexer.Fetcher.Zkevm.Bridge, - only: [get_logs_all: 3, import_operations: 1, json_rpc_named_arguments: 1, prepare_operations: 3] + only: [get_logs_all: 3, import_operations: 1, prepare_operations: 3] alias Explorer.Chain.Zkevm.{Bridge, Reader} alias Explorer.Repo @@ -75,7 +75,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL2 do %{ bridge_contract: env[:bridge_contract], json_rpc_named_arguments: json_rpc_named_arguments, - json_rpc_named_arguments_l1: json_rpc_named_arguments(rpc_l1), + json_rpc_named_arguments_l1: Helper.json_rpc_named_arguments(rpc_l1), end_block: latest_block, start_block: max(start_block, last_l2_block_number) }} diff --git a/apps/indexer/lib/indexer/helper.ex b/apps/indexer/lib/indexer/helper.ex index 3b1cb18b61..111c518d96 100644 --- a/apps/indexer/lib/indexer/helper.ex +++ b/apps/indexer/lib/indexer/helper.ex @@ -17,6 +17,8 @@ defmodule Indexer.Helper do alias EthereumJSONRPC.Blocks alias Explorer.Chain.Hash + @block_check_interval_range_size 100 + @doc """ Checks whether the given Ethereum address looks correct. The address should begin with 0x prefix and then contain 40 hexadecimal digits (can be in mixed case). @@ -55,6 +57,45 @@ defmodule Indexer.Helper do end end + @doc """ + Calculates average block time in milliseconds (based on the latest 100 blocks) divided by 2. + Sends corresponding requests to the RPC node. + Returns a tuple {:ok, block_check_interval, last_safe_block} + where `last_safe_block` is the number of the recent `safe` or `latest` block (depending on which one is available). + Returns {:error, description} in case of error. 
+ """ + @spec get_block_check_interval(list()) :: {:ok, non_neg_integer(), non_neg_integer()} | {:error, any()} + def get_block_check_interval(json_rpc_named_arguments) do + {last_safe_block, _} = get_safe_block(json_rpc_named_arguments) + + first_block = max(last_safe_block - @block_check_interval_range_size, 1) + + with {:ok, first_block_timestamp} <- + get_block_timestamp_by_number(first_block, json_rpc_named_arguments, 100_000_000), + {:ok, last_safe_block_timestamp} <- + get_block_timestamp_by_number(last_safe_block, json_rpc_named_arguments, 100_000_000) do + block_check_interval = + ceil((last_safe_block_timestamp - first_block_timestamp) / (last_safe_block - first_block) * 1000 / 2) + + Logger.info("Block check interval is calculated as #{block_check_interval} ms.") + {:ok, block_check_interval, last_safe_block} + else + {:error, error} -> + {:error, "Failed to calculate block check interval due to #{inspect(error)}"} + end + end + + defp get_safe_block(json_rpc_named_arguments) do + case get_block_number_by_tag("safe", json_rpc_named_arguments) do + {:ok, safe_block} -> + {safe_block, false} + + {:error, :not_found} -> + {:ok, latest_block} = get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000) + {latest_block, true} + end + end + @doc """ Fetches block number by its tag (e.g. `latest` or `safe`) using RPC request. Performs a specified number of retries (up to) if the first attempt returns error. @@ -87,6 +128,25 @@ defmodule Indexer.Helper do repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries) end + @doc """ + Forms JSON RPC named arguments for the given RPC URL. + """ + @spec json_rpc_named_arguments(binary()) :: list() + def json_rpc_named_arguments(rpc_url) do + [ + transport: EthereumJSONRPC.HTTP, + transport_options: [ + http: EthereumJSONRPC.HTTP.HTTPoison, + url: rpc_url, + http_options: [ + recv_timeout: :timer.minutes(10), + timeout: :timer.minutes(10), + hackney: [pool: :ethereum_jsonrpc] + ] + ] + ] + end + @doc """ Prints a log of progress when handling something splitted to block chunks. 
""" diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index 10a9de790b..d8f32eed81 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -32,7 +32,6 @@ defmodule Indexer.Supervisor do InternalTransaction, PendingBlockOperationsSanitizer, PendingTransaction, - PolygonEdge, ReplacedTransaction, RootstockData, Token, @@ -132,7 +131,7 @@ defmodule Indexer.Supervisor do {TokenUpdater.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {ReplacedTransaction.Supervisor, [[memory_monitor: memory_monitor]]}, - configure(PolygonEdge.Supervisor, [[memory_monitor: memory_monitor]]), + {Indexer.Fetcher.RollupL1ReorgMonitor.Supervisor, [[memory_monitor: memory_monitor]]}, configure(Indexer.Fetcher.PolygonEdge.Deposit.Supervisor, [[memory_monitor: memory_monitor]]), configure(Indexer.Fetcher.PolygonEdge.DepositExecute.Supervisor, [ [memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments] diff --git a/apps/indexer/lib/indexer/transform/zkevm/bridge.ex b/apps/indexer/lib/indexer/transform/zkevm/bridge.ex index ae9f0f58e7..2d23fcb2e0 100644 --- a/apps/indexer/lib/indexer/transform/zkevm/bridge.ex +++ b/apps/indexer/lib/indexer/transform/zkevm/bridge.ex @@ -6,7 +6,7 @@ defmodule Indexer.Transform.Zkevm.Bridge do require Logger import Indexer.Fetcher.Zkevm.Bridge, - only: [filter_bridge_events: 2, json_rpc_named_arguments: 1, prepare_operations: 4] + only: [filter_bridge_events: 2, prepare_operations: 4] alias Indexer.Fetcher.Zkevm.{BridgeL1, BridgeL2} alias Indexer.Helper @@ -35,7 +35,7 @@ defmodule Indexer.Transform.Zkevm.Bridge do Helper.log_blocks_chunk_handling(start_block, end_block, start_block, end_block, nil, "L2") - json_rpc_named_arguments_l1 = json_rpc_named_arguments(rpc_l1) + json_rpc_named_arguments_l1 = Helper.json_rpc_named_arguments(rpc_l1) block_to_timestamp = Enum.reduce(blocks, %{}, fn block, acc -> Map.put(acc, block.number, block.timestamp) end) diff --git a/config/runtime.exs b/config/runtime.exs index 625b8aa44f..78b5e3f782 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -649,8 +649,6 @@ config :indexer, Indexer.Fetcher.Withdrawal.Supervisor, config :indexer, Indexer.Fetcher.Withdrawal, first_block: System.get_env("WITHDRAWALS_FIRST_BLOCK") -config :indexer, Indexer.Fetcher.PolygonEdge.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_edge" - config :indexer, Indexer.Fetcher.PolygonEdge.Deposit.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_edge" config :indexer, Indexer.Fetcher.PolygonEdge.DepositExecute.Supervisor,