Merge pull request #9098 from blockscout/va-zkevm-bridge

Polygon zkEVM Bridge indexer and API v2 extension
Victor Baranov committed 9 months ago (via GitHub)
commit 6ae6692d02
51 changed files (changed line counts in parentheses):

  1. CHANGELOG.md (1)
  2. apps/block_scout_web/lib/block_scout_web/api_router.ex (16)
  3. apps/block_scout_web/lib/block_scout_web/chain.ex (7)
  4. apps/block_scout_web/lib/block_scout_web/channels/polygon_zkevm_confirmed_batch_channel.ex (6)
  5. apps/block_scout_web/lib/block_scout_web/channels/user_socket_v2.ex (2)
  6. apps/block_scout_web/lib/block_scout_web/controllers/api/v2/polygon_zkevm_controller.ex (72)
  7. apps/block_scout_web/lib/block_scout_web/controllers/api/v2/transaction_controller.ex (6)
  8. apps/block_scout_web/lib/block_scout_web/views/api/v2/polygon_zkevm_view.ex (59)
  9. apps/block_scout_web/mix.exs (2)
  10. apps/explorer/config/test.exs (9)
  11. apps/explorer/lib/explorer/chain/events/publisher.ex (2)
  12. apps/explorer/lib/explorer/chain/events/subscriber.ex (2)
  13. apps/explorer/lib/explorer/chain/import/runner/polygon_zkevm/batch_transactions.ex (16)
  14. apps/explorer/lib/explorer/chain/import/runner/polygon_zkevm/bridge_l1_tokens.ex (101)
  15. apps/explorer/lib/explorer/chain/import/runner/polygon_zkevm/bridge_operations.ex (115)
  16. apps/explorer/lib/explorer/chain/import/runner/polygon_zkevm/lifecycle_transactions.ex (16)
  17. apps/explorer/lib/explorer/chain/import/runner/polygon_zkevm/transaction_batches.ex (16)
  18. apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex (8)
  19. apps/explorer/lib/explorer/chain/polygon_zkevm/batch_transaction.ex (6)
  20. apps/explorer/lib/explorer/chain/polygon_zkevm/bridge.ex (55)
  21. apps/explorer/lib/explorer/chain/polygon_zkevm/bridge_l1_token.ex (37)
  22. apps/explorer/lib/explorer/chain/polygon_zkevm/lifecycle_transaction.ex (6)
  23. apps/explorer/lib/explorer/chain/polygon_zkevm/reader.ex (321)
  24. apps/explorer/lib/explorer/chain/polygon_zkevm/transaction_batch.ex (6)
  25. apps/explorer/lib/explorer/chain/transaction.ex (2)
  26. apps/explorer/lib/explorer/chain/zkevm/reader.ex (149)
  27. apps/explorer/mix.exs (2)
  28. apps/explorer/priv/polygon_zkevm/migrations/20231010093238_add_bridge_tables.exs (46)
  29. apps/indexer/lib/indexer/block/fetcher.ex (103)
  30. apps/indexer/lib/indexer/block/realtime/fetcher.ex (12)
  31. apps/indexer/lib/indexer/fetcher/polygon_edge.ex (158)
  32. apps/indexer/lib/indexer/fetcher/polygon_edge/deposit.ex (13)
  33. apps/indexer/lib/indexer/fetcher/polygon_edge/deposit_execute.ex (2)
  34. apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal.ex (2)
  35. apps/indexer/lib/indexer/fetcher/polygon_edge/withdrawal_exit.ex (13)
  36. apps/indexer/lib/indexer/fetcher/polygon_zkevm/bridge.ex (413)
  37. apps/indexer/lib/indexer/fetcher/polygon_zkevm/bridge_l1.ex (210)
  38. apps/indexer/lib/indexer/fetcher/polygon_zkevm/bridge_l1_tokens.ex (78)
  39. apps/indexer/lib/indexer/fetcher/polygon_zkevm/bridge_l2.ex (176)
  40. apps/indexer/lib/indexer/fetcher/polygon_zkevm/transaction_batch.ex (31)
  41. apps/indexer/lib/indexer/fetcher/rollup_l1_reorg_monitor.ex (153)
  42. apps/indexer/lib/indexer/fetcher/shibarium/l1.ex (100)
  43. apps/indexer/lib/indexer/helper.ex (119)
  44. apps/indexer/lib/indexer/supervisor.ex (12)
  45. apps/indexer/lib/indexer/transform/addresses.ex (10)
  46. apps/indexer/lib/indexer/transform/polygon_zkevm/bridge.ex (77)
  47. config/runtime.exs (37)
  48. config/runtime/dev.exs (32)
  49. config/runtime/prod.exs (34)
  50. cspell.json (2)
  51. docker-compose/envs/common-blockscout.env (13)

@@ -12,6 +12,7 @@
- [#9282](https://github.com/blockscout/blockscout/pull/9282) - Add `license_type` to smart contracts
- [#9202](https://github.com/blockscout/blockscout/pull/9202) - Add base and priority fee to gas oracle response
- [#9168](https://github.com/blockscout/blockscout/pull/9168) - Support EIP4844 blobs indexing & API
- [#9098](https://github.com/blockscout/blockscout/pull/9098) - Polygon zkEVM Bridge indexer and API v2 extension
### Fixes

@@ -204,7 +204,7 @@ defmodule BlockScoutWeb.ApiRouter do
get("/watchlist", V2.TransactionController, :watchlist_transactions)
if System.get_env("CHAIN_TYPE") == "polygon_zkevm" do
get("/zkevm-batch/:batch_number", V2.TransactionController, :zkevm_batch)
get("/zkevm-batch/:batch_number", V2.TransactionController, :polygon_zkevm_batch)
end
if System.get_env("CHAIN_TYPE") == "suave" do
@@ -274,8 +274,8 @@ defmodule BlockScoutWeb.ApiRouter do
get("/indexing-status", V2.MainPageController, :indexing_status)
if System.get_env("CHAIN_TYPE") == "polygon_zkevm" do
get("/zkevm/batches/confirmed", V2.ZkevmController, :batches_confirmed)
get("/zkevm/batches/latest-number", V2.ZkevmController, :batch_latest_number)
get("/zkevm/batches/confirmed", V2.PolygonZkevmController, :batches_confirmed)
get("/zkevm/batches/latest-number", V2.PolygonZkevmController, :batch_latest_number)
end
end
@@ -313,9 +313,13 @@ defmodule BlockScoutWeb.ApiRouter do
scope "/zkevm" do
if System.get_env("CHAIN_TYPE") == "polygon_zkevm" do
get("/batches", V2.ZkevmController, :batches)
get("/batches/count", V2.ZkevmController, :batches_count)
get("/batches/:batch_number", V2.ZkevmController, :batch)
get("/batches", V2.PolygonZkevmController, :batches)
get("/batches/count", V2.PolygonZkevmController, :batches_count)
get("/batches/:batch_number", V2.PolygonZkevmController, :batch)
get("/deposits", V2.PolygonZkevmController, :deposits)
get("/deposits/count", V2.PolygonZkevmController, :deposits_count)
get("/withdrawals", V2.PolygonZkevmController, :withdrawals)
get("/withdrawals/count", V2.PolygonZkevmController, :withdrawals_count)
end
end

@@ -38,11 +38,10 @@ defmodule BlockScoutWeb.Chain do
Transaction,
Transaction.StateChange,
UserOperation,
Wei,
Withdrawal
Wei
}
alias Explorer.Chain.Zkevm.TransactionBatch
alias Explorer.Chain.PolygonZkevm.TransactionBatch
alias Explorer.PagingOptions
defimpl Poison.Encoder, for: Decimal do
@@ -612,7 +611,7 @@ defmodule BlockScoutWeb.Chain do
}
end
defp paging_params(%Withdrawal{index: index}) do
defp paging_params(%{index: index}) do
%{"index" => index}
end

@@ -1,10 +1,10 @@
defmodule BlockScoutWeb.ZkevmConfirmedBatchChannel do
defmodule BlockScoutWeb.PolygonZkevmConfirmedBatchChannel do
@moduledoc """
Establishes a pub/sub channel for live updates of zkEVM confirmed batch events.
"""
use BlockScoutWeb, :channel
alias BlockScoutWeb.API.V2.ZkevmView
alias BlockScoutWeb.API.V2.PolygonZkevmView
intercept(["new_zkevm_confirmed_batch"])
@@ -17,7 +17,7 @@ defmodule BlockScoutWeb.ZkevmConfirmedBatchChannel do
%{batch: batch},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket
) do
rendered_batch = ZkevmView.render("zkevm_batch.json", %{batch: batch, socket: nil})
rendered_batch = PolygonZkevmView.render("zkevm_batch.json", %{batch: batch, socket: nil})
push(socket, "new_zkevm_confirmed_batch", %{
batch: rendered_batch

@@ -10,7 +10,7 @@ defmodule BlockScoutWeb.UserSocketV2 do
channel("rewards:*", BlockScoutWeb.RewardChannel)
channel("transactions:*", BlockScoutWeb.TransactionChannel)
channel("tokens:*", BlockScoutWeb.TokenChannel)
channel("zkevm_batches:*", BlockScoutWeb.ZkevmConfirmedBatchChannel)
channel("zkevm_batches:*", BlockScoutWeb.PolygonZkevmConfirmedBatchChannel)
def connect(_params, socket) do
{:ok, socket}

@@ -1,4 +1,4 @@
defmodule BlockScoutWeb.API.V2.ZkevmController do
defmodule BlockScoutWeb.API.V2.PolygonZkevmController do
use BlockScoutWeb, :controller
import BlockScoutWeb.Chain,
@@ -8,7 +8,7 @@ defmodule BlockScoutWeb.API.V2.ZkevmController do
split_list_by_page: 1
]
alias Explorer.Chain.Zkevm.Reader
alias Explorer.Chain.PolygonZkevm.Reader
action_fallback(BlockScoutWeb.API.V2.FallbackController)
@@ -109,4 +109,72 @@ defmodule BlockScoutWeb.API.V2.ZkevmController do
{:error, :not_found} -> 0
end
end
@doc """
Function to handle GET requests to `/api/v2/zkevm/deposits` endpoint.
"""
@spec deposits(Plug.Conn.t(), map()) :: Plug.Conn.t()
def deposits(conn, params) do
{deposits, next_page} =
params
|> paging_options()
|> Keyword.put(:api?, true)
|> Reader.deposits()
|> split_list_by_page()
next_page_params = next_page_params(next_page, deposits, params)
conn
|> put_status(200)
|> render(:polygon_zkevm_bridge_items, %{
items: deposits,
next_page_params: next_page_params
})
end
@doc """
Function to handle GET requests to `/api/v2/zkevm/deposits/count` endpoint.
"""
@spec deposits_count(Plug.Conn.t(), map()) :: Plug.Conn.t()
def deposits_count(conn, _params) do
count = Reader.deposits_count(api?: true)
conn
|> put_status(200)
|> render(:polygon_zkevm_bridge_items_count, %{count: count})
end
@doc """
Function to handle GET requests to `/api/v2/zkevm/withdrawals` endpoint.
"""
@spec withdrawals(Plug.Conn.t(), map()) :: Plug.Conn.t()
def withdrawals(conn, params) do
{withdrawals, next_page} =
params
|> paging_options()
|> Keyword.put(:api?, true)
|> Reader.withdrawals()
|> split_list_by_page()
next_page_params = next_page_params(next_page, withdrawals, params)
conn
|> put_status(200)
|> render(:polygon_zkevm_bridge_items, %{
items: withdrawals,
next_page_params: next_page_params
})
end
@doc """
Function to handle GET requests to `/api/v2/zkevm/withdrawals/count` endpoint.
"""
@spec withdrawals_count(Plug.Conn.t(), map()) :: Plug.Conn.t()
def withdrawals_count(conn, _params) do
count = Reader.withdrawals_count(api?: true)
conn
|> put_status(200)
|> render(:polygon_zkevm_bridge_items_count, %{count: count})
end
end
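
All four new actions follow one pattern: page through `Reader`, split off the extra row that signals a next page, and render. An illustrative test sketch (not part of the diff; the test module name is hypothetical, and it assumes the project's standard `BlockScoutWeb.ConnCase` setup in a build compiled with `CHAIN_TYPE=polygon_zkevm` so the routes exist):

```elixir
defmodule BlockScoutWeb.API.V2.PolygonZkevmControllerTest do
  use BlockScoutWeb.ConnCase

  test "returns paginated bridge deposits", %{conn: conn} do
    conn = get(conn, "/api/v2/zkevm/deposits")

    # Keys follow the polygon_zkevm_bridge_items.json view below.
    assert %{"items" => items, "next_page_params" => _} = json_response(conn, 200)
    assert is_list(items)
  end
end
```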

@@ -31,7 +31,7 @@ defmodule BlockScoutWeb.API.V2.TransactionController do
alias Explorer.Chain
alias Explorer.Chain.Beacon.Reader, as: BeaconReader
alias Explorer.Chain.{Hash, Transaction}
alias Explorer.Chain.Zkevm.Reader
alias Explorer.Chain.PolygonZkevm.Reader
alias Indexer.Fetcher.FirstTraceOnDemand
action_fallback(BlockScoutWeb.API.V2.FallbackController)
@@ -155,8 +155,8 @@ defmodule BlockScoutWeb.API.V2.TransactionController do
Function to handle GET requests to `/api/v2/transactions/zkevm-batch/:batch_number` endpoint.
It renders the list of L2 transactions bound to the specified batch.
"""
@spec zkevm_batch(Plug.Conn.t(), map()) :: Plug.Conn.t()
def zkevm_batch(conn, %{"batch_number" => batch_number} = _params) do
@spec polygon_zkevm_batch(Plug.Conn.t(), map()) :: Plug.Conn.t()
def polygon_zkevm_batch(conn, %{"batch_number" => batch_number} = _params) do
transactions =
batch_number
|> Reader.batch_transactions(api?: true)

@@ -1,4 +1,4 @@
defmodule BlockScoutWeb.API.V2.ZkevmView do
defmodule BlockScoutWeb.API.V2.PolygonZkevmView do
use BlockScoutWeb, :view
@doc """
@@ -68,6 +68,56 @@ defmodule BlockScoutWeb.API.V2.ZkevmView do
number
end
@doc """
Function to render GET requests to `/api/v2/zkevm/deposits` and `/api/v2/zkevm/withdrawals` endpoints.
"""
def render("polygon_zkevm_bridge_items.json", %{
items: items,
next_page_params: next_page_params
}) do
env = Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonZkevm.BridgeL1]
%{
items:
Enum.map(items, fn item ->
l1_token = if is_nil(Map.get(item, :l1_token)), do: %{}, else: Map.get(item, :l1_token)
l2_token = if is_nil(Map.get(item, :l2_token)), do: %{}, else: Map.get(item, :l2_token)
decimals =
cond do
not is_nil(Map.get(l1_token, :decimals)) -> Map.get(l1_token, :decimals)
not is_nil(Map.get(l2_token, :decimals)) -> Map.get(l2_token, :decimals)
true -> env[:native_decimals]
end
symbol =
cond do
not is_nil(Map.get(l1_token, :symbol)) -> Map.get(l1_token, :symbol)
not is_nil(Map.get(l2_token, :symbol)) -> Map.get(l2_token, :symbol)
true -> env[:native_symbol]
end
%{
"block_number" => item.block_number,
"index" => item.index,
"l1_transaction_hash" => item.l1_transaction_hash,
"timestamp" => item.block_timestamp,
"l2_transaction_hash" => item.l2_transaction_hash,
"value" => fractional(Decimal.new(item.amount), Decimal.new(decimals)),
"symbol" => symbol
}
end),
next_page_params: next_page_params
}
end
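
For reference, one rendered item takes this shape (not part of the diff; all values invented for illustration). A `nil` `l2_transaction_hash` marks a deposit that has not been claimed on L2 yet, and `value` is the raw `amount` scaled down by the token decimals via `fractional/2` below:

```elixir
%{
  "block_number" => 19_000_000,
  "index" => 42,
  "l1_transaction_hash" => "0x1111111111111111111111111111111111111111111111111111111111111111",
  "timestamp" => "2024-01-15T12:00:00.000000Z",
  "l2_transaction_hash" => nil,
  "value" => "1.5",
  "symbol" => "ETH"
}
```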
@doc """
Function to render GET requests to `/api/v2/zkevm/deposits/count` and `/api/v2/zkevm/withdrawals/count` endpoints.
"""
def render("polygon_zkevm_bridge_items_count.json", %{count: count}) do
count
end
defp batch_status(batch) do
sequence_id = Map.get(batch, :sequence_id)
verify_id = Map.get(batch, :verify_id)
@@ -79,6 +129,13 @@ defmodule BlockScoutWeb.API.V2.ZkevmView do
end
end
defp fractional(%Decimal{} = amount, %Decimal{} = decimals) do
amount.sign
|> Decimal.new(amount.coef, amount.exp - Decimal.to_integer(decimals))
|> Decimal.normalize()
|> Decimal.to_string(:normal)
end
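
`fractional/2` avoids floating point entirely: it shifts the decimal exponent instead of dividing. A standalone worked example of the same arithmetic (not part of the diff; the helper itself is private):

```elixir
# A deposit of 1.5 ETH arrives as the integer 1_500_000_000_000_000_000
# with decimals = 18; shifting the exponent by -18 and normalizing gives "1.5".
amount = Decimal.new(1_500_000_000_000_000_000)  # sign 1, coef 15e17, exp 0
decimals = Decimal.new(18)

amount.sign
|> Decimal.new(amount.coef, amount.exp - Decimal.to_integer(decimals))
|> Decimal.normalize()
|> Decimal.to_string(:normal)
#=> "1.5"
```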
defp render_zkevm_batches(batches) do
Enum.map(batches, fn batch ->
sequence_tx_hash =

@@ -24,7 +24,7 @@ defmodule BlockScoutWeb.Mixfile do
],
start_permanent: Mix.env() == :prod,
version: "6.1.0",
xref: [exclude: [Explorer.Chain.Zkevm.Reader, Explorer.Chain.Beacon.Reader]]
xref: [exclude: [Explorer.Chain.PolygonZkevm.Reader, Explorer.Chain.Beacon.Reader]]
]
end

@@ -64,6 +64,15 @@ for repo <- [
pool_size: 1
end
config :explorer, Explorer.Repo.PolygonZkevm,
database: "explorer_test",
hostname: "localhost",
pool: Ecto.Adapters.SQL.Sandbox,
# Default of `5_000` was too low for `BlockFetcher` test
ownership_timeout: :timer.minutes(1),
timeout: :timer.seconds(60),
queue_target: 1000
config :logger, :explorer,
level: :warn,
path: Path.absname("logs/test/explorer.log")

@@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Publisher do
Publishes events related to the Chain context.
"""
@allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number polygon_edge_reorg_block token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
@allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
def broadcast(_data, false), do: :ok

@@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Subscriber do
Subscribes to events related to the Chain context.
"""
@allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number polygon_edge_reorg_block token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
@allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
@allowed_broadcast_types ~w(catchup realtime on_demand contract_verification_result)a

@@ -1,13 +1,13 @@
defmodule Explorer.Chain.Import.Runner.Zkevm.BatchTransactions do
defmodule Explorer.Chain.Import.Runner.PolygonZkevm.BatchTransactions do
@moduledoc """
Bulk imports `t:Explorer.Chain.Zkevm.BatchTransaction.t/0`.
Bulk imports `t:Explorer.Chain.PolygonZkevm.BatchTransaction.t/0`.
"""
require Ecto.Query
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Import
alias Explorer.Chain.Zkevm.BatchTransaction
alias Explorer.Chain.PolygonZkevm.BatchTransaction
alias Explorer.Prometheus.Instrumenter
@behaviour Import.Runner
@@ -21,7 +21,7 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.BatchTransactions do
def ecto_schema_module, do: BatchTransaction
@impl Import.Runner
def option_key, do: :zkevm_batch_transactions
def option_key, do: :polygon_zkevm_batch_transactions
@impl Import.Runner
@spec imported_table_row() :: %{:value_description => binary(), :value_type => binary()}
@@ -42,12 +42,12 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.BatchTransactions do
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
Multi.run(multi, :insert_zkevm_batch_transactions, fn repo, _ ->
Multi.run(multi, :insert_polygon_zkevm_batch_transactions, fn repo, _ ->
Instrumenter.block_import_stage_runner(
fn -> insert(repo, changes_list, insert_options) end,
:block_referencing,
:zkevm_batch_transactions,
:zkevm_batch_transactions
:polygon_zkevm_batch_transactions,
:polygon_zkevm_batch_transactions
)
end)
end
@@ -59,7 +59,7 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.BatchTransactions do
{:ok, [BatchTransaction.t()]}
| {:error, [Changeset.t()]}
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = _options) when is_list(changes_list) do
# Enforce Zkevm.BatchTransaction ShareLocks order (see docs: sharelock.md)
# Enforce PolygonZkevm.BatchTransaction ShareLocks order (see docs: sharelock.md)
ordered_changes_list = Enum.sort_by(changes_list, & &1.hash)
{:ok, inserted} =

@@ -0,0 +1,101 @@
defmodule Explorer.Chain.Import.Runner.PolygonZkevm.BridgeL1Tokens do
@moduledoc """
Bulk imports `t:Explorer.Chain.PolygonZkevm.BridgeL1Token.t/0`.
"""
require Ecto.Query
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Import
alias Explorer.Chain.PolygonZkevm.BridgeL1Token
alias Explorer.Prometheus.Instrumenter
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type imported :: [BridgeL1Token.t()]
@impl Import.Runner
def ecto_schema_module, do: BridgeL1Token
@impl Import.Runner
def option_key, do: :polygon_zkevm_bridge_l1_tokens
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, %{timestamps: timestamps} = options) do
insert_options =
options
|> Map.get(option_key(), %{})
|> Map.take(~w(on_conflict timeout)a)
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
Multi.run(multi, :insert_polygon_zkevm_bridge_l1_tokens, fn repo, _ ->
Instrumenter.block_import_stage_runner(
fn -> insert(repo, changes_list, insert_options) end,
:block_referencing,
:polygon_zkevm_bridge_l1_tokens,
:polygon_zkevm_bridge_l1_tokens
)
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert(Repo.t(), [map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) ::
{:ok, [BridgeL1Token.t()]}
| {:error, [Changeset.t()]}
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# Enforce BridgeL1Token ShareLocks order (see docs: sharelock.md)
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address})
{:ok, inserted} =
Import.insert_changes_list(
repo,
ordered_changes_list,
conflict_target: :address,
on_conflict: on_conflict,
for: BridgeL1Token,
returning: true,
timeout: timeout,
timestamps: timestamps
)
{:ok, inserted}
end
defp default_on_conflict do
from(
t in BridgeL1Token,
update: [
set: [
decimals: fragment("EXCLUDED.decimals"),
symbol: fragment("EXCLUDED.symbol"),
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", t.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", t.updated_at)
]
],
where:
fragment(
"(EXCLUDED.decimals, EXCLUDED.symbol) IS DISTINCT FROM (?, ?)",
t.decimals,
t.symbol
)
)
end
end
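
The `default_on_conflict` query turns the insert into an upsert keyed on `address`: a conflicting row is rewritten only when the incoming `decimals` or `symbol` actually differ (the `IS DISTINCT FROM` guard). A hedged round-trip sketch (not part of the diff; assumes a `polygon_zkevm` build so this runner is registered in `Stage.BlockReferencing`, plus a configured repo):

```elixir
# The :polygon_zkevm_bridge_l1_tokens key matches option_key/0 above.
{:ok, _imported} =
  Explorer.Chain.Import.all(%{
    polygon_zkevm_bridge_l1_tokens: %{
      params: [
        %{address: "0x0000000000000000000000000000000000001010", symbol: "TKN", decimals: 18}
      ]
    }
  })

# Re-importing identical params is effectively a no-op: the WHERE clause
# fires only when EXCLUDED.decimals or EXCLUDED.symbol changed.
```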

@@ -0,0 +1,115 @@
defmodule Explorer.Chain.Import.Runner.PolygonZkevm.BridgeOperations do
@moduledoc """
Bulk imports `t:Explorer.Chain.PolygonZkevm.Bridge.t/0`.
"""
require Ecto.Query
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Import
alias Explorer.Chain.PolygonZkevm.Bridge, as: PolygonZkevmBridge
alias Explorer.Prometheus.Instrumenter
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type imported :: [PolygonZkevmBridge.t()]
@impl Import.Runner
def ecto_schema_module, do: PolygonZkevmBridge
@impl Import.Runner
def option_key, do: :polygon_zkevm_bridge_operations
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, %{timestamps: timestamps} = options) do
insert_options =
options
|> Map.get(option_key(), %{})
|> Map.take(~w(on_conflict timeout)a)
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
Multi.run(multi, :insert_polygon_zkevm_bridge_operations, fn repo, _ ->
Instrumenter.block_import_stage_runner(
fn -> insert(repo, changes_list, insert_options) end,
:block_referencing,
:polygon_zkevm_bridge_operations,
:polygon_zkevm_bridge_operations
)
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert(Repo.t(), [map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) ::
{:ok, [PolygonZkevmBridge.t()]}
| {:error, [Changeset.t()]}
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# Enforce PolygonZkevmBridge ShareLocks order (see docs: sharelock.md)
ordered_changes_list = Enum.sort_by(changes_list, &{&1.type, &1.index})
{:ok, inserted} =
Import.insert_changes_list(
repo,
ordered_changes_list,
conflict_target: [:type, :index],
on_conflict: on_conflict,
for: PolygonZkevmBridge,
returning: true,
timeout: timeout,
timestamps: timestamps
)
{:ok, inserted}
end
defp default_on_conflict do
from(
op in PolygonZkevmBridge,
update: [
set: [
# Don't update `type` as it is part of the composite primary key and used for the conflict target
# Don't update `index` as it is part of the composite primary key and used for the conflict target
l1_transaction_hash: fragment("COALESCE(EXCLUDED.l1_transaction_hash, ?)", op.l1_transaction_hash),
l2_transaction_hash: fragment("COALESCE(EXCLUDED.l2_transaction_hash, ?)", op.l2_transaction_hash),
l1_token_id: fragment("COALESCE(EXCLUDED.l1_token_id, ?)", op.l1_token_id),
l1_token_address: fragment("COALESCE(EXCLUDED.l1_token_address, ?)", op.l1_token_address),
l2_token_address: fragment("COALESCE(EXCLUDED.l2_token_address, ?)", op.l2_token_address),
amount: fragment("EXCLUDED.amount"),
block_number: fragment("COALESCE(EXCLUDED.block_number, ?)", op.block_number),
block_timestamp: fragment("COALESCE(EXCLUDED.block_timestamp, ?)", op.block_timestamp),
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", op.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", op.updated_at)
]
],
where:
fragment(
"(EXCLUDED.l1_transaction_hash, EXCLUDED.l2_transaction_hash, EXCLUDED.l1_token_id, EXCLUDED.l1_token_address, EXCLUDED.l2_token_address, EXCLUDED.amount, EXCLUDED.block_number, EXCLUDED.block_timestamp) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?, ?)",
op.l1_transaction_hash,
op.l2_transaction_hash,
op.l1_token_id,
op.l1_token_address,
op.l2_token_address,
op.amount,
op.block_number,
op.block_timestamp
)
)
end
end
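
Here the conflict target is the composite `{type, index}` key, and the `COALESCE` updates let the two halves of one bridge operation merge into a single row: the L1 fetcher inserts a deposit knowing only its L1 side, and when the claim is later indexed on L2 the same row gains its `l2_transaction_hash` instead of producing a duplicate. A sketch of that two-phase merge (not part of the diff; hashes and values invented):

```elixir
l1_hash = "0x" <> String.duplicate("11", 32)
l2_hash = "0x" <> String.duplicate("22", 32)

# Phase 1: indexed from the L1 bridge event.
phase1 = %{type: :deposit, index: 7, amount: Decimal.new("5"),
           l1_transaction_hash: l1_hash, block_number: 19_000_000}

# Phase 2: indexed later from the claim on L2.
phase2 = %{type: :deposit, index: 7, amount: Decimal.new("5"),
           l2_transaction_hash: l2_hash}

# Importing phase1 then phase2 (or vice versa) leaves one row with both
# hashes set: each COALESCE(EXCLUDED.col, existing) keeps non-nil values
# from whichever side arrived first and fills in the rest on conflict.
```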

@@ -1,13 +1,13 @@
defmodule Explorer.Chain.Import.Runner.Zkevm.LifecycleTransactions do
defmodule Explorer.Chain.Import.Runner.PolygonZkevm.LifecycleTransactions do
@moduledoc """
Bulk imports `t:Explorer.Chain.Zkevm.LifecycleTransaction.t/0`.
Bulk imports `t:Explorer.Chain.PolygonZkevm.LifecycleTransaction.t/0`.
"""
require Ecto.Query
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Import
alias Explorer.Chain.Zkevm.LifecycleTransaction
alias Explorer.Chain.PolygonZkevm.LifecycleTransaction
alias Explorer.Prometheus.Instrumenter
import Ecto.Query, only: [from: 2]
@@ -23,7 +23,7 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.LifecycleTransactions do
def ecto_schema_module, do: LifecycleTransaction
@impl Import.Runner
def option_key, do: :zkevm_lifecycle_transactions
def option_key, do: :polygon_zkevm_lifecycle_transactions
@impl Import.Runner
@spec imported_table_row() :: %{:value_description => binary(), :value_type => binary()}
@@ -44,12 +44,12 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.LifecycleTransactions do
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
Multi.run(multi, :insert_zkevm_lifecycle_transactions, fn repo, _ ->
Multi.run(multi, :insert_polygon_zkevm_lifecycle_transactions, fn repo, _ ->
Instrumenter.block_import_stage_runner(
fn -> insert(repo, changes_list, insert_options) end,
:block_referencing,
:zkevm_lifecycle_transactions,
:zkevm_lifecycle_transactions
:polygon_zkevm_lifecycle_transactions,
:polygon_zkevm_lifecycle_transactions
)
end)
end
@@ -63,7 +63,7 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.LifecycleTransactions do
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# Enforce Zkevm.LifecycleTransaction ShareLocks order (see docs: sharelock.md)
# Enforce PolygonZkevm.LifecycleTransaction ShareLocks order (see docs: sharelock.md)
ordered_changes_list = Enum.sort_by(changes_list, & &1.id)
{:ok, inserted} =

@@ -1,13 +1,13 @@
defmodule Explorer.Chain.Import.Runner.Zkevm.TransactionBatches do
defmodule Explorer.Chain.Import.Runner.PolygonZkevm.TransactionBatches do
@moduledoc """
Bulk imports `t:Explorer.Chain.Zkevm.TransactionBatch.t/0`.
Bulk imports `t:Explorer.Chain.PolygonZkevm.TransactionBatch.t/0`.
"""
require Ecto.Query
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Import
alias Explorer.Chain.Zkevm.TransactionBatch
alias Explorer.Chain.PolygonZkevm.TransactionBatch
alias Explorer.Prometheus.Instrumenter
import Ecto.Query, only: [from: 2]
@@ -23,7 +23,7 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.TransactionBatches do
def ecto_schema_module, do: TransactionBatch
@impl Import.Runner
def option_key, do: :zkevm_transaction_batches
def option_key, do: :polygon_zkevm_transaction_batches
@impl Import.Runner
@spec imported_table_row() :: %{:value_description => binary(), :value_type => binary()}
@@ -44,12 +44,12 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.TransactionBatches do
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
Multi.run(multi, :insert_zkevm_transaction_batches, fn repo, _ ->
Multi.run(multi, :insert_polygon_zkevm_transaction_batches, fn repo, _ ->
Instrumenter.block_import_stage_runner(
fn -> insert(repo, changes_list, insert_options) end,
:block_referencing,
:zkevm_transaction_batches,
:zkevm_transaction_batches
:polygon_zkevm_transaction_batches,
:polygon_zkevm_transaction_batches
)
end)
end
@@ -63,7 +63,7 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.TransactionBatches do
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# Enforce Zkevm.TransactionBatch ShareLocks order (see docs: sharelock.md)
# Enforce PolygonZkevm.TransactionBatch ShareLocks order (see docs: sharelock.md)
ordered_changes_list = Enum.sort_by(changes_list, & &1.number)
{:ok, inserted} =

@@ -26,9 +26,11 @@ defmodule Explorer.Chain.Import.Stage.BlockReferencing do
]
@polygon_zkevm_runners [
Runner.Zkevm.LifecycleTransactions,
Runner.Zkevm.TransactionBatches,
Runner.Zkevm.BatchTransactions
Runner.PolygonZkevm.LifecycleTransactions,
Runner.PolygonZkevm.TransactionBatches,
Runner.PolygonZkevm.BatchTransactions,
Runner.PolygonZkevm.BridgeL1Tokens,
Runner.PolygonZkevm.BridgeOperations
]
@shibarium_runners [

@@ -1,15 +1,15 @@
defmodule Explorer.Chain.Zkevm.BatchTransaction do
defmodule Explorer.Chain.PolygonZkevm.BatchTransaction do
@moduledoc "Models a list of transactions related to a batch for zkEVM."
use Explorer.Schema
alias Explorer.Chain.{Hash, Transaction}
alias Explorer.Chain.Zkevm.TransactionBatch
alias Explorer.Chain.PolygonZkevm.TransactionBatch
@required_attrs ~w(batch_number hash)a
@primary_key false
typed_schema "zkevm_batch_l2_transactions" do
typed_schema "polygon_zkevm_batch_l2_transactions" do
belongs_to(:batch, TransactionBatch, foreign_key: :batch_number, references: :number, type: :integer, null: false)
belongs_to(:l2_transaction, Transaction,

@@ -0,0 +1,55 @@
defmodule Explorer.Chain.PolygonZkevm.Bridge do
@moduledoc "Models a bridge operation for Polygon zkEVM."
use Explorer.Schema
alias Explorer.Chain.{Block, Hash, Token}
alias Explorer.Chain.PolygonZkevm.BridgeL1Token
@optional_attrs ~w(l1_transaction_hash l2_transaction_hash l1_token_id l2_token_address block_number block_timestamp)a
@required_attrs ~w(type index amount)a
@type t :: %__MODULE__{
type: String.t(),
index: non_neg_integer(),
l1_transaction_hash: Hash.t() | nil,
l2_transaction_hash: Hash.t() | nil,
l1_token: %Ecto.Association.NotLoaded{} | BridgeL1Token.t() | nil,
l1_token_id: non_neg_integer() | nil,
l1_token_address: Hash.Address.t() | nil,
l2_token: %Ecto.Association.NotLoaded{} | Token.t() | nil,
l2_token_address: Hash.Address.t() | nil,
amount: Decimal.t(),
block_number: Block.block_number() | nil,
block_timestamp: DateTime.t() | nil
}
@primary_key false
schema "polygon_zkevm_bridge" do
field(:type, Ecto.Enum, values: [:deposit, :withdrawal], primary_key: true)
field(:index, :integer, primary_key: true)
field(:l1_transaction_hash, Hash.Full)
field(:l2_transaction_hash, Hash.Full)
belongs_to(:l1_token, BridgeL1Token, foreign_key: :l1_token_id, references: :id, type: :integer)
field(:l1_token_address, Hash.Address)
belongs_to(:l2_token, Token, foreign_key: :l2_token_address, references: :contract_address_hash, type: Hash.Address)
field(:amount, :decimal)
field(:block_number, :integer)
field(:block_timestamp, :utc_datetime_usec)
timestamps()
end
@doc """
Checks that the `attrs` are valid.
"""
@spec changeset(Ecto.Schema.t(), map()) :: Ecto.Schema.t()
def changeset(%__MODULE__{} = operations, attrs \\ %{}) do
operations
|> cast(attrs, @required_attrs ++ @optional_attrs)
|> validate_required(@required_attrs)
|> unique_constraint([:type, :index])
|> foreign_key_constraint(:l1_token_id)
end
end
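
An illustrative check of the changeset rules above (not part of the diff): `:type`, `:index`, and `:amount` are required, while the transaction hashes and token fields may arrive later, for example when a deposit's L2 claim is indexed.

```elixir
alias Explorer.Chain.PolygonZkevm.Bridge

Bridge.changeset(%Bridge{}, %{
  type: :deposit,
  index: 0,
  amount: Decimal.new("1000000000000000000")
}).valid?
#=> true

Bridge.changeset(%Bridge{}, %{type: :deposit}).valid?
#=> false (missing :index and :amount)
```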

@@ -0,0 +1,37 @@
defmodule Explorer.Chain.PolygonZkevm.BridgeL1Token do
@moduledoc "Models a bridge token on L1 for Polygon zkEVM."
use Explorer.Schema
alias Explorer.Chain.Hash
@optional_attrs ~w(decimals symbol)a
@required_attrs ~w(address)a
@type t :: %__MODULE__{
address: Hash.Address.t(),
decimals: non_neg_integer() | nil,
symbol: String.t() | nil
}
@primary_key {:id, :id, autogenerate: true}
schema "polygon_zkevm_bridge_l1_tokens" do
field(:address, Hash.Address)
field(:decimals, :integer)
field(:symbol, :string)
timestamps()
end
@doc """
Checks that the `attrs` are valid.
"""
@spec changeset(Ecto.Schema.t(), map()) :: Ecto.Schema.t()
def changeset(%__MODULE__{} = tokens, attrs \\ %{}) do
tokens
|> cast(attrs, @required_attrs ++ @optional_attrs)
|> validate_required(@required_attrs)
|> unique_constraint(:id)
end
end

@@ -1,15 +1,15 @@
defmodule Explorer.Chain.Zkevm.LifecycleTransaction do
defmodule Explorer.Chain.PolygonZkevm.LifecycleTransaction do
@moduledoc "Models an L1 lifecycle transaction for zkEVM."
use Explorer.Schema
alias Explorer.Chain.Hash
alias Explorer.Chain.Zkevm.TransactionBatch
alias Explorer.Chain.PolygonZkevm.TransactionBatch
@required_attrs ~w(id hash is_verify)a
@primary_key false
typed_schema "zkevm_lifecycle_l1_transactions" do
typed_schema "polygon_zkevm_lifecycle_l1_transactions" do
field(:id, :integer, primary_key: true, null: false)
field(:hash, Hash.Full, null: false)
field(:is_verify, :boolean, null: false)

@@ -0,0 +1,321 @@
defmodule Explorer.Chain.PolygonZkevm.Reader do
@moduledoc "Contains read functions for the Polygon zkEVM modules."
import Ecto.Query,
only: [
from: 2,
limit: 2,
order_by: 2,
where: 2,
where: 3
]
import Explorer.Chain, only: [select_repo: 1]
alias Explorer.Chain.PolygonZkevm.{BatchTransaction, Bridge, BridgeL1Token, LifecycleTransaction, TransactionBatch}
alias Explorer.{Chain, PagingOptions, Repo}
alias Indexer.Helper
@doc """
Reads a batch by its number from the database.
If the number is :latest, gets the latest batch from the `polygon_zkevm_transaction_batches` table.
Returns {:error, :not_found} if the batch is not found.
"""
@spec batch(non_neg_integer() | :latest, list()) :: {:ok, map()} | {:error, :not_found}
def batch(number, options \\ [])
def batch(:latest, options) when is_list(options) do
TransactionBatch
|> order_by(desc: :number)
|> limit(1)
|> select_repo(options).one()
|> case do
nil -> {:error, :not_found}
batch -> {:ok, batch}
end
end
def batch(number, options) when is_list(options) do
necessity_by_association = Keyword.get(options, :necessity_by_association, %{})
TransactionBatch
|> where(number: ^number)
|> Chain.join_associations(necessity_by_association)
|> select_repo(options).one()
|> case do
nil -> {:error, :not_found}
batch -> {:ok, batch}
end
end
@doc """
Reads a list of batches from the `polygon_zkevm_transaction_batches` table.
"""
@spec batches(list()) :: list()
def batches(options \\ []) do
necessity_by_association = Keyword.get(options, :necessity_by_association, %{})
base_query =
from(tb in TransactionBatch,
order_by: [desc: tb.number]
)
query =
if Keyword.get(options, :confirmed?, false) do
base_query
|> Chain.join_associations(necessity_by_association)
|> where([tb], not is_nil(tb.sequence_id) and tb.sequence_id > 0)
|> limit(10)
else
paging_options = Keyword.get(options, :paging_options, Chain.default_paging_options())
base_query
|> Chain.join_associations(necessity_by_association)
|> page_batches(paging_options)
|> limit(^paging_options.page_size)
end
select_repo(options).all(query)
end
@doc """
Reads a list of L2 transaction hashes from the `polygon_zkevm_batch_l2_transactions` table.
"""
@spec batch_transactions(non_neg_integer(), list()) :: list()
def batch_transactions(batch_number, options \\ []) do
query = from(bts in BatchTransaction, where: bts.batch_number == ^batch_number)
select_repo(options).all(query)
end
@doc """
Tries to read L1 token data (address, symbol, decimals) for the given addresses
from the database. If the data for an address is not found in Explorer.Chain.PolygonZkevm.BridgeL1Token,
that address is returned in the list in the second element of the tuple.
The first element of the returned tuple is an `L1 token address -> L1 token data` map.
"""
@spec get_token_data_from_db(list()) :: {map(), list()}
def get_token_data_from_db(token_addresses) do
# try to read token symbols and decimals from the database
query =
from(
t in BridgeL1Token,
where: t.address in ^token_addresses,
select: {t.address, t.decimals, t.symbol}
)
token_data =
query
|> Repo.all()
|> Enum.reduce(%{}, fn {address, decimals, symbol}, acc ->
token_address = Helper.address_hash_to_string(address, true)
Map.put(acc, token_address, %{symbol: symbol, decimals: decimals})
end)
token_addresses_for_rpc =
token_addresses
|> Enum.reject(fn address ->
Map.has_key?(token_data, Helper.address_hash_to_string(address, true))
end)
{token_data, token_addresses_for_rpc}
end
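
A hedged usage sketch (not part of the diff; addresses invented, and the first one assumed to be cached already):

```elixir
alias Explorer.Chain.PolygonZkevm.Reader

{token_data, for_rpc} =
  Reader.get_token_data_from_db([
    "0x1111111111111111111111111111111111111111",
    "0x2222222222222222222222222222222222222222"
  ])

# token_data => %{"0x1111111111111111111111111111111111111111" => %{symbol: "TKN", decimals: 18}}
# for_rpc    => ["0x2222222222222222222222222222222222222222"]  (still needs an RPC lookup)
```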
@doc """
Gets the last known L1 item (deposit) from the polygon_zkevm_bridge table.
Returns the block number and L1 transaction hash bound to that deposit.
If none is found, returns a zero block number and nil as the transaction hash.
"""
@spec last_l1_item() :: {non_neg_integer(), binary() | nil}
def last_l1_item do
query =
from(b in Bridge,
select: {b.block_number, b.l1_transaction_hash},
where: b.type == :deposit and not is_nil(b.block_number),
order_by: [desc: b.index],
limit: 1
)
query
|> Repo.one()
|> Kernel.||({0, nil})
end
@doc """
Gets the last known L2 item (withdrawal) from the polygon_zkevm_bridge table.
Returns the block number and L2 transaction hash bound to that withdrawal.
If none is found, returns a zero block number and nil as the transaction hash.
"""
@spec last_l2_item() :: {non_neg_integer(), binary() | nil}
def last_l2_item do
query =
from(b in Bridge,
select: {b.block_number, b.l2_transaction_hash},
where: b.type == :withdrawal and not is_nil(b.block_number),
order_by: [desc: b.index],
limit: 1
)
query
|> Repo.one()
|> Kernel.||({0, nil})
end
@doc """
Gets the number of the latest batch with a defined verify_id from the `polygon_zkevm_transaction_batches` table.
Returns 0 if not found.
"""
@spec last_verified_batch_number() :: non_neg_integer()
def last_verified_batch_number do
query =
from(tb in TransactionBatch,
select: tb.number,
where: not is_nil(tb.verify_id),
order_by: [desc: tb.number],
limit: 1
)
query
|> Repo.one()
|> Kernel.||(0)
end
@doc """
Reads a list of L1 transactions by their hashes from the `polygon_zkevm_lifecycle_l1_transactions` table.
"""
@spec lifecycle_transactions(list()) :: list()
def lifecycle_transactions(l1_tx_hashes) do
query =
from(
lt in LifecycleTransaction,
select: {lt.hash, lt.id},
where: lt.hash in ^l1_tx_hashes
)
Repo.all(query, timeout: :infinity)
end
@doc """
Determines the ID of the next lifecycle transaction by reading the `polygon_zkevm_lifecycle_l1_transactions` table.
"""
@spec next_id() :: non_neg_integer()
def next_id do
query =
from(lt in LifecycleTransaction,
select: lt.id,
order_by: [desc: lt.id],
limit: 1
)
last_id =
query
|> Repo.one()
|> Kernel.||(0)
last_id + 1
end
@doc """
Builds an `L1 token address -> L1 token id` map for the given token addresses.
The info is taken from Explorer.Chain.PolygonZkevm.BridgeL1Token.
If an address is not in the table, it is omitted from the resulting map.
"""
@spec token_addresses_to_ids_from_db(list()) :: map()
def token_addresses_to_ids_from_db(addresses) do
query = from(t in BridgeL1Token, select: {t.address, t.id}, where: t.address in ^addresses)
query
|> Repo.all(timeout: :infinity)
|> Enum.reduce(%{}, fn {address, id}, acc ->
Map.put(acc, Helper.address_hash_to_string(address), id)
end)
end
@doc """
Retrieves a list of Polygon zkEVM deposits (completed and unclaimed)
sorted in descending order of the index.
"""
@spec deposits(list()) :: list()
def deposits(options \\ []) do
paging_options = Keyword.get(options, :paging_options, Chain.default_paging_options())
base_query =
from(
b in Bridge,
left_join: t1 in assoc(b, :l1_token),
left_join: t2 in assoc(b, :l2_token),
where: b.type == :deposit and not is_nil(b.l1_transaction_hash),
preload: [l1_token: t1, l2_token: t2],
order_by: [desc: b.index]
)
base_query
|> page_deposits_or_withdrawals(paging_options)
|> limit(^paging_options.page_size)
|> select_repo(options).all()
end
@doc """
Returns a total number of Polygon zkEVM deposits (completed and unclaimed).
"""
@spec deposits_count(list()) :: term() | nil
def deposits_count(options \\ []) do
query =
from(
b in Bridge,
where: b.type == :deposit and not is_nil(b.l1_transaction_hash)
)
select_repo(options).aggregate(query, :count, timeout: :infinity)
end
@doc """
Retrieves a list of Polygon zkEVM withdrawals (completed and unclaimed)
sorted in descending order of the index.
"""
@spec withdrawals(list()) :: list()
def withdrawals(options \\ []) do
paging_options = Keyword.get(options, :paging_options, Chain.default_paging_options())
base_query =
from(
b in Bridge,
left_join: t1 in assoc(b, :l1_token),
left_join: t2 in assoc(b, :l2_token),
where: b.type == :withdrawal and not is_nil(b.l2_transaction_hash),
preload: [l1_token: t1, l2_token: t2],
order_by: [desc: b.index]
)
base_query
|> page_deposits_or_withdrawals(paging_options)
|> limit(^paging_options.page_size)
|> select_repo(options).all()
end
@doc """
Returns a total number of Polygon zkEVM withdrawals (completed and unclaimed).
"""
@spec withdrawals_count(list()) :: term() | nil
def withdrawals_count(options \\ []) do
query =
from(
b in Bridge,
where: b.type == :withdrawal and not is_nil(b.l2_transaction_hash)
)
select_repo(options).aggregate(query, :count, timeout: :infinity)
end
defp page_batches(query, %PagingOptions{key: nil}), do: query
defp page_batches(query, %PagingOptions{key: {number}}) do
from(tb in query, where: tb.number < ^number)
end
defp page_deposits_or_withdrawals(query, %PagingOptions{key: nil}), do: query
defp page_deposits_or_withdrawals(query, %PagingOptions{key: {index}}) do
from(b in query, where: b.index < ^index)
end
end
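
A hedged paging sketch for the new bridge readers (not part of the diff; `api?: true` routes the read through the replica repo via `select_repo/1`):

```elixir
alias Explorer.Chain.PolygonZkevm.Reader
alias Explorer.PagingOptions

# First page of 50 deposits.
page_one = Reader.deposits(paging_options: %PagingOptions{page_size: 50}, api?: true)

# Next page: the paging key is the index of the last item on the previous
# page, matching the {index} clause of page_deposits_or_withdrawals/2 above.
%{index: last_index} = List.last(page_one)

page_two =
  Reader.deposits(paging_options: %PagingOptions{page_size: 50, key: {last_index}}, api?: true)
```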

@@ -1,17 +1,17 @@
defmodule Explorer.Chain.Zkevm.TransactionBatch do
defmodule Explorer.Chain.PolygonZkevm.TransactionBatch do
@moduledoc "Models a batch of transactions for zkEVM."
use Explorer.Schema
alias Explorer.Chain.Hash
alias Explorer.Chain.Zkevm.{BatchTransaction, LifecycleTransaction}
alias Explorer.Chain.PolygonZkevm.{BatchTransaction, LifecycleTransaction}
@optional_attrs ~w(sequence_id verify_id)a
@required_attrs ~w(number timestamp l2_transactions_count global_exit_root acc_input_hash state_root)a
@primary_key false
typed_schema "zkevm_transaction_batches" do
typed_schema "polygon_zkevm_transaction_batches" do
field(:number, :integer, primary_key: true, null: false)
field(:timestamp, :utc_datetime_usec)
field(:l2_transactions_count, :integer)

@@ -14,8 +14,8 @@ defmodule Explorer.Chain.Transaction.Schema do
Wei
}
alias Explorer.Chain.PolygonZkevm.BatchTransaction
alias Explorer.Chain.Transaction.{Fork, Status}
alias Explorer.Chain.Zkevm.BatchTransaction
@chain_type_fields (case Application.compile_env(:explorer, :chain_type) do
"ethereum" ->

@@ -1,149 +0,0 @@
defmodule Explorer.Chain.Zkevm.Reader do
@moduledoc "Contains read functions for zkevm modules."
import Ecto.Query,
only: [
from: 2,
limit: 2,
order_by: 2,
where: 2,
where: 3
]
import Explorer.Chain, only: [select_repo: 1]
alias Explorer.Chain.Zkevm.{BatchTransaction, LifecycleTransaction, TransactionBatch}
alias Explorer.{Chain, PagingOptions, Repo}
@doc """
Reads a batch by its number from database.
If the number is :latest, gets the latest batch from `zkevm_transaction_batches` table.
Returns {:error, :not_found} in case the batch is not found.
"""
@spec batch(non_neg_integer() | :latest, list()) :: {:ok, map()} | {:error, :not_found}
def batch(number, options \\ [])
def batch(:latest, options) when is_list(options) do
TransactionBatch
|> order_by(desc: :number)
|> limit(1)
|> select_repo(options).one()
|> case do
nil -> {:error, :not_found}
batch -> {:ok, batch}
end
end
def batch(number, options) when is_list(options) do
necessity_by_association = Keyword.get(options, :necessity_by_association, %{})
TransactionBatch
|> where(number: ^number)
|> Chain.join_associations(necessity_by_association)
|> select_repo(options).one()
|> case do
nil -> {:error, :not_found}
batch -> {:ok, batch}
end
end
@doc """
Reads a list of batches from `zkevm_transaction_batches` table.
"""
@spec batches(list()) :: list()
def batches(options \\ []) do
necessity_by_association = Keyword.get(options, :necessity_by_association, %{})
base_query =
from(tb in TransactionBatch,
order_by: [desc: tb.number]
)
query =
if Keyword.get(options, :confirmed?, false) do
base_query
|> Chain.join_associations(necessity_by_association)
|> where([tb], not is_nil(tb.sequence_id) and tb.sequence_id > 0)
|> limit(10)
else
paging_options = Keyword.get(options, :paging_options, Chain.default_paging_options())
base_query
|> Chain.join_associations(necessity_by_association)
|> page_batches(paging_options)
|> limit(^paging_options.page_size)
end
select_repo(options).all(query)
end
@doc """
Reads a list of L2 transaction hashes from `zkevm_batch_l2_transactions` table.
"""
@spec batch_transactions(non_neg_integer(), list()) :: list()
def batch_transactions(batch_number, options \\ []) do
query = from(bts in BatchTransaction, where: bts.batch_number == ^batch_number)
select_repo(options).all(query)
end
@doc """
Gets the number of the latest batch with defined verify_id from `zkevm_transaction_batches` table.
Returns 0 if not found.
"""
@spec last_verified_batch_number() :: non_neg_integer()
def last_verified_batch_number do
query =
from(tb in TransactionBatch,
select: tb.number,
where: not is_nil(tb.verify_id),
order_by: [desc: tb.number],
limit: 1
)
query
|> Repo.one()
|> Kernel.||(0)
end
@doc """
Reads a list of L1 transactions by their hashes from `zkevm_lifecycle_l1_transactions` table.
"""
@spec lifecycle_transactions(list()) :: list()
def lifecycle_transactions(l1_tx_hashes) do
query =
from(
lt in LifecycleTransaction,
select: {lt.hash, lt.id},
where: lt.hash in ^l1_tx_hashes
)
Repo.all(query, timeout: :infinity)
end
@doc """
Determines ID of the future lifecycle transaction by reading `zkevm_lifecycle_l1_transactions` table.
"""
@spec next_id() :: non_neg_integer()
def next_id do
query =
from(lt in LifecycleTransaction,
select: lt.id,
order_by: [desc: lt.id],
limit: 1
)
last_id =
query
|> Repo.one()
|> Kernel.||(0)
last_id + 1
end
defp page_batches(query, %PagingOptions{key: nil}), do: query
defp page_batches(query, %PagingOptions{key: {number}}) do
from(tb in query, where: tb.number < ^number)
end
end

@@ -25,7 +25,7 @@ defmodule Explorer.Mixfile do
],
start_permanent: Mix.env() == :prod,
version: "6.1.0",
xref: [exclude: [BlockScoutWeb.WebRouter.Helpers]]
xref: [exclude: [BlockScoutWeb.WebRouter.Helpers, Indexer.Helper]]
]
end

@@ -0,0 +1,46 @@
defmodule Explorer.Repo.PolygonZkevm.Migrations.AddBridgeTables do
use Ecto.Migration
def change do
create table(:polygon_zkevm_bridge_l1_tokens, primary_key: false) do
add(:id, :identity, primary_key: true, start_value: 0, increment: 1)
add(:address, :bytea, null: false)
add(:decimals, :smallint, null: true, default: nil)
add(:symbol, :string, size: 16, null: true, default: nil)
timestamps(null: false, type: :utc_datetime_usec)
end
create(unique_index(:polygon_zkevm_bridge_l1_tokens, :address))
execute(
"CREATE TYPE polygon_zkevm_bridge_op_type AS ENUM ('deposit', 'withdrawal')",
"DROP TYPE polygon_zkevm_bridge_op_type"
)
create table(:polygon_zkevm_bridge, primary_key: false) do
add(:type, :polygon_zkevm_bridge_op_type, null: false, primary_key: true)
add(:index, :integer, null: false, primary_key: true)
add(:l1_transaction_hash, :bytea, null: true)
add(:l2_transaction_hash, :bytea, null: true)
add(
:l1_token_id,
references(:polygon_zkevm_bridge_l1_tokens, on_delete: :restrict, on_update: :update_all, type: :identity),
null: true
)
add(:l1_token_address, :bytea, null: true)
add(:l2_token_address, :bytea, null: true)
add(:amount, :numeric, precision: 100, null: false)
add(:block_number, :bigint, null: true)
add(:block_timestamp, :"timestamp without time zone", null: true)
timestamps(null: false, type: :utc_datetime_usec)
end
create(index(:polygon_zkevm_bridge, :l1_token_address))
rename(table(:zkevm_lifecycle_l1_transactions), to: table(:polygon_zkevm_lifecycle_l1_transactions))
rename(table(:zkevm_transaction_batches), to: table(:polygon_zkevm_transaction_batches))
rename(table(:zkevm_batch_l2_transactions), to: table(:polygon_zkevm_batch_l2_transactions))
end
end

@@ -16,6 +16,7 @@ defmodule Indexer.Block.Fetcher do
alias Explorer.Chain.Cache.Blocks, as: BlocksCache
alias Explorer.Chain.Cache.{Accounts, BlockNumber, Transactions, Uncles}
alias Indexer.Block.Fetcher.Receipts
alias Indexer.Fetcher.PolygonZkevm.BridgeL1Tokens, as: PolygonZkevmBridgeL1Tokens
alias Indexer.Fetcher.TokenInstance.Realtime, as: TokenInstanceRealtime
alias Indexer.Fetcher.{
@@ -47,6 +48,7 @@ defmodule Indexer.Block.Fetcher do
alias Indexer.Transform.Shibarium.Bridge, as: ShibariumBridge
alias Indexer.Transform.Blocks, as: TransformBlocks
alias Indexer.Transform.PolygonZkevm.Bridge, as: PolygonZkevmBridge
@type address_hash_to_fetched_balance_block_number :: %{String.t() => Block.block_number()}
@@ -158,6 +160,11 @@ defmodule Indexer.Block.Fetcher do
do: ShibariumBridge.parse(blocks, transactions_with_receipts, logs),
else: []
),
polygon_zkevm_bridge_operations =
if(callback_module == Indexer.Block.Realtime.Fetcher,
do: PolygonZkevmBridge.parse(blocks, logs),
else: []
),
%FetchedBeneficiaries{params_set: beneficiary_params_set, errors: beneficiaries_errors} =
fetch_beneficiaries(blocks, transactions_with_receipts, json_rpc_named_arguments),
addresses =
@@ -170,7 +177,8 @@ defmodule Indexer.Block.Fetcher do
token_transfers: token_transfers,
transactions: transactions_with_receipts,
transaction_actions: transaction_actions,
withdrawals: withdrawals_params
withdrawals: withdrawals_params,
polygon_zkevm_bridge_operations: polygon_zkevm_bridge_operations
}),
coin_balances_params_set =
%{
@@ -204,30 +212,17 @@ defmodule Indexer.Block.Fetcher do
withdrawals: %{params: withdrawals_params},
token_instances: %{params: token_instances}
},
import_options =
(case Application.get_env(:explorer, :chain_type) do
"polygon_edge" ->
basic_import_options
|> Map.put_new(:polygon_edge_withdrawals, %{params: polygon_edge_withdrawals})
|> Map.put_new(:polygon_edge_deposit_executes, %{params: polygon_edge_deposit_executes})
"ethereum" ->
basic_import_options
|> Map.put_new(:beacon_blob_transactions, %{
params: transactions_with_receipts |> Enum.filter(&Map.has_key?(&1, :max_fee_per_blob_gas))
})
"shibarium" ->
basic_import_options
|> Map.put_new(:shibarium_bridge_operations, %{params: shibarium_bridge_operations})
_ ->
basic_import_options
end),
chain_type_import_options = %{
transactions_with_receipts: transactions_with_receipts,
polygon_edge_withdrawals: polygon_edge_withdrawals,
polygon_edge_deposit_executes: polygon_edge_deposit_executes,
polygon_zkevm_bridge_operations: polygon_zkevm_bridge_operations,
shibarium_bridge_operations: shibarium_bridge_operations
},
{:ok, inserted} <-
__MODULE__.import(
state,
import_options
import_options(basic_import_options, chain_type_import_options)
),
{:tx_actions, {:ok, inserted_tx_actions}} <-
{:tx_actions,
@@ -250,6 +245,38 @@ defmodule Indexer.Block.Fetcher do
end
end
defp import_options(basic_import_options, %{
transactions_with_receipts: transactions_with_receipts,
polygon_edge_withdrawals: polygon_edge_withdrawals,
polygon_edge_deposit_executes: polygon_edge_deposit_executes,
polygon_zkevm_bridge_operations: polygon_zkevm_bridge_operations,
shibarium_bridge_operations: shibarium_bridge_operations
}) do
case Application.get_env(:explorer, :chain_type) do
"ethereum" ->
basic_import_options
|> Map.put_new(:beacon_blob_transactions, %{
params: transactions_with_receipts |> Enum.filter(&Map.has_key?(&1, :max_fee_per_blob_gas))
})
"polygon_edge" ->
basic_import_options
|> Map.put_new(:polygon_edge_withdrawals, %{params: polygon_edge_withdrawals})
|> Map.put_new(:polygon_edge_deposit_executes, %{params: polygon_edge_deposit_executes})
"polygon_zkevm" ->
basic_import_options
|> Map.put_new(:polygon_zkevm_bridge_operations, %{params: polygon_zkevm_bridge_operations})
"shibarium" ->
basic_import_options
|> Map.put_new(:shibarium_bridge_operations, %{params: shibarium_bridge_operations})
_ ->
basic_import_options
end
end
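
Illustrative only, since `import_options/2` is private: with chain type `"polygon_zkevm"` the bridge operations are wrapped as import params while the Polygon Edge and Shibarium lists are ignored.

```elixir
blocks = []
ops = [%{type: :deposit, index: 0}]

import_options(
  %{blocks: %{params: blocks}},
  %{
    transactions_with_receipts: [],
    polygon_edge_withdrawals: [],
    polygon_edge_deposit_executes: [],
    polygon_zkevm_bridge_operations: ops,
    shibarium_bridge_operations: []
  }
)
#=> %{blocks: %{params: []},
#     polygon_zkevm_bridge_operations: %{params: [%{type: :deposit, index: 0}]}}
```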
defp update_block_cache([]), do: :ok
defp update_block_cache(blocks) when is_list(blocks) do
@@ -315,6 +342,19 @@ defmodule Indexer.Block.Fetcher do
def async_import_token_instances(_), do: :ok
def async_import_blobs(%{blocks: blocks}) do
timestamps =
blocks
|> Enum.filter(fn block -> block |> Map.get(:blob_gas_used, 0) > 0 end)
|> Enum.map(&Map.get(&1, :timestamp))
if !Enum.empty?(timestamps) do
Blob.async_fetch(timestamps)
end
end
def async_import_blobs(_), do: :ok
def async_import_block_rewards([]), do: :ok
def async_import_block_rewards(errors) when is_list(errors) do
@@ -397,18 +437,17 @@ defmodule Indexer.Block.Fetcher do
def async_import_replaced_transactions(_), do: :ok
def async_import_blobs(%{blocks: blocks}) do
timestamps =
blocks
|> Enum.filter(fn block -> block |> Map.get(:blob_gas_used, 0) > 0 end)
|> Enum.map(&Map.get(&1, :timestamp))
if !Enum.empty?(timestamps) do
Blob.async_fetch(timestamps)
end
@doc """
Fills a buffer of L1 token addresses to handle them asynchronously in
the Indexer.Fetcher.PolygonZkevm.BridgeL1Tokens module. The addresses are
taken from the `operations` list.
"""
@spec async_import_polygon_zkevm_bridge_l1_tokens(map()) :: :ok
def async_import_polygon_zkevm_bridge_l1_tokens(%{polygon_zkevm_bridge_operations: operations}) do
PolygonZkevmBridgeL1Tokens.async_fetch(operations)
end
def async_import_blobs(_), do: :ok
def async_import_polygon_zkevm_bridge_l1_tokens(_), do: :ok
defp block_reward_errors_to_block_numbers(block_reward_errors) when is_list(block_reward_errors) do
Enum.map(block_reward_errors, &block_reward_error_to_block_number/1)

@@ -22,6 +22,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
async_import_token_balances: 1,
async_import_token_instances: 1,
async_import_uncles: 1,
async_import_polygon_zkevm_bridge_l1_tokens: 1,
fetch_and_import_range: 2
]
@@ -36,6 +37,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
alias Indexer.Block.Realtime.TaskSupervisor
alias Indexer.Fetcher.{CoinBalance, CoinBalanceDailyUpdater}
alias Indexer.Fetcher.PolygonEdge.{DepositExecute, Withdrawal}
alias Indexer.Fetcher.PolygonZkevm.BridgeL2, as: PolygonZkevmBridgeL2
alias Indexer.Fetcher.Shibarium.L2, as: ShibariumBridgeL2
alias Indexer.Prometheus
alias Indexer.Transform.Addresses
@@ -292,6 +294,9 @@ defmodule Indexer.Block.Realtime.Fetcher do
# we need to remove all rows from `shibarium_bridge` table previously written starting from reorg block number
remove_shibarium_assets_by_number(block_number_to_fetch)
# we need to remove all rows from `polygon_zkevm_bridge` table previously written starting from reorg block number
remove_polygon_zkevm_assets_by_number(block_number_to_fetch)
# give previous fetch attempt (for same block number) a chance to finish
# before fetching again, to reduce block consensus mistakes
:timer.sleep(@reorg_delay)
@@ -311,6 +316,12 @@ defmodule Indexer.Block.Realtime.Fetcher do
end
end
defp remove_polygon_zkevm_assets_by_number(block_number_to_fetch) do
if Application.get_env(:explorer, :chain_type) == "polygon_zkevm" do
PolygonZkevmBridgeL2.reorg_handle(block_number_to_fetch)
end
end
defp remove_shibarium_assets_by_number(block_number_to_fetch) do
if Application.get_env(:explorer, :chain_type) == "shibarium" do
ShibariumBridgeL2.reorg_handle(block_number_to_fetch)
@@ -441,6 +452,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
async_import_uncles(imported)
async_import_replaced_transactions(imported)
async_import_blobs(imported)
async_import_polygon_zkevm_bridge_l1_tokens(imported)
end
defp balances(

@@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge do
Contains common functions for PolygonEdge.* fetchers.
"""
# todo: this module is deprecated and should be removed
use GenServer
use Indexer.Fetcher
@@ -15,13 +17,11 @@ defmodule Indexer.Fetcher.PolygonEdge do
import Explorer.Helper, only: [parse_integer: 1]
alias Explorer.Chain.Events.Publisher
alias Explorer.{Chain, Repo}
alias Indexer.{BoundQueue, Helper}
alias Indexer.Helper
alias Indexer.Fetcher.PolygonEdge.{Deposit, DepositExecute, Withdrawal, WithdrawalExit}
@fetcher_name :polygon_edge
@block_check_interval_range_size 100
def child_spec(start_link_arguments) do
spec = %{
@@ -41,29 +41,7 @@ defmodule Indexer.Fetcher.PolygonEdge do
@impl GenServer
def init(_args) do
Logger.metadata(fetcher: @fetcher_name)
modules_using_reorg_monitor = [Deposit, WithdrawalExit]
reorg_monitor_not_needed =
modules_using_reorg_monitor
|> Enum.all?(fn module ->
is_nil(Application.get_all_env(:indexer)[module][:start_block_l1])
end)
if reorg_monitor_not_needed do
:ignore
else
polygon_edge_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc]
json_rpc_named_arguments = json_rpc_named_arguments(polygon_edge_l1_rpc)
{:ok, block_check_interval, _} = get_block_check_interval(json_rpc_named_arguments)
Process.send(self(), :reorg_monitor, [])
{:ok,
%{block_check_interval: block_check_interval, json_rpc_named_arguments: json_rpc_named_arguments, prev_latest: 0}}
end
:ignore
end
@spec init_l1(
@@ -78,8 +56,6 @@ defmodule Indexer.Fetcher.PolygonEdge do
def init_l1(table, env, pid, contract_address, contract_name, table_name, entity_name)
when table in [Explorer.Chain.PolygonEdge.Deposit, Explorer.Chain.PolygonEdge.WithdrawalExit] do
with {:start_block_l1_undefined, false} <- {:start_block_l1_undefined, is_nil(env[:start_block_l1])},
{:reorg_monitor_started, true} <-
{:reorg_monitor_started, !is_nil(Process.whereis(Indexer.Fetcher.PolygonEdge))},
polygon_edge_l1_rpc = Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc],
{:rpc_l1_undefined, false} <- {:rpc_l1_undefined, is_nil(polygon_edge_l1_rpc)},
{:contract_is_valid, true} <- {:contract_is_valid, Helper.address_correct?(contract_address)},
@@ -94,7 +70,7 @@ defmodule Indexer.Fetcher.PolygonEdge do
Helper.get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments, 100_000_000),
{:l1_tx_not_found, false} <- {:l1_tx_not_found, !is_nil(last_l1_transaction_hash) && is_nil(last_l1_tx)},
{:ok, block_check_interval, last_safe_block} <-
get_block_check_interval(json_rpc_named_arguments) do
Helper.get_block_check_interval(json_rpc_named_arguments) do
start_block = max(start_block_l1, last_l1_block_number)
Process.send(pid, :continue, [])
@@ -112,10 +88,6 @@ defmodule Indexer.Fetcher.PolygonEdge do
# the process shouldn't start if the start block is not defined
:ignore
{:reorg_monitor_started, false} ->
Logger.error("Cannot start this process as reorg monitor in Indexer.Fetcher.PolygonEdge is not started.")
:ignore
{:rpc_l1_undefined, true} ->
Logger.error("L1 RPC URL is not defined.")
:ignore
@@ -217,29 +189,7 @@ defmodule Indexer.Fetcher.PolygonEdge do
end
end
@impl GenServer
def handle_info(
:reorg_monitor,
%{
block_check_interval: block_check_interval,
json_rpc_named_arguments: json_rpc_named_arguments,
prev_latest: prev_latest
} = state
) do
{:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
if latest < prev_latest do
Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.")
Publisher.broadcast([{:polygon_edge_reorg_block, latest}], :realtime)
end
Process.send_after(self(), :reorg_monitor, block_check_interval)
{:noreply, %{state | prev_latest: latest}}
end
@spec handle_continue(map(), binary(), Deposit | WithdrawalExit, atom()) :: {:noreply, map()}
@spec handle_continue(map(), binary(), Deposit | WithdrawalExit) :: {:noreply, map()}
def handle_continue(
%{
contract_address: contract_address,
@@ -249,8 +199,7 @@ defmodule Indexer.Fetcher.PolygonEdge do
json_rpc_named_arguments: json_rpc_named_arguments
} = state,
event_signature,
calling_module,
fetcher_name
calling_module
)
when calling_module in [Deposit, WithdrawalExit] do
time_before = Timex.now()
@@ -295,14 +244,7 @@ defmodule Indexer.Fetcher.PolygonEdge do
)
end
reorg_block = reorg_block_pop(fetcher_name)
if !is_nil(reorg_block) && reorg_block > 0 do
reorg_handle(reorg_block, calling_module)
{:halt, if(reorg_block <= chunk_end, do: reorg_block - 1, else: chunk_end)}
else
{:cont, chunk_end}
end
{:cont, chunk_end}
end)
new_start_block = last_written_block + 1
@@ -540,26 +482,6 @@ defmodule Indexer.Fetcher.PolygonEdge do
Repo.all(query)
end
defp get_block_check_interval(json_rpc_named_arguments) do
{last_safe_block, _} = get_safe_block(json_rpc_named_arguments)
first_block = max(last_safe_block - @block_check_interval_range_size, 1)
with {:ok, first_block_timestamp} <-
Helper.get_block_timestamp_by_number(first_block, json_rpc_named_arguments, 100_000_000),
{:ok, last_safe_block_timestamp} <-
Helper.get_block_timestamp_by_number(last_safe_block, json_rpc_named_arguments, 100_000_000) do
block_check_interval =
ceil((last_safe_block_timestamp - first_block_timestamp) / (last_safe_block - first_block) * 1000 / 2)
Logger.info("Block check interval is calculated as #{block_check_interval} ms.")
{:ok, block_check_interval, last_safe_block}
else
{:error, error} ->
{:error, "Failed to calculate block check interval due to #{inspect(error)}"}
end
end
defp get_safe_block(json_rpc_named_arguments) do
case Helper.get_block_number_by_tag("safe", json_rpc_named_arguments) do
{:ok, safe_block} ->
@@ -667,72 +589,8 @@ defmodule Indexer.Fetcher.PolygonEdge do
{events, event_name}
end
defp log_deleted_rows_count(reorg_block, count, table_name) do
if count > 0 do
Logger.warning(
"As L1 reorg was detected, all rows with l1_block_number >= #{reorg_block} were removed from the #{table_name} table. Number of removed rows: #{count}."
)
end
end
@spec repeated_request(list(), any(), list(), non_neg_integer()) :: {:ok, any()} | {:error, atom()}
def repeated_request(req, error_message, json_rpc_named_arguments, retries) do
Helper.repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries)
end
defp reorg_block_pop(fetcher_name) do
table_name = reorg_table_name(fetcher_name)
case BoundQueue.pop_front(reorg_queue_get(table_name)) do
{:ok, {block_number, updated_queue}} ->
:ets.insert(table_name, {:queue, updated_queue})
block_number
{:error, :empty} ->
nil
end
end
@spec reorg_block_push(atom(), non_neg_integer()) :: no_return()
def reorg_block_push(fetcher_name, block_number) do
table_name = reorg_table_name(fetcher_name)
{:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number)
:ets.insert(table_name, {:queue, updated_queue})
end
defp reorg_handle(reorg_block, calling_module) do
{table, table_name} =
if calling_module == Deposit do
{Explorer.Chain.PolygonEdge.Deposit, "polygon_edge_deposits"}
else
{Explorer.Chain.PolygonEdge.WithdrawalExit, "polygon_edge_withdrawal_exits"}
end
{deleted_count, _} = Repo.delete_all(from(item in table, where: item.l1_block_number >= ^reorg_block))
log_deleted_rows_count(reorg_block, deleted_count, table_name)
end
defp reorg_queue_get(table_name) do
if :ets.whereis(table_name) == :undefined do
:ets.new(table_name, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
with info when info != :undefined <- :ets.info(table_name),
[{_, value}] <- :ets.lookup(table_name, :queue) do
value
else
_ -> %BoundQueue{}
end
end
defp reorg_table_name(fetcher_name) do
:"#{fetcher_name}#{:_reorgs}"
end
end

@@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do
Fills polygon_edge_deposits DB table.
"""
# todo: this module is deprecated and should be removed
use GenServer
use Indexer.Fetcher
@@ -14,7 +16,6 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do
alias ABI.TypeDecoder
alias EthereumJSONRPC.Block.ByNumber
alias EthereumJSONRPC.Blocks
alias Explorer.Chain.Events.Subscriber
alias Explorer.Chain.PolygonEdge.Deposit
alias Indexer.Fetcher.PolygonEdge
@@ -47,8 +48,6 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do
env = Application.get_all_env(:indexer)[__MODULE__]
Subscriber.to(:polygon_edge_reorg_block, :realtime)
PolygonEdge.init_l1(
Deposit,
env,
@@ -62,13 +61,7 @@ defmodule Indexer.Fetcher.PolygonEdge.Deposit do
@impl GenServer
def handle_info(:continue, state) do
PolygonEdge.handle_continue(state, @state_synced_event, __MODULE__, @fetcher_name)
end
@impl GenServer
def handle_info({:chain_event, :polygon_edge_reorg_block, :realtime, block_number}, state) do
PolygonEdge.reorg_block_push(@fetcher_name, block_number)
{:noreply, state}
PolygonEdge.handle_continue(state, @state_synced_event, __MODULE__)
end
@impl GenServer

@@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.DepositExecute do
Fills polygon_edge_deposit_executes DB table.
"""
# todo: this module is deprecated and should be removed
use GenServer
use Indexer.Fetcher

@@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.Withdrawal do
Fills polygon_edge_withdrawals DB table.
"""
# todo: this module is deprecated and should be removed
use GenServer
use Indexer.Fetcher

@@ -3,6 +3,8 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do
Fills polygon_edge_withdrawal_exits DB table.
"""
# todo: this module is deprecated and should be removed
use GenServer
use Indexer.Fetcher
@@ -10,7 +12,6 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do
import EthereumJSONRPC, only: [quantity_to_integer: 1]
alias Explorer.Chain.Events.Subscriber
alias Explorer.Chain.PolygonEdge.WithdrawalExit
alias Indexer.Fetcher.PolygonEdge
@@ -40,8 +41,6 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do
env = Application.get_all_env(:indexer)[__MODULE__]
Subscriber.to(:polygon_edge_reorg_block, :realtime)
PolygonEdge.init_l1(
WithdrawalExit,
env,
@@ -55,13 +54,7 @@ defmodule Indexer.Fetcher.PolygonEdge.WithdrawalExit do
@impl GenServer
def handle_info(:continue, state) do
PolygonEdge.handle_continue(state, @exit_processed_event, __MODULE__, @fetcher_name)
end
@impl GenServer
def handle_info({:chain_event, :polygon_edge_reorg_block, :realtime, block_number}, state) do
PolygonEdge.reorg_block_push(@fetcher_name, block_number)
{:noreply, state}
PolygonEdge.handle_continue(state, @exit_processed_event, __MODULE__)
end
@impl GenServer

@@ -0,0 +1,413 @@
defmodule Indexer.Fetcher.PolygonZkevm.Bridge do
@moduledoc """
Contains common functions for Indexer.Fetcher.PolygonZkevm.Bridge* modules.
"""
require Logger
import EthereumJSONRPC,
only: [
integer_to_quantity: 1,
json_rpc: 2,
quantity_to_integer: 1,
request: 1,
timestamp_to_datetime: 1
]
import Explorer.Chain.SmartContract, only: [burn_address_hash_string: 0]
import Explorer.Helper, only: [decode_data: 2]
alias EthereumJSONRPC.Logs
alias Explorer.Chain
alias Explorer.Chain.PolygonZkevm.Reader
alias Explorer.SmartContract.Reader, as: SmartContractReader
alias Indexer.Helper
alias Indexer.Transform.Addresses
# 32-byte signature of the event BridgeEvent(uint8 leafType, uint32 originNetwork, address originAddress, uint32 destinationNetwork, address destinationAddress, uint256 amount, bytes metadata, uint32 depositCount)
@bridge_event "0x501781209a1f8899323b96b4ef08b168df93e0a90c673d1e4cce39366cb62f9b"
@bridge_event_params [{:uint, 8}, {:uint, 32}, :address, {:uint, 32}, :address, {:uint, 256}, :bytes, {:uint, 32}]
# 32-byte signature of the event ClaimEvent(uint32 index, uint32 originNetwork, address originAddress, address destinationAddress, uint256 amount)
@claim_event "0x25308c93ceeed162da955b3f7ce3e3f93606579e40fb92029faa9efe27545983"
@claim_event_params [{:uint, 32}, {:uint, 32}, :address, :address, {:uint, 256}]
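# 4-byte selectors of the ERC-20 `symbol()` and `decimals()` getters
# (the first 4 bytes of the keccak256 hash of each function signature)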
@symbol_method_selector "95d89b41"
@decimals_method_selector "313ce567"
@erc20_abi [
%{
"constant" => true,
"inputs" => [],
"name" => "symbol",
"outputs" => [%{"name" => "", "type" => "string"}],
"payable" => false,
"stateMutability" => "view",
"type" => "function"
},
%{
"constant" => true,
"inputs" => [],
"name" => "decimals",
"outputs" => [%{"name" => "", "type" => "uint8"}],
"payable" => false,
"stateMutability" => "view",
"type" => "function"
}
]
@doc """
Filters the given list of events, keeping only the `BridgeEvent` and `ClaimEvent` entries
emitted by the bridge contract.
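
A minimal usage sketch (the bridge contract address here is hypothetical and must be lowercase,
as the comparison below is done against downcased strings):

    filter_bridge_events(logs, "0x2a3dd3eb832af982ec71669e178424b10dca2ede")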
"""
@spec filter_bridge_events(list(), binary()) :: list()
def filter_bridge_events(events, bridge_contract) do
Enum.filter(events, fn event ->
Helper.address_hash_to_string(event.address_hash, true) == bridge_contract and
Enum.member?([@bridge_event, @claim_event], Helper.log_topic_to_string(event.first_topic))
end)
end
@doc """
Fetches `BridgeEvent` and `ClaimEvent` events of the bridge contract from an RPC node
for the given range of blocks.
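
A minimal usage sketch (the block range is hypothetical; `bridge_contract` and
`json_rpc_named_arguments` are assumed to be prepared by the caller):

    get_logs_all({18_000_000, 18_000_999}, bridge_contract, json_rpc_named_arguments)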
"""
@spec get_logs_all({non_neg_integer(), non_neg_integer()}, binary(), list()) :: list()
def get_logs_all({chunk_start, chunk_end}, bridge_contract, json_rpc_named_arguments) do
{:ok, result} =
get_logs(
chunk_start,
chunk_end,
bridge_contract,
[[@bridge_event, @claim_event]],
json_rpc_named_arguments
)
Logs.elixir_to_params(result)
end
defp get_logs(from_block, to_block, address, topics, json_rpc_named_arguments, retries \\ 100_000_000) do
processed_from_block = if is_integer(from_block), do: integer_to_quantity(from_block), else: from_block
processed_to_block = if is_integer(to_block), do: integer_to_quantity(to_block), else: to_block
req =
request(%{
id: 0,
method: "eth_getLogs",
params: [
%{
:fromBlock => processed_from_block,
:toBlock => processed_to_block,
:address => address,
:topics => topics
}
]
})
error_message = &"Cannot fetch logs for the block range #{from_block}..#{to_block}. Error: #{inspect(&1)}"
Helper.repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries)
end
@doc """
Imports the given zkEVM bridge operations into the database.
Used by the Indexer.Fetcher.PolygonZkevm.BridgeL1 and Indexer.Fetcher.PolygonZkevm.BridgeL2 fetchers.
Doesn't return anything.
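
A minimal sketch of the expected input, assuming a single L1 deposit operation
(all field values are hypothetical; the keys mirror those built by `prepare_operations/4`):

    import_operations([
      %{
        type: :deposit,
        index: 123,
        amount: 1_000_000_000_000_000_000,
        l1_transaction_hash: transaction_hash,
        block_number: 18_000_000,
        block_timestamp: ~U[2024-01-01 00:00:00Z]
      }
    ])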
"""
@spec import_operations(list()) :: no_return()
def import_operations(operations) do
addresses =
Addresses.extract_addresses(%{
polygon_zkevm_bridge_operations: operations
})
{:ok, _} =
Chain.import(%{
addresses: %{params: addresses, on_conflict: :nothing},
polygon_zkevm_bridge_operations: %{params: operations},
timeout: :infinity
})
end
@doc """
Converts the list of zkEVM bridge events to a list of operations,
preparing them for import into the database.
"""
@spec prepare_operations(list(), list() | nil, list(), map() | nil) :: list()
def prepare_operations(events, json_rpc_named_arguments, json_rpc_named_arguments_l1, block_to_timestamp \\ nil) do
{block_to_timestamp, token_address_to_id} =
if is_nil(block_to_timestamp) do
bridge_events = Enum.filter(events, fn event -> event.first_topic == @bridge_event end)
l1_token_addresses =
bridge_events
|> Enum.reduce(%MapSet{}, fn event, acc ->
case bridge_event_parse(event) do
{{nil, _}, _, _} -> acc
{{token_address, nil}, _, _} -> MapSet.put(acc, token_address)
end
end)
|> MapSet.to_list()
{
blocks_to_timestamps(bridge_events, json_rpc_named_arguments),
token_addresses_to_ids(l1_token_addresses, json_rpc_named_arguments_l1)
}
else
# this is called in realtime
{block_to_timestamp, %{}}
end
Enum.map(events, fn event ->
{index, l1_token_id, l1_token_address, l2_token_address, amount, block_number, block_timestamp} =
if event.first_topic == @bridge_event do
{
{l1_token_address, l2_token_address},
amount,
deposit_count
} = bridge_event_parse(event)
l1_token_id = Map.get(token_address_to_id, l1_token_address)
block_number = quantity_to_integer(event.block_number)
block_timestamp = Map.get(block_to_timestamp, block_number)
# credo:disable-for-lines:2 Credo.Check.Refactor.Nesting
l1_token_address =
if is_nil(l1_token_id) do
l1_token_address
end
{deposit_count, l1_token_id, l1_token_address, l2_token_address, amount, block_number, block_timestamp}
else
[index, _origin_network, _origin_address, _destination_address, amount] =
decode_data(event.data, @claim_event_params)
{index, nil, nil, nil, amount, nil, nil}
end
is_l1 = json_rpc_named_arguments == json_rpc_named_arguments_l1
result = %{
type: operation_type(event.first_topic, is_l1),
index: index,
amount: amount
}
transaction_hash_field =
if is_l1 do
:l1_transaction_hash
else
:l2_transaction_hash
end
result
|> extend_result(transaction_hash_field, event.transaction_hash)
|> extend_result(:l1_token_id, l1_token_id)
|> extend_result(:l1_token_address, l1_token_address)
|> extend_result(:l2_token_address, l2_token_address)
|> extend_result(:block_number, block_number)
|> extend_result(:block_timestamp, block_timestamp)
end)
end
defp blocks_to_timestamps(events, json_rpc_named_arguments) do
events
|> Helper.get_blocks_by_events(json_rpc_named_arguments, 100_000_000)
|> Enum.reduce(%{}, fn block, acc ->
block_number = quantity_to_integer(Map.get(block, "number"))
timestamp = timestamp_to_datetime(Map.get(block, "timestamp"))
Map.put(acc, block_number, timestamp)
end)
end
defp bridge_event_parse(event) do
[
leaf_type,
origin_network,
origin_address,
_destination_network,
_destination_address,
amount,
_metadata,
deposit_count
] = decode_data(event.data, @bridge_event_params)
{token_address_by_origin_address(origin_address, origin_network, leaf_type), amount, deposit_count}
end
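# the event-to-operation-type mapping implemented below:
# BridgeEvent on L1 -> :deposit, BridgeEvent on L2 -> :withdrawal;
# ClaimEvent on L1 -> :withdrawal, ClaimEvent on L2 -> :deposit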
defp operation_type(first_topic, is_l1) do
if first_topic == @bridge_event do
if is_l1, do: :deposit, else: :withdrawal
else
if is_l1, do: :withdrawal, else: :deposit
end
end
@doc """
Fetches L1 token data for the given token addresses,
builds an `L1 token address -> L1 token id` map for them,
and writes the data to the database. Returns the resulting map.
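
A minimal sketch of the returned map (the address and id are hypothetical):

    %{"0xc2132d05d31c914a87c6611c10748aeb04b58e8f" => 1}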
"""
@spec token_addresses_to_ids(list(), list()) :: map()
def token_addresses_to_ids(l1_token_addresses, json_rpc_named_arguments) do
token_data =
l1_token_addresses
|> get_token_data(json_rpc_named_arguments)
tokens_existing =
token_data
|> Map.keys()
|> Reader.token_addresses_to_ids_from_db()
tokens_to_insert =
token_data
|> Enum.reject(fn {address, _} -> Map.has_key?(tokens_existing, address) end)
|> Enum.map(fn {address, data} -> Map.put(data, :address, address) end)
{:ok, inserts} =
Chain.import(%{
polygon_zkevm_bridge_l1_tokens: %{params: tokens_to_insert},
timeout: :infinity
})
tokens_inserted = Map.get(inserts, :insert_polygon_zkevm_bridge_l1_tokens, [])
# tokens that were not inserted need to be queried from the DB separately, as they
# could have been inserted by another module at the same time (a race condition).
# this is an unlikely case, but we handle it here as well
tokens_not_inserted =
tokens_to_insert
|> Enum.reject(fn token ->
Enum.any?(tokens_inserted, fn inserted -> token.address == Helper.address_hash_to_string(inserted.address) end)
end)
|> Enum.map(& &1.address)
tokens_inserted_outside = Reader.token_addresses_to_ids_from_db(tokens_not_inserted)
tokens_inserted
|> Enum.reduce(%{}, fn t, acc -> Map.put(acc, Helper.address_hash_to_string(t.address), t.id) end)
|> Map.merge(tokens_existing)
|> Map.merge(tokens_inserted_outside)
end
defp token_address_by_origin_address(origin_address, origin_network, leaf_type) do
with true <- leaf_type != 1 and origin_network <= 1,
token_address = "0x" <> Base.encode16(origin_address, case: :lower),
true <- token_address != burn_address_hash_string() do
if origin_network == 0 do
# this is L1 address
{token_address, nil}
else
# this is L2 address
{nil, token_address}
end
else
_ -> {nil, nil}
end
end
defp get_token_data(token_addresses, json_rpc_named_arguments) do
# first, try to read token data from the DB.
# for tokens missing from the DB, read the data via RPC.
token_addresses
|> Reader.get_token_data_from_db()
|> get_token_data_from_rpc(json_rpc_named_arguments)
end
defp get_token_data_from_rpc({token_data, token_addresses}, json_rpc_named_arguments) do
{requests, responses} = get_token_data_request_symbol_decimals(token_addresses, json_rpc_named_arguments)
requests
|> Enum.zip(responses)
|> Enum.reduce(token_data, fn {request, {status, response} = _resp}, token_data_acc ->
if status == :ok do
response = parse_response(response)
address = Helper.address_hash_to_string(request.contract_address, true)
new_data = get_new_data(token_data_acc[address] || %{}, request, response)
Map.put(token_data_acc, address, new_data)
else
token_data_acc
end
end)
end
defp get_token_data_request_symbol_decimals(token_addresses, json_rpc_named_arguments) do
requests =
token_addresses
|> Enum.map(fn address ->
# we will call symbol() and decimals() public getters
Enum.map([@symbol_method_selector, @decimals_method_selector], fn method_id ->
%{
contract_address: address,
method_id: method_id,
args: []
}
end)
end)
|> List.flatten()
{responses, error_messages} = read_contracts_with_retries(requests, @erc20_abi, json_rpc_named_arguments, 3)
if !Enum.empty?(error_messages) or Enum.count(requests) != Enum.count(responses) do
Logger.warning(
"Cannot read symbol and decimals of an ERC-20 token contract. Error messages: #{Enum.join(error_messages, ", ")}. Addresses: #{Enum.join(token_addresses, ", ")}"
)
end
{requests, responses}
end
defp read_contracts_with_retries(requests, abi, json_rpc_named_arguments, retries_left) when retries_left > 0 do
responses = SmartContractReader.query_contracts(requests, abi, json_rpc_named_arguments: json_rpc_named_arguments)
error_messages =
Enum.reduce(responses, [], fn {status, error_message}, acc ->
acc ++
if status == :error do
[error_message]
else
[]
end
end)
if Enum.empty?(error_messages) do
{responses, []}
else
retries_left = retries_left - 1
if retries_left == 0 do
{responses, Enum.uniq(error_messages)}
else
:timer.sleep(1000)
read_contracts_with_retries(requests, abi, json_rpc_named_arguments, retries_left)
end
end
end
defp get_new_data(data, request, response) do
if atomized_key(request.method_id) == :symbol do
Map.put(data, :symbol, response)
else
Map.put(data, :decimals, response)
end
end
defp extend_result(result, _key, value) when is_nil(value), do: result
defp extend_result(result, key, value) when is_atom(key), do: Map.put(result, key, value)
defp atomized_key("symbol"), do: :symbol
defp atomized_key("decimals"), do: :decimals
defp atomized_key(@symbol_method_selector), do: :symbol
defp atomized_key(@decimals_method_selector), do: :decimals
defp parse_response(response) do
case response do
[item] -> item
items -> items
end
end
end

@@ -0,0 +1,210 @@
defmodule Indexer.Fetcher.PolygonZkevm.BridgeL1 do
@moduledoc """
Fills polygon_zkevm_bridge DB table.
"""
use GenServer
use Indexer.Fetcher
require Logger
import Ecto.Query
import Explorer.Helper, only: [parse_integer: 1]
import Indexer.Fetcher.PolygonZkevm.Bridge,
only: [get_logs_all: 3, import_operations: 1, prepare_operations: 3]
alias Explorer.Chain.PolygonZkevm.{Bridge, Reader}
alias Explorer.Repo
alias Indexer.Fetcher.RollupL1ReorgMonitor
alias Indexer.Helper
@eth_get_logs_range_size 1000
@fetcher_name :polygon_zkevm_bridge_l1
def child_spec(start_link_arguments) do
spec = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
restart: :transient,
type: :worker
}
Supervisor.child_spec(spec, [])
end
def start_link(args, gen_server_options \\ []) do
GenServer.start_link(__MODULE__, args, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl GenServer
def init(_args) do
{:ok, %{}, {:continue, :ok}}
end
@impl GenServer
def handle_continue(_, state) do
Logger.metadata(fetcher: @fetcher_name)
# a two-second pause is needed to avoid exceeding the Supervisor restart intensity in case of DB issues
Process.send_after(self(), :init_with_delay, 2000)
{:noreply, state}
end
@impl GenServer
def handle_info(:init_with_delay, _state) do
env = Application.get_all_env(:indexer)[__MODULE__]
with {:start_block_undefined, false} <- {:start_block_undefined, is_nil(env[:start_block])},
{:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(RollupL1ReorgMonitor))},
rpc = env[:rpc],
{:rpc_undefined, false} <- {:rpc_undefined, is_nil(rpc)},
{:bridge_contract_address_is_valid, true} <-
{:bridge_contract_address_is_valid, Helper.address_correct?(env[:bridge_contract])},
start_block = parse_integer(env[:start_block]),
false <- is_nil(start_block),
true <- start_block > 0,
{last_l1_block_number, last_l1_transaction_hash} = Reader.last_l1_item(),
json_rpc_named_arguments = Helper.json_rpc_named_arguments(rpc),
{:ok, block_check_interval, safe_block} <- Helper.get_block_check_interval(json_rpc_named_arguments),
{:start_block_valid, true, _, _} <-
{:start_block_valid,
(start_block <= last_l1_block_number || last_l1_block_number == 0) && start_block <= safe_block,
last_l1_block_number, safe_block},
{:ok, last_l1_tx} <- Helper.get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments),
{:l1_tx_not_found, false} <- {:l1_tx_not_found, !is_nil(last_l1_transaction_hash) && is_nil(last_l1_tx)} do
Process.send(self(), :continue, [])
{:noreply,
%{
block_check_interval: block_check_interval,
bridge_contract: env[:bridge_contract],
json_rpc_named_arguments: json_rpc_named_arguments,
end_block: safe_block,
start_block: max(start_block, last_l1_block_number)
}}
else
{:start_block_undefined, true} ->
# the process shouldn't start if the start block is not defined
{:stop, :normal, %{}}
{:reorg_monitor_started, false} ->
Logger.error("Cannot start this process as Indexer.Fetcher.RollupL1ReorgMonitor is not started.")
{:stop, :normal, %{}}
{:rpc_undefined, true} ->
Logger.error("L1 RPC URL is not defined.")
{:stop, :normal, %{}}
{:bridge_contract_address_is_valid, false} ->
Logger.error("PolygonZkEVMBridge contract address is invalid or not defined.")
{:stop, :normal, %{}}
{:start_block_valid, false, last_l1_block_number, safe_block} ->
Logger.error("Invalid L1 Start Block value. Please, check the value and polygon_zkevm_bridge table.")
Logger.error("last_l1_block_number = #{inspect(last_l1_block_number)}")
Logger.error("safe_block = #{inspect(safe_block)}")
{:stop, :normal, %{}}
{:error, error_data} ->
Logger.error(
"Cannot get last L1 transaction from RPC by its hash, latest block, or block timestamp by its number due to RPC error: #{inspect(error_data)}"
)
{:stop, :normal, %{}}
{:l1_tx_not_found, true} ->
Logger.error(
"Cannot find last L1 transaction from RPC by its hash. Probably, there was a reorg on L1 chain. Please, check polygon_zkevm_bridge table."
)
{:stop, :normal, %{}}
_ ->
Logger.error("L1 Start Block is invalid or zero.")
{:stop, :normal, %{}}
end
end
@impl GenServer
def handle_info(
:continue,
%{
bridge_contract: bridge_contract,
block_check_interval: block_check_interval,
start_block: start_block,
end_block: end_block,
json_rpc_named_arguments: json_rpc_named_arguments
} = state
) do
time_before = Timex.now()
last_written_block =
start_block..end_block
|> Enum.chunk_every(@eth_get_logs_range_size)
|> Enum.reduce_while(start_block - 1, fn current_chunk, _ ->
chunk_start = List.first(current_chunk)
chunk_end = List.last(current_chunk)
if chunk_start <= chunk_end do
Helper.log_blocks_chunk_handling(chunk_start, chunk_end, start_block, end_block, nil, "L1")
operations =
{chunk_start, chunk_end}
|> get_logs_all(bridge_contract, json_rpc_named_arguments)
|> prepare_operations(json_rpc_named_arguments, json_rpc_named_arguments)
import_operations(operations)
Helper.log_blocks_chunk_handling(
chunk_start,
chunk_end,
start_block,
end_block,
"#{Enum.count(operations)} L1 operation(s)",
"L1"
)
end
reorg_block = RollupL1ReorgMonitor.reorg_block_pop(__MODULE__)
if !is_nil(reorg_block) && reorg_block > 0 do
reorg_handle(reorg_block)
{:halt, if(reorg_block <= chunk_end, do: reorg_block - 1, else: chunk_end)}
else
{:cont, chunk_end}
end
end)
new_start_block = last_written_block + 1
{:ok, new_end_block} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
delay =
if new_end_block == last_written_block do
# there is no new block yet, so wait for some time to let the chain produce a new block
max(block_check_interval - Timex.diff(Timex.now(), time_before, :milliseconds), 0)
else
0
end
Process.send_after(self(), :continue, delay)
{:noreply, %{state | start_block: new_start_block, end_block: new_end_block}}
end
@impl GenServer
def handle_info({ref, _result}, state) do
Process.demonitor(ref, [:flush])
{:noreply, state}
end
defp reorg_handle(reorg_block) do
{deleted_count, _} =
Repo.delete_all(from(b in Bridge, where: b.type == :deposit and b.block_number >= ^reorg_block))
if deleted_count > 0 do
Logger.warning(
"As L1 reorg was detected, some deposits with block_number >= #{reorg_block} were removed from polygon_zkevm_bridge table. Number of removed rows: #{deleted_count}."
)
end
end
end

@@ -0,0 +1,78 @@
defmodule Indexer.Fetcher.PolygonZkevm.BridgeL1Tokens do
@moduledoc """
Fetches information about L1 tokens for the zkEVM bridge.
"""
use Indexer.Fetcher, restart: :permanent
use Spandex.Decorators
import Ecto.Query
alias Explorer.Repo
alias Indexer.{BufferedTask, Helper}
alias Indexer.Fetcher.PolygonZkevm.{Bridge, BridgeL1}
@behaviour BufferedTask
@default_max_batch_size 1
@default_max_concurrency 10
@doc false
def child_spec([init_options, gen_server_options]) do
rpc = Application.get_all_env(:indexer)[BridgeL1][:rpc]
json_rpc_named_arguments = Helper.json_rpc_named_arguments(rpc)
merged_init_opts =
defaults()
|> Keyword.merge(init_options)
|> Keyword.merge(state: json_rpc_named_arguments)
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__)
end
@impl BufferedTask
def init(_, _, _) do
{0, []}
end
@impl BufferedTask
def run(l1_token_addresses, json_rpc_named_arguments) when is_list(l1_token_addresses) do
l1_token_addresses
|> Bridge.token_addresses_to_ids(json_rpc_named_arguments)
|> Enum.each(fn {l1_token_address, l1_token_id} ->
Repo.update_all(
from(b in Explorer.Chain.PolygonZkevm.Bridge, where: b.l1_token_address == ^l1_token_address),
set: [l1_token_id: l1_token_id, l1_token_address: nil]
)
end)
end
@doc """
Fetches L1 token data asynchronously.
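
A minimal usage sketch, assuming the module is aliased as `BridgeL1Tokens` and
`operations` is the list of bridge operations just imported by the block fetcher:

    BridgeL1Tokens.async_fetch(operations)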
"""
def async_fetch(data) do
async_fetch(data, Application.get_env(:indexer, __MODULE__.Supervisor)[:enabled])
end
def async_fetch(_data, false), do: :ok
def async_fetch(operations, _enabled) do
l1_token_addresses =
operations
|> Enum.reject(fn operation -> is_nil(operation.l1_token_address) end)
|> Enum.map(fn operation -> operation.l1_token_address end)
|> Enum.uniq()
BufferedTask.buffer(__MODULE__, l1_token_addresses)
end
defp defaults do
[
flush_interval: 100,
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
poll: false,
task_supervisor: __MODULE__.TaskSupervisor
]
end
end

@@ -0,0 +1,176 @@
defmodule Indexer.Fetcher.PolygonZkevm.BridgeL2 do
@moduledoc """
Fills polygon_zkevm_bridge DB table.
"""
use GenServer
use Indexer.Fetcher
require Logger
import Ecto.Query
import Explorer.Helper, only: [parse_integer: 1]
import Indexer.Fetcher.PolygonZkevm.Bridge,
only: [get_logs_all: 3, import_operations: 1, prepare_operations: 3]
alias Explorer.Chain.PolygonZkevm.{Bridge, Reader}
alias Explorer.Repo
alias Indexer.Helper
@eth_get_logs_range_size 1000
@fetcher_name :polygon_zkevm_bridge_l2
def child_spec(start_link_arguments) do
spec = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
restart: :transient,
type: :worker
}
Supervisor.child_spec(spec, [])
end
def start_link(args, gen_server_options \\ []) do
GenServer.start_link(__MODULE__, args, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl GenServer
def init(args) do
json_rpc_named_arguments = args[:json_rpc_named_arguments]
{:ok, %{}, {:continue, json_rpc_named_arguments}}
end
@impl GenServer
def handle_continue(json_rpc_named_arguments, _state) do
Logger.metadata(fetcher: @fetcher_name)
# a two-second pause is needed to avoid exceeding the Supervisor restart intensity in case of DB issues
Process.send_after(self(), :init_with_delay, 2000)
{:noreply, %{json_rpc_named_arguments: json_rpc_named_arguments}}
end
@impl GenServer
def handle_info(:init_with_delay, %{json_rpc_named_arguments: json_rpc_named_arguments} = state) do
env = Application.get_all_env(:indexer)[__MODULE__]
with {:start_block_undefined, false} <- {:start_block_undefined, is_nil(env[:start_block])},
rpc_l1 = Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonZkevm.BridgeL1][:rpc],
{:rpc_l1_undefined, false} <- {:rpc_l1_undefined, is_nil(rpc_l1)},
{:bridge_contract_address_is_valid, true} <-
{:bridge_contract_address_is_valid, Helper.address_correct?(env[:bridge_contract])},
start_block = parse_integer(env[:start_block]),
false <- is_nil(start_block),
true <- start_block > 0,
{last_l2_block_number, last_l2_transaction_hash} = Reader.last_l2_item(),
{:ok, latest_block} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000),
{:start_block_valid, true} <-
{:start_block_valid,
(start_block <= last_l2_block_number || last_l2_block_number == 0) && start_block <= latest_block},
{:ok, last_l2_tx} <- Helper.get_transaction_by_hash(last_l2_transaction_hash, json_rpc_named_arguments),
{:l2_tx_not_found, false} <- {:l2_tx_not_found, !is_nil(last_l2_transaction_hash) && is_nil(last_l2_tx)} do
Process.send(self(), :continue, [])
{:noreply,
%{
bridge_contract: env[:bridge_contract],
json_rpc_named_arguments: json_rpc_named_arguments,
json_rpc_named_arguments_l1: Helper.json_rpc_named_arguments(rpc_l1),
end_block: latest_block,
start_block: max(start_block, last_l2_block_number)
}}
else
{:start_block_undefined, true} ->
# the process shouldn't start if the start block is not defined
{:stop, :normal, state}
{:rpc_l1_undefined, true} ->
Logger.error("L1 RPC URL is not defined.")
{:stop, :normal, state}
{:bridge_contract_address_is_valid, false} ->
Logger.error("PolygonZkEVMBridge contract address is invalid or not defined.")
{:stop, :normal, state}
{:start_block_valid, false} ->
Logger.error("Invalid L2 Start Block value. Please, check the value and polygon_zkevm_bridge table.")
{:stop, :normal, state}
{:error, error_data} ->
Logger.error(
"Cannot get last L2 transaction from RPC by its hash or latest block due to RPC error: #{inspect(error_data)}"
)
{:stop, :normal, state}
{:l2_tx_not_found, true} ->
Logger.error(
"Cannot find last L2 transaction from RPC by its hash. Probably, there was a reorg on L2 chain. Please, check polygon_zkevm_bridge table."
)
{:stop, :normal, state}
_ ->
Logger.error("L2 Start Block is invalid or zero.")
{:stop, :normal, state}
end
end
@impl GenServer
def handle_info(
:continue,
%{
bridge_contract: bridge_contract,
start_block: start_block,
end_block: end_block,
json_rpc_named_arguments: json_rpc_named_arguments,
json_rpc_named_arguments_l1: json_rpc_named_arguments_l1
} = state
) do
start_block..end_block
|> Enum.chunk_every(@eth_get_logs_range_size)
|> Enum.each(fn current_chunk ->
chunk_start = List.first(current_chunk)
chunk_end = List.last(current_chunk)
if chunk_start <= chunk_end do
Helper.log_blocks_chunk_handling(chunk_start, chunk_end, start_block, end_block, nil, "L2")
operations =
{chunk_start, chunk_end}
|> get_logs_all(bridge_contract, json_rpc_named_arguments)
|> prepare_operations(json_rpc_named_arguments, json_rpc_named_arguments_l1)
import_operations(operations)
Helper.log_blocks_chunk_handling(
chunk_start,
chunk_end,
start_block,
end_block,
"#{Enum.count(operations)} L2 operation(s)",
"L2"
)
end
end)
{:stop, :normal, state}
end
@impl GenServer
def handle_info({ref, _result}, state) do
Process.demonitor(ref, [:flush])
{:noreply, state}
end
def reorg_handle(reorg_block) do
{deleted_count, _} =
Repo.delete_all(from(b in Bridge, where: b.type == :withdrawal and b.block_number >= ^reorg_block))
if deleted_count > 0 do
Logger.warning(
"As L2 reorg was detected, some withdrawals with block_number >= #{reorg_block} were removed from polygon_zkevm_bridge table. Number of removed rows: #{deleted_count}."
)
end
end
end

@@ -1,6 +1,6 @@
defmodule Indexer.Fetcher.Zkevm.TransactionBatch do
defmodule Indexer.Fetcher.PolygonZkevm.TransactionBatch do
@moduledoc """
Fills zkevm_transaction_batches DB table.
Fills polygon_zkevm_transaction_batches DB table.
"""
use GenServer
@@ -12,7 +12,7 @@ defmodule Indexer.Fetcher.Zkevm.TransactionBatch do
alias Explorer.Chain
alias Explorer.Chain.Events.Publisher
alias Explorer.Chain.Zkevm.Reader
alias Explorer.Chain.PolygonZkevm.Reader
alias Indexer.Helper
@zero_hash "0000000000000000000000000000000000000000000000000000000000000000"
@@ -34,9 +34,9 @@ defmodule Indexer.Fetcher.Zkevm.TransactionBatch do
@impl GenServer
def init(args) do
Logger.metadata(fetcher: :zkevm_transaction_batches)
Logger.metadata(fetcher: :polygon_zkevm_transaction_batches)
config = Application.get_all_env(:indexer)[Indexer.Fetcher.Zkevm.TransactionBatch]
config = Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonZkevm.TransactionBatch]
chunk_size = config[:chunk_size]
recheck_interval = config[:recheck_interval]
@@ -249,20 +249,13 @@ defmodule Indexer.Fetcher.Zkevm.TransactionBatch do
{[batch | batches], l2_txs ++ l2_txs_append, l1_txs, next_id, hash_to_id}
end)
# here we explicitly check CHAIN_TYPE as Dialyzer throws an error otherwise
import_options =
if System.get_env("CHAIN_TYPE") == "polygon_zkevm" do
%{
zkevm_lifecycle_transactions: %{params: l1_txs_to_import},
zkevm_transaction_batches: %{params: batches_to_import},
zkevm_batch_transactions: %{params: l2_txs_to_import},
timeout: :infinity
}
else
%{}
end
{:ok, _} = Chain.import(import_options)
{:ok, _} =
Chain.import(%{
polygon_zkevm_lifecycle_transactions: %{params: l1_txs_to_import},
polygon_zkevm_transaction_batches: %{params: batches_to_import},
polygon_zkevm_batch_transactions: %{params: l2_txs_to_import},
timeout: :infinity
})
confirmed_batches =
Enum.filter(batches_to_import, fn batch -> not is_nil(batch.sequence_id) and batch.sequence_id > 0 end)

@@ -0,0 +1,153 @@
defmodule Indexer.Fetcher.RollupL1ReorgMonitor do
@moduledoc """
A module to catch L1 reorgs and notify rollup modules about them.
"""
use GenServer
use Indexer.Fetcher
require Logger
alias Indexer.{BoundQueue, Helper}
@fetcher_name :rollup_l1_reorg_monitor
def child_spec(start_link_arguments) do
spec = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
restart: :transient,
type: :worker
}
Supervisor.child_spec(spec, [])
end
def start_link(args, gen_server_options \\ []) do
GenServer.start_link(__MODULE__, args, Keyword.put_new(gen_server_options, :name, __MODULE__))
end
@impl GenServer
def init(_args) do
Logger.metadata(fetcher: @fetcher_name)
modules_can_use_reorg_monitor = [
Indexer.Fetcher.PolygonEdge.Deposit,
Indexer.Fetcher.PolygonEdge.WithdrawalExit,
Indexer.Fetcher.PolygonZkevm.BridgeL1,
Indexer.Fetcher.Shibarium.L1
]
modules_using_reorg_monitor =
modules_can_use_reorg_monitor
|> Enum.reject(fn module ->
module_config = Application.get_all_env(:indexer)[module]
is_nil(module_config[:start_block]) and is_nil(module_config[:start_block_l1])
end)
if Enum.empty?(modules_using_reorg_monitor) do
# don't start the reorg monitor as there is no module that would use it
:ignore
else
# Since modules for different rollups cannot be active at the same time,
# it's correct to take only the first item of the list.
# For example, Indexer.Fetcher.PolygonEdge.Deposit and Indexer.Fetcher.PolygonEdge.WithdrawalExit can both be in the list
# because they serve the same rollup, but Indexer.Fetcher.Shibarium.L1 and Indexer.Fetcher.PolygonZkevm.BridgeL1 cannot (as they serve different rollups).
module_using_reorg_monitor = Enum.at(modules_using_reorg_monitor, 0)
l1_rpc =
if Enum.member?(
[Indexer.Fetcher.PolygonEdge.Deposit, Indexer.Fetcher.PolygonEdge.WithdrawalExit],
module_using_reorg_monitor
) do
# there can be more than one PolygonEdge.* module, so we get their common L1 RPC URL from Indexer.Fetcher.PolygonEdge
Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonEdge][:polygon_edge_l1_rpc]
else
Application.get_all_env(:indexer)[module_using_reorg_monitor][:rpc]
end
json_rpc_named_arguments = Helper.json_rpc_named_arguments(l1_rpc)
{:ok, block_check_interval, _} = Helper.get_block_check_interval(json_rpc_named_arguments)
Process.send(self(), :reorg_monitor, [])
{:ok,
%{
block_check_interval: block_check_interval,
json_rpc_named_arguments: json_rpc_named_arguments,
modules: modules_using_reorg_monitor,
prev_latest: 0
}}
end
end
@impl GenServer
def handle_info(
:reorg_monitor,
%{
block_check_interval: block_check_interval,
json_rpc_named_arguments: json_rpc_named_arguments,
modules: modules,
prev_latest: prev_latest
} = state
) do
{:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
if latest < prev_latest do
Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.")
Enum.each(modules, &reorg_block_push(latest, &1))
end
Process.send_after(self(), :reorg_monitor, block_check_interval)
{:noreply, %{state | prev_latest: latest}}
end
@doc """
Pops a reorg block number from the front of the queue for the specified rollup module.
Returns `nil` if the reorg queue is empty.
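
A minimal usage sketch from a rollup fetcher loop (`handle_reorg/1` is a hypothetical
callback of the calling module):

    case RollupL1ReorgMonitor.reorg_block_pop(__MODULE__) do
      nil -> :ok
      block_number -> handle_reorg(block_number)
    end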
"""
@spec reorg_block_pop(module()) :: non_neg_integer() | nil
def reorg_block_pop(module) do
table_name = reorg_table_name(module)
case BoundQueue.pop_front(reorg_queue_get(table_name)) do
{:ok, {block_number, updated_queue}} ->
:ets.insert(table_name, {:queue, updated_queue})
block_number
{:error, :empty} ->
nil
end
end
defp reorg_block_push(block_number, module) do
table_name = reorg_table_name(module)
{:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number)
:ets.insert(table_name, {:queue, updated_queue})
end
defp reorg_queue_get(table_name) do
if :ets.whereis(table_name) == :undefined do
:ets.new(table_name, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
with info when info != :undefined <- :ets.info(table_name),
[{_, value}] <- :ets.lookup(table_name, :queue) do
value
else
_ -> %BoundQueue{}
end
end
defp reorg_table_name(module) do
:"#{module}#{:_reorgs}"
end
end

@@ -23,11 +23,10 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
import Indexer.Fetcher.Shibarium.Helper,
only: [calc_operation_hash: 5, prepare_insert_items: 2, recalculate_cached_count: 0]
alias EthereumJSONRPC.Block.ByNumber
alias EthereumJSONRPC.Blocks
alias Explorer.Chain.Shibarium.Bridge
alias Explorer.{Chain, Repo}
alias Indexer.{BoundQueue, Helper}
alias Indexer.Fetcher.RollupL1ReorgMonitor
alias Indexer.Helper
@block_check_interval_range_size 100
@eth_get_logs_range_size 1000
@@ -111,6 +110,7 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
env = Application.get_all_env(:indexer)[__MODULE__]
with {:start_block_undefined, false} <- {:start_block_undefined, is_nil(env[:start_block])},
{:reorg_monitor_started, true} <- {:reorg_monitor_started, !is_nil(Process.whereis(RollupL1ReorgMonitor))},
rpc = env[:rpc],
{:rpc_undefined, false} <- {:rpc_undefined, is_nil(rpc)},
{:deposit_manager_address_is_valid, true} <-
@@ -140,7 +140,6 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
{:start_block_valid, true} <- {:start_block_valid, start_block <= latest_block} do
recalculate_cached_count()
Process.send(self(), :reorg_monitor, [])
Process.send(self(), :continue, [])
{:noreply,
@@ -154,14 +153,17 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
block_check_interval: block_check_interval,
start_block: max(start_block, last_l1_block_number),
end_block: latest_block,
json_rpc_named_arguments: json_rpc_named_arguments,
reorg_monitor_prev_latest: 0
json_rpc_named_arguments: json_rpc_named_arguments
}}
else
{:start_block_undefined, true} ->
# the process shouldn't start if the start block is not defined
{:stop, :normal, %{}}
{:reorg_monitor_started, false} ->
Logger.error("Cannot start this process as Indexer.Fetcher.RollupL1ReorgMonitor is not started.")
{:stop, :normal, %{}}
{:rpc_undefined, true} ->
Logger.error("L1 RPC URL is not defined.")
{:stop, :normal, %{}}
@@ -214,27 +216,6 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
end
end
@impl GenServer
def handle_info(
:reorg_monitor,
%{
block_check_interval: block_check_interval,
json_rpc_named_arguments: json_rpc_named_arguments,
reorg_monitor_prev_latest: prev_latest
} = state
) do
{:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
if latest < prev_latest do
Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.")
reorg_block_push(latest)
end
Process.send_after(self(), :reorg_monitor, block_check_interval)
{:noreply, %{state | reorg_monitor_prev_latest: latest}}
end
@impl GenServer
def handle_info(
:continue,
@@ -292,7 +273,7 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
)
end
reorg_block = reorg_block_pop()
reorg_block = RollupL1ReorgMonitor.reorg_block_pop(__MODULE__)
if !is_nil(reorg_block) && reorg_block > 0 do
reorg_handle(reorg_block)
@@ -348,25 +329,6 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
end
end
defp get_blocks_by_events(events, json_rpc_named_arguments, retries) do
request =
events
|> Enum.reduce(%{}, fn event, acc ->
Map.put(acc, event["blockNumber"], 0)
end)
|> Stream.map(fn {block_number, _} -> %{number: block_number} end)
|> Stream.with_index()
|> Enum.into(%{}, fn {params, id} -> {id, params} end)
|> Blocks.requests(&ByNumber.request(&1, false, false))
error_message = &"Cannot fetch blocks with batch request. Error: #{inspect(&1)}. Request: #{inspect(request)}"
case Helper.repeated_call(&json_rpc/2, [request, json_rpc_named_arguments], error_message, retries) do
{:ok, results} -> Enum.map(results, fn %{result: result} -> result end)
{:error, _} -> []
end
end
defp get_last_l1_item do
query =
from(sb in Bridge,
@@ -581,7 +543,7 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
timestamps =
events
|> filter_deposit_events()
|> get_blocks_by_events(json_rpc_named_arguments, 100_000_000)
|> Helper.get_blocks_by_events(json_rpc_named_arguments, 100_000_000)
|> Enum.reduce(%{}, fn block, acc ->
block_number = quantity_to_integer(Map.get(block, "number"))
{:ok, timestamp} = DateTime.from_unix(quantity_to_integer(Map.get(block, "timestamp")))
@@ -647,25 +609,6 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
"0x#{truncated_hash}"
end
defp reorg_block_pop do
table_name = reorg_table_name(@fetcher_name)
case BoundQueue.pop_front(reorg_queue_get(table_name)) do
{:ok, {block_number, updated_queue}} ->
:ets.insert(table_name, {:queue, updated_queue})
block_number
{:error, :empty} ->
nil
end
end
defp reorg_block_push(block_number) do
table_name = reorg_table_name(@fetcher_name)
{:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number)
:ets.insert(table_name, {:queue, updated_queue})
end
defp reorg_handle(reorg_block) do
{deleted_count, _} =
Repo.delete_all(from(sb in Bridge, where: sb.l1_block_number >= ^reorg_block and is_nil(sb.l2_transaction_hash)))
@ -696,27 +639,4 @@ defmodule Indexer.Fetcher.Shibarium.L1 do
)
end
end
defp reorg_queue_get(table_name) do
if :ets.whereis(table_name) == :undefined do
:ets.new(table_name, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
with info when info != :undefined <- :ets.info(table_name),
[{_, value}] <- :ets.lookup(table_name, :queue) do
value
else
_ -> %BoundQueue{}
end
end
defp reorg_table_name(fetcher_name) do
:"#{fetcher_name}#{:_reorgs}"
end
end

@@ -14,8 +14,31 @@ defmodule Indexer.Helper do
]
alias EthereumJSONRPC.Block.ByNumber
alias EthereumJSONRPC.Blocks
alias Explorer.Chain.Hash
@block_check_interval_range_size 100
@block_by_number_chunk_size 50
@doc """
Checks whether the given Ethereum address looks correct.
The address should begin with the 0x prefix and then contain 40 hexadecimal digits (can be in mixed case).
This function doesn't check if the address is checksummed.
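
For example:

    address_correct?("0x7f39C581F595B53c5cb19bD0b3f8dA6c935E2Ca0")
    #=> true

    address_correct?("0x123")
    #=> false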
"""
@spec address_correct?(binary()) :: boolean()
def address_correct?(address) when is_binary(address) do
String.match?(address, ~r/^0x[[:xdigit:]]{40}$/i)
end
def address_correct?(_address) do
false
end
@doc """
Converts the Explorer.Chain.Hash representation of the given address to a string
beginning with the 0x prefix. If the given address is already a string, it is not modified.
The second argument forces the result to be downcased.
"""
@spec address_hash_to_string(binary(), boolean()) :: binary()
def address_hash_to_string(hash, downcase \\ false)
@@ -35,13 +58,43 @@ defmodule Indexer.Helper do
end
end
@spec address_correct?(binary()) :: boolean()
def address_correct?(address) when is_binary(address) do
String.match?(address, ~r/^0x[[:xdigit:]]{40}$/i)
@doc """
Calculates the average block time in milliseconds (based on the latest 100 blocks), divided by 2.
Sends the corresponding requests to the RPC node.
Returns a tuple {:ok, block_check_interval, last_safe_block}
where `last_safe_block` is the number of the most recent `safe` or `latest` block (depending on which one is available).
Returns {:error, description} in case of an error.
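
For example, if the latest 100 blocks span 1200 seconds, the calculated interval is
ceil(1200 / 100 * 1000 / 2) = 6000 ms.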
"""
@spec get_block_check_interval(list()) :: {:ok, non_neg_integer(), non_neg_integer()} | {:error, any()}
def get_block_check_interval(json_rpc_named_arguments) do
{last_safe_block, _} = get_safe_block(json_rpc_named_arguments)
first_block = max(last_safe_block - @block_check_interval_range_size, 1)
with {:ok, first_block_timestamp} <-
get_block_timestamp_by_number(first_block, json_rpc_named_arguments, 100_000_000),
{:ok, last_safe_block_timestamp} <-
get_block_timestamp_by_number(last_safe_block, json_rpc_named_arguments, 100_000_000) do
block_check_interval =
ceil((last_safe_block_timestamp - first_block_timestamp) / (last_safe_block - first_block) * 1000 / 2)
Logger.info("Block check interval is calculated as #{block_check_interval} ms.")
{:ok, block_check_interval, last_safe_block}
else
{:error, error} ->
{:error, "Failed to calculate block check interval due to #{inspect(error)}"}
end
end
def address_correct?(_address) do
false
defp get_safe_block(json_rpc_named_arguments) do
case get_block_number_by_tag("safe", json_rpc_named_arguments) do
{:ok, safe_block} ->
{safe_block, false}
{:error, :not_found} ->
{:ok, latest_block} = get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000)
{latest_block, true}
end
end
@doc """
@@ -76,6 +129,25 @@ defmodule Indexer.Helper do
repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries)
end
@doc """
Forms JSON RPC named arguments for the given RPC URL.
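
A minimal usage sketch (the URL is hypothetical):

    json_rpc_named_arguments("https://rpc.example.com")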
"""
@spec json_rpc_named_arguments(binary()) :: list()
def json_rpc_named_arguments(rpc_url) do
[
transport: EthereumJSONRPC.HTTP,
transport_options: [
http: EthereumJSONRPC.HTTP.HTTPoison,
url: rpc_url,
http_options: [
recv_timeout: :timer.minutes(10),
timeout: :timer.minutes(10),
hackney: [pool: :ethereum_jsonrpc]
]
]
]
end
@doc """
Prints a progress log when handling something split into block chunks.
"""
@@ -151,6 +223,43 @@ defmodule Indexer.Helper do
end
end
@doc """
Fetches block info for the given list of events (logs).
Performs up to the specified number of retries if the first attempt returns an error.
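
A minimal usage sketch (the quantities are hypothetical); both event shapes,
a map with the `block_number` key and a raw log map with the "blockNumber" key,
are accepted:

    get_blocks_by_events([%{block_number: "0x112a880"}, %{"blockNumber" => "0x112a881"}], json_rpc_named_arguments, 3)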
"""
@spec get_blocks_by_events(list(), list(), non_neg_integer()) :: list()
def get_blocks_by_events(events, json_rpc_named_arguments, retries) do
events
|> Enum.reduce(%{}, fn event, acc ->
block_number =
if is_map(event) do
event.block_number
else
event["blockNumber"]
end
Map.put(acc, block_number, 0)
end)
|> Stream.map(fn {block_number, _} -> %{number: block_number} end)
|> Stream.with_index()
|> Enum.into(%{}, fn {params, id} -> {id, params} end)
|> Blocks.requests(&ByNumber.request(&1, false, false))
|> Enum.chunk_every(@block_by_number_chunk_size)
|> Enum.reduce([], fn current_requests, results_acc ->
error_message =
&"Cannot fetch blocks with batch request. Error: #{inspect(&1)}. Request: #{inspect(current_requests)}"
# credo:disable-for-lines:3 Credo.Check.Refactor.Nesting
results =
case repeated_call(&json_rpc/2, [current_requests, json_rpc_named_arguments], error_message, retries) do
{:ok, results} -> Enum.map(results, fn %{result: result} -> result end)
{:error, _} -> []
end
results_acc ++ results
end)
end
@doc """
Fetches a block timestamp by its number using an RPC request.
Performs up to the specified number of retries if the first attempt returns an error.

@@ -32,7 +32,6 @@ defmodule Indexer.Supervisor do
InternalTransaction,
PendingBlockOperationsSanitizer,
PendingTransaction,
PolygonEdge,
ReplacedTransaction,
RootstockData,
Token,
@@ -44,8 +43,6 @@ defmodule Indexer.Supervisor do
Withdrawal
}
alias Indexer.Fetcher.Zkevm.TransactionBatch
alias Indexer.Temporary.{
BlocksTransactionsMismatch,
UncatalogedTokenTransfers,
@@ -134,7 +131,7 @@ defmodule Indexer.Supervisor do
{TokenUpdater.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{ReplacedTransaction.Supervisor, [[memory_monitor: memory_monitor]]},
configure(PolygonEdge.Supervisor, [[memory_monitor: memory_monitor]]),
{Indexer.Fetcher.RollupL1ReorgMonitor.Supervisor, [[memory_monitor: memory_monitor]]},
configure(Indexer.Fetcher.PolygonEdge.Deposit.Supervisor, [[memory_monitor: memory_monitor]]),
configure(Indexer.Fetcher.PolygonEdge.DepositExecute.Supervisor, [
[memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments]
@@ -147,7 +144,12 @@ defmodule Indexer.Supervisor do
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]
]),
configure(Indexer.Fetcher.Shibarium.L1.Supervisor, [[memory_monitor: memory_monitor]]),
configure(TransactionBatch.Supervisor, [
configure(Indexer.Fetcher.PolygonZkevm.BridgeL1.Supervisor, [[memory_monitor: memory_monitor]]),
configure(Indexer.Fetcher.PolygonZkevm.BridgeL1Tokens.Supervisor, [[memory_monitor: memory_monitor]]),
configure(Indexer.Fetcher.PolygonZkevm.BridgeL2.Supervisor, [
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]
]),
configure(Indexer.Fetcher.PolygonZkevm.TransactionBatch.Supervisor, [
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]
]),
{Indexer.Fetcher.Beacon.Blob.Supervisor, [[memory_monitor: memory_monitor]]},

@@ -148,6 +148,11 @@ defmodule Indexer.Transform.Addresses do
%{from: :block_number, to: :fetched_coin_balance_block_number},
%{from: :address_hash, to: :hash}
]
],
polygon_zkevm_bridge_operations: [
[
%{from: :l2_token_address, to: :hash}
]
]
}
@@ -455,6 +460,11 @@ defmodule Indexer.Transform.Addresses do
required(:address_hash) => String.t(),
required(:block_number) => non_neg_integer()
}
],
optional(:polygon_zkevm_bridge_operations) => [
%{
optional(:l2_token_address) => String.t()
}
]
}) :: [params]
def extract_addresses(fetched_data, options \\ []) when is_map(fetched_data) and is_list(options) do

@@ -0,0 +1,77 @@
defmodule Indexer.Transform.PolygonZkevm.Bridge do
@moduledoc """
Helper functions for transforming data for Polygon zkEVM Bridge operations.
"""
require Logger
import Indexer.Fetcher.PolygonZkevm.Bridge,
only: [filter_bridge_events: 2, prepare_operations: 4]
alias Indexer.Fetcher.PolygonZkevm.{BridgeL1, BridgeL2}
alias Indexer.Helper
@doc """
Returns a list of operations given a list of blocks and logs.
"""
@spec parse(list(), list()) :: list()
def parse(blocks, logs) do
prev_metadata = Logger.metadata()
Logger.metadata(fetcher: :polygon_zkevm_bridge_l2_realtime)
items =
with false <- is_nil(Application.get_env(:indexer, BridgeL2)[:start_block]),
false <- System.get_env("CHAIN_TYPE") != "polygon_zkevm",
rpc_l1 = Application.get_all_env(:indexer)[BridgeL1][:rpc],
{:rpc_l1_undefined, false} <- {:rpc_l1_undefined, is_nil(rpc_l1)},
bridge_contract = Application.get_env(:indexer, BridgeL2)[:bridge_contract],
{:bridge_contract_address_is_valid, true} <-
{:bridge_contract_address_is_valid, Helper.address_correct?(bridge_contract)} do
bridge_contract = String.downcase(bridge_contract)
block_numbers = Enum.map(blocks, fn block -> block.number end)
start_block = Enum.min(block_numbers)
end_block = Enum.max(block_numbers)
Helper.log_blocks_chunk_handling(start_block, end_block, start_block, end_block, nil, "L2")
json_rpc_named_arguments_l1 = Helper.json_rpc_named_arguments(rpc_l1)
block_to_timestamp = Enum.reduce(blocks, %{}, fn block, acc -> Map.put(acc, block.number, block.timestamp) end)
items =
logs
|> filter_bridge_events(bridge_contract)
|> prepare_operations(nil, json_rpc_named_arguments_l1, block_to_timestamp)
Helper.log_blocks_chunk_handling(
start_block,
end_block,
start_block,
end_block,
"#{Enum.count(items)} L2 operation(s)",
"L2"
)
items
else
true ->
[]
{:rpc_l1_undefined, true} ->
Logger.error("L1 RPC URL is not defined. Cannot use #{__MODULE__} for parsing logs.")
[]
{:bridge_contract_address_is_valid, false} ->
Logger.error(
"PolygonZkEVMBridge contract address is invalid or not defined. Cannot use #{__MODULE__} for parsing logs."
)
[]
end
Logger.reset_metadata(prev_metadata)
items
end
end

@@ -649,8 +649,6 @@ config :indexer, Indexer.Fetcher.Withdrawal.Supervisor,
config :indexer, Indexer.Fetcher.Withdrawal, first_block: System.get_env("WITHDRAWALS_FIRST_BLOCK")
config :indexer, Indexer.Fetcher.PolygonEdge.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_edge"
config :indexer, Indexer.Fetcher.PolygonEdge.Deposit.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_edge"
config :indexer, Indexer.Fetcher.PolygonEdge.DepositExecute.Supervisor,
@@ -682,14 +680,6 @@ config :indexer, Indexer.Fetcher.PolygonEdge.WithdrawalExit,
start_block_l1: System.get_env("INDEXER_POLYGON_EDGE_L1_WITHDRAWALS_START_BLOCK"),
exit_helper: System.get_env("INDEXER_POLYGON_EDGE_L1_EXIT_HELPER_CONTRACT")
config :indexer, Indexer.Fetcher.Zkevm.TransactionBatch,
chunk_size: ConfigHelper.parse_integer_env_var("INDEXER_ZKEVM_BATCHES_CHUNK_SIZE", 20),
recheck_interval: ConfigHelper.parse_integer_env_var("INDEXER_ZKEVM_BATCHES_RECHECK_INTERVAL", 60)
config :indexer, Indexer.Fetcher.Zkevm.TransactionBatch.Supervisor,
enabled:
ConfigHelper.chain_type() == "polygon_zkevm" && ConfigHelper.parse_bool_env_var("INDEXER_ZKEVM_BATCHES_ENABLED")
config :indexer, Indexer.Fetcher.RootstockData.Supervisor,
disabled?:
ConfigHelper.chain_type() != "rsk" || ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_ROOTSTOCK_DATA_FETCHER")
@@ -735,6 +725,33 @@ config :indexer, Indexer.Fetcher.Shibarium.L1.Supervisor, enabled: ConfigHelper.
config :indexer, Indexer.Fetcher.Shibarium.L2.Supervisor, enabled: ConfigHelper.chain_type() == "shibarium"
config :indexer, Indexer.Fetcher.PolygonZkevm.BridgeL1,
rpc: System.get_env("INDEXER_POLYGON_ZKEVM_L1_RPC"),
start_block: System.get_env("INDEXER_POLYGON_ZKEVM_L1_BRIDGE_START_BLOCK"),
bridge_contract: System.get_env("INDEXER_POLYGON_ZKEVM_L1_BRIDGE_CONTRACT"),
native_symbol: System.get_env("INDEXER_POLYGON_ZKEVM_L1_BRIDGE_NATIVE_SYMBOL", "ETH"),
native_decimals: ConfigHelper.parse_integer_env_var("INDEXER_POLYGON_ZKEVM_L1_BRIDGE_NATIVE_DECIMALS", 18)
config :indexer, Indexer.Fetcher.PolygonZkevm.BridgeL1.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_zkevm"
config :indexer, Indexer.Fetcher.PolygonZkevm.BridgeL1Tokens.Supervisor,
enabled: ConfigHelper.chain_type() == "polygon_zkevm"
config :indexer, Indexer.Fetcher.PolygonZkevm.BridgeL2,
start_block: System.get_env("INDEXER_POLYGON_ZKEVM_L2_BRIDGE_START_BLOCK"),
bridge_contract: System.get_env("INDEXER_POLYGON_ZKEVM_L2_BRIDGE_CONTRACT")
config :indexer, Indexer.Fetcher.PolygonZkevm.BridgeL2.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_zkevm"
config :indexer, Indexer.Fetcher.PolygonZkevm.TransactionBatch,
chunk_size: ConfigHelper.parse_integer_env_var("INDEXER_POLYGON_ZKEVM_BATCHES_CHUNK_SIZE", 20),
recheck_interval: ConfigHelper.parse_integer_env_var("INDEXER_POLYGON_ZKEVM_BATCHES_RECHECK_INTERVAL", 60)
config :indexer, Indexer.Fetcher.PolygonZkevm.TransactionBatch.Supervisor,
enabled:
ConfigHelper.chain_type() == "polygon_zkevm" &&
ConfigHelper.parse_bool_env_var("INDEXER_POLYGON_ZKEVM_BATCHES_ENABLED")
Code.require_file("#{config_env()}.exs", "config/runtime")
for config <- "../apps/*/config/runtime/#{config_env()}.exs" |> Path.expand(__DIR__) |> Path.wildcard() do
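Once these entries are compiled into the release, the resolved values can be sanity-checked at runtime, e.g. from an IEx shell (illustrative only):

bridge_l1 = Application.get_all_env(:indexer)[Indexer.Fetcher.PolygonZkevm.BridgeL1]
bridge_l1[:rpc]             # => value of INDEXER_POLYGON_ZKEVM_L1_RPC
bridge_l1[:native_symbol]   # => "ETH" unless INDEXER_POLYGON_ZKEVM_L1_BRIDGE_NATIVE_SYMBOL is set
bridge_l1[:native_decimals] # => 18 by default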

@@ -74,8 +74,8 @@ config :explorer, Explorer.Repo.Account,
  pool_size: ConfigHelper.parse_integer_env_var("ACCOUNT_POOL_SIZE", 10),
  queue_target: queue_target

# Configure PolygonEdge database
config :explorer, Explorer.Repo.PolygonEdge,
# Configure Beacon Chain database
config :explorer, Explorer.Repo.Beacon,
  database: database,
  hostname: hostname,
  url: System.get_env("DATABASE_URL"),
@@ -83,8 +83,8 @@ config :explorer, Explorer.Repo.PolygonEdge,
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  pool_size: 1

# Configure PolygonZkevm database
config :explorer, Explorer.Repo.PolygonZkevm,
# Configures BridgedTokens database
config :explorer, Explorer.Repo.BridgedTokens,
  database: database,
  hostname: hostname,
  url: System.get_env("DATABASE_URL"),
@@ -92,8 +92,8 @@ config :explorer, Explorer.Repo.PolygonZkevm,
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  pool_size: 1

# Configure Rootstock database
config :explorer, Explorer.Repo.RSK,
# Configure PolygonEdge database
config :explorer, Explorer.Repo.PolygonEdge,
  database: database,
  hostname: hostname,
  url: System.get_env("DATABASE_URL"),
@@ -101,8 +101,8 @@ config :explorer, Explorer.Repo.RSK,
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  pool_size: 1

# Configure Beacon Chain database
config :explorer, Explorer.Repo.Beacon,
# Configure PolygonZkevm database
config :explorer, Explorer.Repo.PolygonZkevm,
  database: database,
  hostname: hostname,
  url: System.get_env("DATABASE_URL"),
@@ -110,11 +110,13 @@ config :explorer, Explorer.Repo.Beacon,
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  pool_size: 1

# Configure Suave database
config :explorer, Explorer.Repo.Suave,
# Configure Rootstock database
config :explorer, Explorer.Repo.RSK,
  database: database,
  hostname: hostname,
  url: ExplorerConfigHelper.get_suave_db_url(),
  url: System.get_env("DATABASE_URL"),
  # actually this repo is not started, and its pool size remains unused.
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  pool_size: 1
@@ -124,13 +126,11 @@ config :explorer, Explorer.Repo.Shibarium,
  url: System.get_env("DATABASE_URL"),
  pool_size: 1

# Configures BridgedTokens database
config :explorer, Explorer.Repo.BridgedTokens,
# Configure Suave database
config :explorer, Explorer.Repo.Suave,
  database: database,
  hostname: hostname,
  url: System.get_env("DATABASE_URL"),
  # actually this repo is not started, and its pool size remains unused.
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  url: ExplorerConfigHelper.get_suave_db_url(),
  pool_size: 1

variant = Variant.get()
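For reference, each of these chain-scoped repos points at the same DATABASE_URL and differs only in which migrations it owns. In outline, such a repo is a plain Ecto repo whose :priv directory is chain-specific — a sketch, not the actual module, with the priv path ("priv/polygon_zkevm") assumed here:

# Standard Ecto repo; the chain-specific part is only the migrations
# directory, configured elsewhere via the :priv repo option
# (assumed to be "priv/polygon_zkevm").
defmodule Explorer.Repo.PolygonZkevm do
  use Ecto.Repo,
    otp_app: :explorer,
    adapter: Ecto.Adapters.Postgres
end

As the repeated comment in the hunks above notes, such a repo is not started at runtime and its pool size goes unused; it exists so that schema migrations stay scoped to the current chain type.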

@@ -51,41 +51,41 @@ config :explorer, Explorer.Repo.Account,
  ssl: ExplorerConfigHelper.ssl_enabled?(),
  queue_target: queue_target

# Configures PolygonEdge database
config :explorer, Explorer.Repo.PolygonEdge,
# Configure Beacon Chain database
config :explorer, Explorer.Repo.Beacon,
  url: System.get_env("DATABASE_URL"),
  # actually this repo is not started, and its pool size remains unused.
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  pool_size: 1,
  ssl: ExplorerConfigHelper.ssl_enabled?()

# Configures PolygonZkevm database
config :explorer, Explorer.Repo.PolygonZkevm,
# Configures BridgedTokens database
config :explorer, Explorer.Repo.BridgedTokens,
  url: System.get_env("DATABASE_URL"),
  # actually this repo is not started, and its pool size remains unused.
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  pool_size: 1,
  ssl: ExplorerConfigHelper.ssl_enabled?()

# Configures Rootstock database
config :explorer, Explorer.Repo.RSK,
# Configures PolygonEdge database
config :explorer, Explorer.Repo.PolygonEdge,
  url: System.get_env("DATABASE_URL"),
  # actually this repo is not started, and its pool size remains unused.
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  pool_size: 1,
  ssl: ExplorerConfigHelper.ssl_enabled?()

# Configure Beacon Chain database
config :explorer, Explorer.Repo.Beacon,
# Configures PolygonZkevm database
config :explorer, Explorer.Repo.PolygonZkevm,
  url: System.get_env("DATABASE_URL"),
  # actually this repo is not started, and its pool size remains unused.
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  pool_size: 1,
  ssl: ExplorerConfigHelper.ssl_enabled?()

# Configures Suave database
config :explorer, Explorer.Repo.Suave,
  url: ExplorerConfigHelper.get_suave_db_url(),
# Configures Rootstock database
config :explorer, Explorer.Repo.RSK,
  url: System.get_env("DATABASE_URL"),
  # actually this repo is not started, and its pool size remains unused.
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
  pool_size: 1,
  ssl: ExplorerConfigHelper.ssl_enabled?()
@@ -95,11 +95,9 @@ config :explorer, Explorer.Repo.Shibarium,
  pool_size: 1,
  ssl: ExplorerConfigHelper.ssl_enabled?()

# Configures BridgedTokens database
config :explorer, Explorer.Repo.BridgedTokens,
  url: System.get_env("DATABASE_URL"),
  # actually this repo is not started, and its pool size remains unused.
  # separating repos for different CHAIN_TYPE is implemented only for the sake of keeping DB schema update relevant to the current chain type
# Configures Suave database
config :explorer, Explorer.Repo.Suave,
  url: ExplorerConfigHelper.get_suave_db_url(),
  pool_size: 1,
  ssl: ExplorerConfigHelper.ssl_enabled?()
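The "not started" comments hint at how these repos are wired: only the repos matching the current chain type are placed under supervision. A conditional start could look roughly like this — illustrative only; the names and the actual supervision logic in the explorer app may differ:

# Sketch: pick extra repos by chain type before building the supervision tree.
extra_repos =
  case ConfigHelper.chain_type() do
    "polygon_zkevm" -> [Explorer.Repo.PolygonZkevm]
    "polygon_edge" -> [Explorer.Repo.PolygonEdge]
    "shibarium" -> [Explorer.Repo.Shibarium]
    _ -> []
  end

children = [Explorer.Repo | extra_repos]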

@@ -397,6 +397,8 @@
    "retryable",
    "returnaddress",
    "reuseaddr",
    "rollup",
    "rollups",
    "RPC's",
    "RPCs",
    "safelow",

@@ -169,9 +169,16 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_POLYGON_EDGE_L2_STATE_RECEIVER_CONTRACT=
# INDEXER_POLYGON_EDGE_L2_DEPOSITS_START_BLOCK=
# INDEXER_POLYGON_EDGE_ETH_GET_LOGS_RANGE_SIZE=
# INDEXER_ZKEVM_BATCHES_ENABLED=
# INDEXER_ZKEVM_BATCHES_CHUNK_SIZE=
# INDEXER_ZKEVM_BATCHES_RECHECK_INTERVAL=
# INDEXER_POLYGON_ZKEVM_BATCHES_ENABLED=
# INDEXER_POLYGON_ZKEVM_BATCHES_CHUNK_SIZE=
# INDEXER_POLYGON_ZKEVM_BATCHES_RECHECK_INTERVAL=
# INDEXER_POLYGON_ZKEVM_L1_RPC=
# INDEXER_POLYGON_ZKEVM_L1_BRIDGE_START_BLOCK=
# INDEXER_POLYGON_ZKEVM_L1_BRIDGE_CONTRACT=
# INDEXER_POLYGON_ZKEVM_L1_BRIDGE_NATIVE_SYMBOL=
# INDEXER_POLYGON_ZKEVM_L1_BRIDGE_NATIVE_DECIMALS=
# INDEXER_POLYGON_ZKEVM_L2_BRIDGE_START_BLOCK=
# INDEXER_POLYGON_ZKEVM_L2_BRIDGE_CONTRACT=
# INDEXER_REALTIME_FETCHER_MAX_GAP=
# INDEXER_FETCHER_INIT_QUERY_LIMIT=
# INDEXER_TOKEN_BALANCES_FETCHER_INIT_QUERY_LIMIT=
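Both *_CHUNK_SIZE and *_RECHECK_INTERVAL flow through ConfigHelper.parse_integer_env_var/2, as seen in the runtime config above. A minimal sketch of such a helper, assuming it falls back to the default on unset or malformed values (the real ConfigHelper implementation may differ):

# Read an integer env var, defaulting when the variable is unset or not a number.
def parse_integer_env_var(env_var, default) do
  case env_var |> System.get_env(to_string(default)) |> Integer.parse() do
    {value, _remainder} -> value
    :error -> default
  end
end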
