Prepare zkevm_bridge table and its importer

pull/9098/head
POA 12 months ago
parent b6859d3547
commit 8cf0f63b33
  1. 115
      apps/explorer/lib/explorer/chain/import/runner/zkevm/bridge_operations.ex
  2. 51
      apps/explorer/lib/explorer/chain/zkevm/bridge.ex
  3. 24
      apps/explorer/priv/polygon_zkevm/migrations/20231010093238_add_bridge_table.exs
  4. 6
      config/runtime.exs

@ -0,0 +1,115 @@
defmodule Explorer.Chain.Import.Runner.Zkevm.BridgeOperations do
@moduledoc """
Bulk imports `t:Explorer.Chain.Zkevm.Bridge.t/0`.
"""
require Ecto.Query
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Import
alias Explorer.Chain.Zkevm.Bridge, as: ZkevmBridge
alias Explorer.Prometheus.Instrumenter
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type imported :: [ZkevmBridge.t()]
@impl Import.Runner
def ecto_schema_module, do: ZkevmBridge
@impl Import.Runner
def option_key, do: :zkevm_bridge_operations
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, %{timestamps: timestamps} = options) do
insert_options =
options
|> Map.get(option_key(), %{})
|> Map.take(~w(on_conflict timeout)a)
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
Multi.run(multi, :insert_zkevm_bridge_operations, fn repo, _ ->
Instrumenter.block_import_stage_runner(
fn -> insert(repo, changes_list, insert_options) end,
:block_referencing,
:zkevm_bridge_operations,
:zkevm_bridge_operations
)
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert(Repo.t(), [map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) ::
{:ok, [ZkevmBridge.t()]}
| {:error, [Changeset.t()]}
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# Enforce ZkevmBridge ShareLocks order (see docs: sharelock.md)
ordered_changes_list = Enum.sort_by(changes_list, &{&1.type, &1.index})
{:ok, inserted} =
Import.insert_changes_list(
repo,
ordered_changes_list,
conflict_target: [:type, :index],
on_conflict: on_conflict,
for: ZkevmBridge,
returning: true,
timeout: timeout,
timestamps: timestamps
)
{:ok, inserted}
end
defp default_on_conflict do
from(
op in ZkevmBridge,
update: [
set: [
# Don't update `type` as it is part of the composite primary key and used for the conflict target
# Don't update `index` as it is part of the composite primary key and used for the conflict target
l1_transaction_hash: fragment("EXCLUDED.l1_transaction_hash"),
l2_transaction_hash: fragment("EXCLUDED.l2_transaction_hash"),
l1_token_address: fragment("EXCLUDED.l1_token_address"),
l1_token_decimals: fragment("EXCLUDED.l1_token_decimals"),
l1_token_symbol: fragment("EXCLUDED.l1_token_symbol"),
amount: fragment("EXCLUDED.amount"),
block_number: fragment("EXCLUDED.block_number"),
block_timestamp: fragment("EXCLUDED.block_timestamp"),
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", op.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", op.updated_at)
]
],
where:
fragment(
"(EXCLUDED.l1_transaction_hash, EXCLUDED.l2_transaction_hash, EXCLUDED.l1_token_address, EXCLUDED.l1_token_decimals, EXCLUDED.l1_token_symbol, EXCLUDED.amount, EXCLUDED.block_number, EXCLUDED.block_timestamp) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?, ?)",
op.l1_transaction_hash,
op.l2_transaction_hash,
op.l1_token_address,
op.l1_token_decimals,
op.l1_token_symbol,
op.amount,
op.block_number,
op.block_timestamp
)
)
end
end

@ -0,0 +1,51 @@
defmodule Explorer.Chain.Zkevm.Bridge do
@moduledoc "Models a bridge operation for Polygon zkEVM."
use Explorer.Schema
alias Explorer.Chain.{Block, Hash}
@optional_attrs ~w(l1_transaction_hash l2_transaction_hash l1_token_address l1_token_decimals l1_token_symbol block_number block_timestamp)a
@required_attrs ~w(type index amount)a
@type t :: %__MODULE__{
type: String.t(),
index: non_neg_integer(),
l1_transaction_hash: Hash.t() | nil,
l2_transaction_hash: Hash.t() | nil,
l1_token_address: Hash.Address.t() | nil,
l1_token_decimals: non_neg_integer() | nil,
l1_token_symbol: String.t() | nil,
amount: Decimal.t(),
block_number: Block.block_number() | nil,
block_timestamp: DateTime.t() | nil
}
@primary_key false
schema "zkevm_bridge" do
field(:type, Ecto.Enum, values: [:deposit, :withdrawal], primary_key: true)
field(:index, :integer, primary_key: true)
field(:l1_transaction_hash, Hash.Full)
field(:l2_transaction_hash, Hash.Full)
field(:l1_token_address, Hash.Address)
field(:l1_token_decimals, :integer)
field(:l1_token_symbol, :string)
field(:amount, :decimal)
field(:block_number, :integer)
field(:block_timestamp, :utc_datetime_usec)
timestamps()
end
@doc """
Checks that the `attrs` are valid.
"""
@spec changeset(Ecto.Schema.t(), map()) :: Ecto.Schema.t()
def changeset(%__MODULE__{} = operations, attrs \\ %{}) do
operations
|> cast(attrs, @required_attrs ++ @optional_attrs)
|> validate_required(@required_attrs)
|> unique_constraint([:type, :index])
end
end

@ -0,0 +1,24 @@
defmodule Explorer.Repo.PolygonZkevm.Migrations.AddBridgeTable do
use Ecto.Migration
def change do
execute(
"CREATE TYPE zkevm_bridge_op_type AS ENUM ('deposit', 'withdrawal')",
"DROP TYPE zkevm_bridge_op_type"
)
create table(:zkevm_bridge, primary_key: false) do
add(:type, :zkevm_bridge_op_type, null: false, primary_key: true)
add(:index, :integer, null: false, primary_key: true)
add(:l1_transaction_hash, :bytea, null: true, default: nil)
add(:l2_transaction_hash, :bytea, null: true, default: nil)
add(:l1_token_address, :bytea, null: true, default: nil)
add(:l1_token_decimals, :smallint, null: true, default: nil)
add(:l1_token_symbol, :string, size: 16, null: true, default: nil)
add(:amount, :numeric, precision: 100, null: false)
add(:block_number, :bigint, null: true, default: nil)
add(:block_timestamp, :"timestamp without time zone", null: true, default: nil)
timestamps(null: false, type: :utc_datetime_usec)
end
end
end

@ -728,12 +728,12 @@ config :indexer, Indexer.Fetcher.Shibarium.L1.Supervisor, enabled: ConfigHelper.
config :indexer, Indexer.Fetcher.Shibarium.L2.Supervisor, enabled: ConfigHelper.chain_type() == "shibarium" config :indexer, Indexer.Fetcher.Shibarium.L2.Supervisor, enabled: ConfigHelper.chain_type() == "shibarium"
config :indexer, Indexer.Fetcher.Zkevm.TransactionBatch, config :indexer, Indexer.Fetcher.Zkevm.TransactionBatch,
chunk_size: ConfigHelper.parse_integer_env_var("INDEXER_ZKEVM_BATCHES_CHUNK_SIZE", 20), chunk_size: ConfigHelper.parse_integer_env_var("INDEXER_POLYGON_ZKEVM_BATCHES_CHUNK_SIZE", 20),
recheck_interval: ConfigHelper.parse_integer_env_var("INDEXER_ZKEVM_BATCHES_RECHECK_INTERVAL", 60) recheck_interval: ConfigHelper.parse_integer_env_var("INDEXER_POLYGON_ZKEVM_BATCHES_RECHECK_INTERVAL", 60)
config :indexer, Indexer.Fetcher.Zkevm.TransactionBatch.Supervisor, config :indexer, Indexer.Fetcher.Zkevm.TransactionBatch.Supervisor,
enabled: enabled:
ConfigHelper.chain_type() == "polygon_zkevm" && ConfigHelper.parse_bool_env_var("INDEXER_ZKEVM_BATCHES_ENABLED") ConfigHelper.chain_type() == "polygon_zkevm" && ConfigHelper.parse_bool_env_var("INDEXER_POLYGON_ZKEVM_BATCHES_ENABLED")
Code.require_file("#{config_env()}.exs", "config/runtime") Code.require_file("#{config_env()}.exs", "config/runtime")

Loading…
Cancel
Save