Extend zkEVM batches indexer

pull/7584/head
POA 2 years ago
parent e630e51939
commit 6673b1cee1
Changed files (8):
  1. apps/explorer/lib/explorer/chain/import/runner/zkevm_batch_txns.ex (76 lines changed)
  2. apps/explorer/lib/explorer/chain/import/runner/zkevm_txn_batches.ex (4 lines changed)
  3. apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex (3 lines changed)
  4. apps/explorer/lib/explorer/chain/zkevm_batch_txn.ex (32 lines changed)
  5. apps/explorer/lib/explorer/chain/zkevm_txn_batch.ex (19 lines changed)
  6. apps/explorer/priv/repo/migrations/20230420110800_create_zkevm_tables.exs (19 lines changed)
  7. apps/indexer/lib/indexer/fetcher/zkevm_txn_batch.ex (76 lines changed)
  8. apps/indexer/lib/indexer/supervisor.ex (3 lines changed)

@ -0,0 +1,76 @@
defmodule Explorer.Chain.Import.Runner.ZkevmBatchTxns do
  @moduledoc """
  Bulk imports `t:Explorer.Chain.ZkevmBatchTxn.t/0` (batch number <-> L2
  transaction hash associations) as part of the multi-runner block import.
  """

  require Ecto.Query

  alias Ecto.{Changeset, Multi, Repo}
  alias Explorer.Chain.{Import, ZkevmBatchTxn}
  alias Explorer.Prometheus.Instrumenter

  @behaviour Import.Runner

  # milliseconds
  @timeout 60_000

  @type imported :: [ZkevmBatchTxn.t()]

  @impl Import.Runner
  def ecto_schema_module, do: ZkevmBatchTxn

  @impl Import.Runner
  def option_key, do: :zkevm_batch_txns

  @impl Import.Runner
  def imported_table_row do
    %{
      value_type: "[#{ecto_schema_module()}.t()]",
      value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
    }
  end

  @impl Import.Runner
  def run(multi, changes_list, %{timestamps: timestamps} = options) do
    # Caller-supplied per-runner options (under `option_key/0`) may override
    # `on_conflict` and `timeout`; the shared import timestamps always win.
    insert_options =
      options
      |> Map.get(option_key(), %{})
      |> Map.take(~w(on_conflict timeout)a)
      |> Map.put_new(:timeout, @timeout)
      |> Map.put(:timestamps, timestamps)

    Multi.run(multi, :insert_zkevm_batch_txns, fn repo, _ ->
      Instrumenter.block_import_stage_runner(
        fn -> insert(repo, changes_list, insert_options) end,
        :block_referencing,
        :zkevm_batch_txns,
        :zkevm_batch_txns
      )
    end)
  end

  @impl Import.Runner
  def timeout, do: @timeout

  @doc """
  Inserts the given list of batch-transaction changes, skipping rows whose
  `hash` already exists (`on_conflict: :nothing`).
  """
  @spec insert(Repo.t(), [map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) ::
          {:ok, [ZkevmBatchTxn.t()]}
          | {:error, [Changeset.t()]}
  def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = _options) when is_list(changes_list) do
    # Enforce ZkevmBatchTxn ShareLocks order (see docs: sharelock.md)
    ordered_changes_list = Enum.sort_by(changes_list, & &1.hash)

    # The assertive match crashes the import transaction on failure (as other
    # runners do) and itself evaluates to the `{:ok, inserted}` tuple, so the
    # result does not need to be destructured and re-wrapped.
    {:ok, _inserted} =
      Import.insert_changes_list(
        repo,
        ordered_changes_list,
        for: ZkevmBatchTxn,
        returning: true,
        timeout: timeout,
        timestamps: timestamps,
        conflict_target: :hash,
        on_conflict: :nothing
      )
  end
end

@ -85,7 +85,6 @@ defmodule Explorer.Chain.Import.Runner.ZkevmTxnBatches do
set: [
# don't update `number` as it is a primary key and used for the conflict target
timestamp: fragment("EXCLUDED.timestamp"),
l2_transaction_hashes: fragment("EXCLUDED.l2_transaction_hashes"),
global_exit_root: fragment("EXCLUDED.global_exit_root"),
acc_input_hash: fragment("EXCLUDED.acc_input_hash"),
state_root: fragment("EXCLUDED.state_root"),
@ -97,9 +96,8 @@ defmodule Explorer.Chain.Import.Runner.ZkevmTxnBatches do
],
where:
fragment(
"(EXCLUDED.timestamp, EXCLUDED.l2_transaction_hashes, EXCLUDED.global_exit_root, EXCLUDED.acc_input_hash, EXCLUDED.state_root, EXCLUDED.sequence_id, EXCLUDED.verify_id) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?)",
"(EXCLUDED.timestamp, EXCLUDED.global_exit_root, EXCLUDED.acc_input_hash, EXCLUDED.state_root, EXCLUDED.sequence_id, EXCLUDED.verify_id) IS DISTINCT FROM (?, ?, ?, ?, ?, ?)",
tb.timestamp,
tb.l2_transaction_hashes,
tb.global_exit_root,
tb.acc_input_hash,
tb.state_root,

@ -34,7 +34,8 @@ defmodule Explorer.Chain.Import.Stage.BlockReferencing do
@default_runners ++
[
Runner.ZkevmLifecycleTxns,
Runner.ZkevmTxnBatches
Runner.ZkevmTxnBatches,
Runner.ZkevmBatchTxns
]
_ -> @default_runners
end

@ -0,0 +1,32 @@
defmodule Explorer.Chain.ZkevmBatchTxn do
  @moduledoc "Models a list of transactions related to a batch for zkEVM."

  use Explorer.Schema

  alias Explorer.Chain.{Hash, Transaction, ZkevmTxnBatch}

  # A row is meaningless without both the owning batch number and the
  # L2 transaction hash, so both fields are mandatory.
  @required_attrs ~w(batch_number hash)a

  @type t :: %__MODULE__{
          batch_number: non_neg_integer(),
          batch: %Ecto.Association.NotLoaded{} | ZkevmTxnBatch.t() | nil,
          hash: Hash.t(),
          l2_transaction: %Ecto.Association.NotLoaded{} | Transaction.t() | nil
        }

  # Join table: no surrogate primary key; `hash` is kept unique via a
  # database-level unique index.
  @primary_key false
  schema "zkevm_batch_l2_transactions" do
    belongs_to(:batch, ZkevmTxnBatch, foreign_key: :batch_number, references: :number, type: :integer)
    belongs_to(:l2_transaction, Transaction, foreign_key: :hash, references: :hash, type: Hash.Full)

    timestamps()
  end

  @doc """
  Builds a changeset requiring `batch_number` and `hash`, mapping the
  batch foreign-key and hash uniqueness violations onto changeset errors.
  """
  def changeset(%__MODULE__{} = record, attrs \\ %{}) do
    record
    |> cast(attrs, @required_attrs)
    |> validate_required(@required_attrs)
    |> foreign_key_constraint(:batch_number)
    |> unique_constraint(:hash)
  end
end

@ -3,28 +3,28 @@ defmodule Explorer.Chain.ZkevmTxnBatch do
use Explorer.Schema
alias Explorer.Chain.{Hash, ZkevmLifecycleTxn}
alias Explorer.Chain.{Hash, ZkevmBatchTxn, ZkevmLifecycleTxn}
@required_attrs ~w(number timestamp l2_transaction_hashes global_exit_root acc_input_hash state_root sequence_id verify_id)a
@optional_attrs ~w(sequence_id verify_id)a
@required_attrs ~w(number timestamp global_exit_root acc_input_hash state_root)a
@type t :: %__MODULE__{
number: non_neg_integer(),
timestamp: DateTime.t(),
l2_transaction_hashes: [Hash.t()],
global_exit_root: Hash.t(),
acc_input_hash: Hash.t(),
state_root: Hash.t(),
sequence_id: non_neg_integer() | nil,
sequence_transaction: %Ecto.Association.NotLoaded{} | ZkevmLifecycleTxn.t() | nil,
verify_id: non_neg_integer() | nil,
verify_transaction: %Ecto.Association.NotLoaded{} | ZkevmLifecycleTxn.t() | nil
verify_transaction: %Ecto.Association.NotLoaded{} | ZkevmLifecycleTxn.t() | nil,
l2_transactions: %Ecto.Association.NotLoaded{} | [ZkevmBatchTxn.t()]
}
@primary_key false
@primary_key {:number, :integer, autogenerate: false}
schema "zkevm_transaction_batches" do
field(:number, :integer, primary_key: true)
field(:timestamp, :utc_datetime_usec)
field(:l2_transaction_hashes, {:array, Hash.Full})
field(:global_exit_root, Hash.Full)
field(:acc_input_hash, Hash.Full)
field(:state_root, Hash.Full)
@ -32,14 +32,17 @@ defmodule Explorer.Chain.ZkevmTxnBatch do
belongs_to(:sequence_transaction, ZkevmLifecycleTxn, foreign_key: :sequence_id, references: :id, type: :integer)
belongs_to(:verify_transaction, ZkevmLifecycleTxn, foreign_key: :verify_id, references: :id, type: :integer)
has_many(:l2_transactions, ZkevmBatchTxn, foreign_key: :batch_number)
timestamps()
end
def changeset(%__MODULE__{} = batches, attrs \\ %{}) do
batches
|> cast(attrs, @required_attrs)
|> cast(attrs, @required_attrs ++ @optional_attrs)
|> validate_required(@required_attrs)
|> foreign_key_constraint(:sequence_id)
|> foreign_key_constraint(:verify_id)
|> unique_constraint(:number)
end
end

@ -14,7 +14,6 @@ defmodule Explorer.Repo.Migrations.CreateZkevmTables do
create table(:zkevm_transaction_batches, primary_key: false) do
add(:number, :integer, null: false, primary_key: true)
add(:timestamp, :"timestamp without time zone", null: false)
add(:l2_transaction_hashes, {:array, :bytea}, null: false)
add(:global_exit_root, :bytea, null: false)
add(:acc_input_hash, :bytea, null: false)
add(:state_root, :bytea, null: false)
@ -33,5 +32,23 @@ defmodule Explorer.Repo.Migrations.CreateZkevmTables do
timestamps(null: false, type: :utc_datetime_usec)
end
create table(:zkevm_batch_l2_transactions, primary_key: false) do
add(
:batch_number,
references(:zkevm_transaction_batches,
column: :number,
on_delete: :delete_all,
on_update: :update_all,
type: :integer
),
null: false
)
add(:hash, :bytea, null: false)
timestamps(null: false, type: :utc_datetime_usec)
end
create(unique_index(:zkevm_batch_l2_transactions, :hash))
end
end

@ -37,9 +37,9 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do
def init(args) do
Logger.metadata(fetcher: :zkevm_txn_batches)
#Logger.configure(truncate: :infinity)
# Logger.configure(truncate: :infinity)
#enabled = Application.get_all_env(:indexer)[Indexer.Fetcher.ZkevmTxnBatch][:enabled]
# enabled = Application.get_all_env(:indexer)[Indexer.Fetcher.ZkevmTxnBatch][:enabled]
Process.send(self(), :continue, [])
@ -75,6 +75,12 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do
end
end
@impl GenServer
def handle_info({ref, _result}, state) do
Process.demonitor(ref, [:flush])
{:noreply, state}
end
defp get_last_verified_batch_number do
query =
from(tb in ZkevmTxnBatch,
@ -123,20 +129,34 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do
batch_start
|> Range.new(batch_end, 1)
|> Enum.map(fn batch_number ->
EthereumJSONRPC.request(%{id: batch_number, method: "zkevm_getBatchByNumber", params: [integer_to_quantity(batch_number), false]})
EthereumJSONRPC.request(%{
id: batch_number,
method: "zkevm_getBatchByNumber",
params: [integer_to_quantity(batch_number), false]
})
end)
error_message = &"Cannot call zkevm_getBatchByNumber for the batch range #{batch_start}..#{batch_end}. Error: #{inspect(&1)}"
error_message =
&"Cannot call zkevm_getBatchByNumber for the batch range #{batch_start}..#{batch_end}. Error: #{inspect(&1)}"
{:ok, responses} = repeated_call(&json_rpc/2, [requests, json_rpc_named_arguments], error_message, 3)
#Logger.warn("Hashes for the batch range #{batch_start}..#{batch_end}:")
# Logger.warn("Hashes for the batch range #{batch_start}..#{batch_end}:")
{sequence_hashes, verify_hashes} =
responses
|> Enum.reduce({[], []}, fn res, {sequences, verifies} = _acc ->
"0x" <> send_sequences_tx_hash = Map.get(res.result, "sendSequencesTxHash")
"0x" <> verify_batch_tx_hash = Map.get(res.result, "verifyBatchTxHash")
send_sequences_tx_hash =
case Map.get(res.result, "sendSequencesTxHash") do
"0x" <> send_sequences_tx_hash -> send_sequences_tx_hash
nil -> "0000000000000000000000000000000000000000000000000000000000000000"
end
verify_batch_tx_hash =
case Map.get(res.result, "verifyBatchTxHash") do
"0x" <> verify_batch_tx_hash -> verify_batch_tx_hash
nil -> "0000000000000000000000000000000000000000000000000000000000000000"
end
sequences =
if send_sequences_tx_hash != "0000000000000000000000000000000000000000000000000000000000000000" do
@ -171,9 +191,10 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do
Map.put(acc, hash.bytes, id)
end)
{batches_to_import, l1_txs_to_import, _, _} =
{batches_to_import, l2_txs_to_import, l1_txs_to_import, _, _} =
responses
|> Enum.reduce({[], [], get_next_id(), hash_to_id}, fn res, {batches, l1_txs, next_id, hash_to_id} = _acc ->
|> Enum.reduce({[], [], [], get_next_id(), hash_to_id}, fn res,
{batches, l2_txs, l1_txs, next_id, hash_to_id} = _acc ->
number = quantity_to_integer(Map.get(res.result, "number"))
{:ok, timestamp} = DateTime.from_unix(quantity_to_integer(Map.get(res.result, "timestamp")))
l2_transaction_hashes = Map.get(res.result, "transactions")
@ -181,17 +202,27 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do
acc_input_hash = Map.get(res.result, "accInputHash")
state_root = Map.get(res.result, "stateRoot")
"0x" <> send_sequences_tx_hash = Map.get(res.result, "sendSequencesTxHash")
"0x" <> verify_batch_tx_hash = Map.get(res.result, "verifyBatchTxHash")
send_sequences_tx_hash =
case Map.get(res.result, "sendSequencesTxHash") do
"0x" <> send_sequences_tx_hash -> send_sequences_tx_hash
nil -> "0000000000000000000000000000000000000000000000000000000000000000"
end
verify_batch_tx_hash =
case Map.get(res.result, "verifyBatchTxHash") do
"0x" <> verify_batch_tx_hash -> verify_batch_tx_hash
nil -> "0000000000000000000000000000000000000000000000000000000000000000"
end
{sequence_id, l1_txs, next_id, hash_to_id} =
if send_sequences_tx_hash != "0000000000000000000000000000000000000000000000000000000000000000" do
sequence_tx_hash = Base.decode16!(send_sequences_tx_hash, case: :mixed)
id = Map.get(hash_to_id, sequence_tx_hash)
if is_nil(id) do
{next_id, l1_txs ++ [%{id: next_id, hash: sequence_tx_hash, is_verify: false}], next_id + 1, Map.put(hash_to_id, sequence_tx_hash, next_id)}
{next_id, l1_txs ++ [%{id: next_id, hash: sequence_tx_hash, is_verify: false}], next_id + 1,
Map.put(hash_to_id, sequence_tx_hash, next_id)}
else
{id, l1_txs, next_id, hash_to_id}
end
@ -202,11 +233,12 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do
{verify_id, l1_txs, next_id, hash_to_id} =
if verify_batch_tx_hash != "0000000000000000000000000000000000000000000000000000000000000000" do
verify_tx_hash = Base.decode16!(verify_batch_tx_hash, case: :mixed)
id = Map.get(hash_to_id, verify_tx_hash)
if is_nil(id) do
{next_id, l1_txs ++ [%{id: next_id, hash: verify_tx_hash, is_verify: true}], next_id + 1, Map.put(hash_to_id, verify_tx_hash, next_id)}
{next_id, l1_txs ++ [%{id: next_id, hash: verify_tx_hash, is_verify: true}], next_id + 1,
Map.put(hash_to_id, verify_tx_hash, next_id)}
else
{id, l1_txs, next_id, hash_to_id}
end
@ -217,7 +249,6 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do
batch = %{
number: number,
timestamp: timestamp,
l2_transaction_hashes: l2_transaction_hashes || [],
global_exit_root: global_exit_root,
acc_input_hash: acc_input_hash,
state_root: state_root,
@ -225,13 +256,24 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do
verify_id: verify_id
}
{batches ++ [batch], l1_txs, next_id, hash_to_id}
l2_txs_append =
l2_transaction_hashes
|> Kernel.||([])
|> Enum.map(fn l2_tx_hash ->
%{
batch_number: number,
hash: l2_tx_hash
}
end)
{batches ++ [batch], l2_txs ++ l2_txs_append, l1_txs, next_id, hash_to_id}
end)
{:ok, _} =
Chain.import(%{
zkevm_lifecycle_txns: %{params: l1_txs_to_import},
zkevm_txn_batches: %{params: batches_to_import},
zkevm_batch_txns: %{params: l2_txs_to_import},
timeout: :infinity
})
end

@ -133,7 +133,8 @@ defmodule Indexer.Supervisor do
{Indexer.Fetcher.PolygonEdge.Withdrawal.Supervisor,
[[memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments]]},
{Indexer.Fetcher.PolygonEdge.WithdrawalExit.Supervisor, [[memory_monitor: memory_monitor]]},
{ZkevmTxnBatch.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{ZkevmTxnBatch.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
# Out-of-band fetchers
{EmptyBlocksSanitizer.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments]]},

Loading…
Cancel
Save