diff --git a/apps/explorer/lib/explorer/chain/import/runner/zkevm_batch_txns.ex b/apps/explorer/lib/explorer/chain/import/runner/zkevm_batch_txns.ex new file mode 100644 index 0000000000..f7ac950e0c --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/runner/zkevm_batch_txns.ex @@ -0,0 +1,76 @@ +defmodule Explorer.Chain.Import.Runner.ZkevmBatchTxns do + @moduledoc """ + Bulk imports `t:Explorer.Chain.ZkevmBatchTxn.t/0`. + """ + + require Ecto.Query + + alias Ecto.{Changeset, Multi, Repo} + alias Explorer.Chain.{Import, ZkevmBatchTxn} + alias Explorer.Prometheus.Instrumenter + + @behaviour Import.Runner + + # milliseconds + @timeout 60_000 + + @type imported :: [ZkevmBatchTxn.t()] + + @impl Import.Runner + def ecto_schema_module, do: ZkevmBatchTxn + + @impl Import.Runner + def option_key, do: :zkevm_batch_txns + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } + end + + @impl Import.Runner + def run(multi, changes_list, %{timestamps: timestamps} = options) do + insert_options = + options + |> Map.get(option_key(), %{}) + |> Map.take(~w(on_conflict timeout)a) + |> Map.put_new(:timeout, @timeout) + |> Map.put(:timestamps, timestamps) + + Multi.run(multi, :insert_zkevm_batch_txns, fn repo, _ -> + Instrumenter.block_import_stage_runner( + fn -> insert(repo, changes_list, insert_options) end, + :block_referencing, + :zkevm_batch_txns, + :zkevm_batch_txns + ) + end) + end + + @impl Import.Runner + def timeout, do: @timeout + + @spec insert(Repo.t(), [map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) :: + {:ok, [ZkevmBatchTxn.t()]} + | {:error, [Changeset.t()]} + def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = _options) when is_list(changes_list) do + # Enforce ZkevmBatchTxn ShareLocks order (see docs: sharelock.md) + ordered_changes_list = Enum.sort_by(changes_list, & &1.hash) + + {:ok, inserted} = + Import.insert_changes_list( + repo, + ordered_changes_list, + for: ZkevmBatchTxn, + returning: true, + timeout: timeout, + timestamps: timestamps, + conflict_target: :hash, + on_conflict: :nothing + ) + + {:ok, inserted} + end +end diff --git a/apps/explorer/lib/explorer/chain/import/runner/zkevm_txn_batches.ex b/apps/explorer/lib/explorer/chain/import/runner/zkevm_txn_batches.ex index 3d3b27aebf..ed1e5f1d8f 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/zkevm_txn_batches.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/zkevm_txn_batches.ex @@ -85,7 +85,6 @@ defmodule Explorer.Chain.Import.Runner.ZkevmTxnBatches do set: [ # don't update `number` as it is a primary key and used for the conflict target timestamp: fragment("EXCLUDED.timestamp"), - l2_transaction_hashes: fragment("EXCLUDED.l2_transaction_hashes"), global_exit_root: fragment("EXCLUDED.global_exit_root"), acc_input_hash: fragment("EXCLUDED.acc_input_hash"), state_root: fragment("EXCLUDED.state_root"), @@ -97,9 +96,8 @@ defmodule Explorer.Chain.Import.Runner.ZkevmTxnBatches do ], where: fragment( - "(EXCLUDED.timestamp, EXCLUDED.l2_transaction_hashes, EXCLUDED.global_exit_root, EXCLUDED.acc_input_hash, EXCLUDED.state_root, EXCLUDED.sequence_id, EXCLUDED.verify_id) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?)", + "(EXCLUDED.timestamp, EXCLUDED.global_exit_root, EXCLUDED.acc_input_hash, EXCLUDED.state_root, EXCLUDED.sequence_id, EXCLUDED.verify_id) IS DISTINCT FROM (?, ?, ?, ?, ?, ?)", tb.timestamp, - tb.l2_transaction_hashes, tb.global_exit_root, tb.acc_input_hash, tb.state_root, diff --git a/apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex b/apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex index 3330e99359..99ae4be58f 100644 --- a/apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex +++ b/apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex @@ -34,7 +34,8 @@ defmodule Explorer.Chain.Import.Stage.BlockReferencing do @default_runners ++ [ Runner.ZkevmLifecycleTxns, - Runner.ZkevmTxnBatches + Runner.ZkevmTxnBatches, + Runner.ZkevmBatchTxns ] _ -> @default_runners end diff --git a/apps/explorer/lib/explorer/chain/zkevm_batch_txn.ex b/apps/explorer/lib/explorer/chain/zkevm_batch_txn.ex new file mode 100644 index 0000000000..8a30661a3e --- /dev/null +++ b/apps/explorer/lib/explorer/chain/zkevm_batch_txn.ex @@ -0,0 +1,32 @@ +defmodule Explorer.Chain.ZkevmBatchTxn do + @moduledoc "Models a list of transactions related to a batch for zkEVM." + + use Explorer.Schema + + alias Explorer.Chain.{Hash, Transaction, ZkevmTxnBatch} + + @required_attrs ~w(batch_number hash)a + + @type t :: %__MODULE__{ + batch_number: non_neg_integer(), + batch: %Ecto.Association.NotLoaded{} | ZkevmTxnBatch.t() | nil, + hash: Hash.t(), + l2_transaction: %Ecto.Association.NotLoaded{} | Transaction.t() | nil + } + + @primary_key false + schema "zkevm_batch_l2_transactions" do + belongs_to(:batch, ZkevmTxnBatch, foreign_key: :batch_number, references: :number, type: :integer) + belongs_to(:l2_transaction, Transaction, foreign_key: :hash, references: :hash, type: Hash.Full) + + timestamps() + end + + def changeset(%__MODULE__{} = transactions, attrs \\ %{}) do + transactions + |> cast(attrs, @required_attrs) + |> validate_required(@required_attrs) + |> foreign_key_constraint(:batch_number) + |> unique_constraint(:hash) + end +end diff --git a/apps/explorer/lib/explorer/chain/zkevm_txn_batch.ex b/apps/explorer/lib/explorer/chain/zkevm_txn_batch.ex index e5359bac62..7cd380af1e 100644 --- a/apps/explorer/lib/explorer/chain/zkevm_txn_batch.ex +++ b/apps/explorer/lib/explorer/chain/zkevm_txn_batch.ex @@ -3,28 +3,28 @@ defmodule Explorer.Chain.ZkevmTxnBatch do use Explorer.Schema - alias Explorer.Chain.{Hash, ZkevmLifecycleTxn} + alias Explorer.Chain.{Hash, ZkevmBatchTxn, ZkevmLifecycleTxn} - @required_attrs ~w(number timestamp l2_transaction_hashes global_exit_root acc_input_hash state_root sequence_id verify_id)a + @optional_attrs ~w(sequence_id verify_id)a + + @required_attrs ~w(number timestamp global_exit_root acc_input_hash state_root)a @type t :: %__MODULE__{ number: non_neg_integer(), timestamp: DateTime.t(), - l2_transaction_hashes: [Hash.t()], global_exit_root: Hash.t(), acc_input_hash: Hash.t(), state_root: Hash.t(), sequence_id: non_neg_integer() | nil, sequence_transaction: %Ecto.Association.NotLoaded{} | ZkevmLifecycleTxn.t() | nil, verify_id: non_neg_integer() | nil, - verify_transaction: %Ecto.Association.NotLoaded{} | ZkevmLifecycleTxn.t() | nil + verify_transaction: %Ecto.Association.NotLoaded{} | ZkevmLifecycleTxn.t() | nil, + l2_transactions: %Ecto.Association.NotLoaded{} | [ZkevmBatchTxn.t()] } - @primary_key false + @primary_key {:number, :integer, autogenerate: false} schema "zkevm_transaction_batches" do - field(:number, :integer, primary_key: true) field(:timestamp, :utc_datetime_usec) - field(:l2_transaction_hashes, {:array, Hash.Full}) field(:global_exit_root, Hash.Full) field(:acc_input_hash, Hash.Full) field(:state_root, Hash.Full) @@ -32,14 +32,17 @@ defmodule Explorer.Chain.ZkevmTxnBatch do belongs_to(:sequence_transaction, ZkevmLifecycleTxn, foreign_key: :sequence_id, references: :id, type: :integer) belongs_to(:verify_transaction, ZkevmLifecycleTxn, foreign_key: :verify_id, references: :id, type: :integer) + has_many(:l2_transactions, ZkevmBatchTxn, foreign_key: :batch_number) + timestamps() end def changeset(%__MODULE__{} = batches, attrs \\ %{}) do batches - |> cast(attrs, @required_attrs) + |> cast(attrs, @required_attrs ++ @optional_attrs) |> validate_required(@required_attrs) |> foreign_key_constraint(:sequence_id) |> foreign_key_constraint(:verify_id) + |> unique_constraint(:number) end end diff --git a/apps/explorer/priv/repo/migrations/20230420110800_create_zkevm_tables.exs b/apps/explorer/priv/repo/migrations/20230420110800_create_zkevm_tables.exs index 73c8e41534..eeb9d1d6e2 100644 --- a/apps/explorer/priv/repo/migrations/20230420110800_create_zkevm_tables.exs +++ b/apps/explorer/priv/repo/migrations/20230420110800_create_zkevm_tables.exs @@ -14,7 +14,6 @@ defmodule Explorer.Repo.Migrations.CreateZkevmTables do create table(:zkevm_transaction_batches, primary_key: false) do add(:number, :integer, null: false, primary_key: true) add(:timestamp, :"timestamp without time zone", null: false) - add(:l2_transaction_hashes, {:array, :bytea}, null: false) add(:global_exit_root, :bytea, null: false) add(:acc_input_hash, :bytea, null: false) add(:state_root, :bytea, null: false) @@ -33,5 +32,23 @@ defmodule Explorer.Repo.Migrations.CreateZkevmTables do timestamps(null: false, type: :utc_datetime_usec) end + + create table(:zkevm_batch_l2_transactions, primary_key: false) do + add( + :batch_number, + references(:zkevm_transaction_batches, + column: :number, + on_delete: :delete_all, + on_update: :update_all, + type: :integer + ), + null: false + ) + + add(:hash, :bytea, null: false) + timestamps(null: false, type: :utc_datetime_usec) + end + + create(unique_index(:zkevm_batch_l2_transactions, :hash)) end end diff --git a/apps/indexer/lib/indexer/fetcher/zkevm_txn_batch.ex b/apps/indexer/lib/indexer/fetcher/zkevm_txn_batch.ex index 3484e45ce2..a15310fb25 100644 --- a/apps/indexer/lib/indexer/fetcher/zkevm_txn_batch.ex +++ b/apps/indexer/lib/indexer/fetcher/zkevm_txn_batch.ex @@ -37,9 +37,9 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do def init(args) do Logger.metadata(fetcher: :zkevm_txn_batches) - #Logger.configure(truncate: :infinity) + # Logger.configure(truncate: :infinity) - #enabled = Application.get_all_env(:indexer)[Indexer.Fetcher.ZkevmTxnBatch][:enabled] + # enabled = Application.get_all_env(:indexer)[Indexer.Fetcher.ZkevmTxnBatch][:enabled] Process.send(self(), :continue, []) @@ -75,6 +75,12 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do end end + @impl GenServer + def handle_info({ref, _result}, state) do + Process.demonitor(ref, [:flush]) + {:noreply, state} + end + defp get_last_verified_batch_number do query = from(tb in ZkevmTxnBatch, @@ -123,20 +129,34 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do batch_start |> Range.new(batch_end, 1) |> Enum.map(fn batch_number -> - EthereumJSONRPC.request(%{id: batch_number, method: "zkevm_getBatchByNumber", params: [integer_to_quantity(batch_number), false]}) + EthereumJSONRPC.request(%{ + id: batch_number, + method: "zkevm_getBatchByNumber", + params: [integer_to_quantity(batch_number), false] + }) end) - error_message = &"Cannot call zkevm_getBatchByNumber for the batch range #{batch_start}..#{batch_end}. Error: #{inspect(&1)}" + error_message = + &"Cannot call zkevm_getBatchByNumber for the batch range #{batch_start}..#{batch_end}. Error: #{inspect(&1)}" {:ok, responses} = repeated_call(&json_rpc/2, [requests, json_rpc_named_arguments], error_message, 3) - #Logger.warn("Hashes for the batch range #{batch_start}..#{batch_end}:") + # Logger.warn("Hashes for the batch range #{batch_start}..#{batch_end}:") {sequence_hashes, verify_hashes} = responses |> Enum.reduce({[], []}, fn res, {sequences, verifies} = _acc -> - "0x" <> send_sequences_tx_hash = Map.get(res.result, "sendSequencesTxHash") - "0x" <> verify_batch_tx_hash = Map.get(res.result, "verifyBatchTxHash") + send_sequences_tx_hash = + case Map.get(res.result, "sendSequencesTxHash") do + "0x" <> send_sequences_tx_hash -> send_sequences_tx_hash + nil -> "0000000000000000000000000000000000000000000000000000000000000000" + end + + verify_batch_tx_hash = + case Map.get(res.result, "verifyBatchTxHash") do + "0x" <> verify_batch_tx_hash -> verify_batch_tx_hash + nil -> "0000000000000000000000000000000000000000000000000000000000000000" + end sequences = if send_sequences_tx_hash != "0000000000000000000000000000000000000000000000000000000000000000" do @@ -171,9 +191,10 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do Map.put(acc, hash.bytes, id) end) - {batches_to_import, l1_txs_to_import, _, _} = + {batches_to_import, l2_txs_to_import, l1_txs_to_import, _, _} = responses - |> Enum.reduce({[], [], get_next_id(), hash_to_id}, fn res, {batches, l1_txs, next_id, hash_to_id} = _acc -> + |> Enum.reduce({[], [], [], get_next_id(), hash_to_id}, fn res, + {batches, l2_txs, l1_txs, next_id, hash_to_id} = _acc -> number = quantity_to_integer(Map.get(res.result, "number")) {:ok, timestamp} = DateTime.from_unix(quantity_to_integer(Map.get(res.result, "timestamp"))) l2_transaction_hashes = Map.get(res.result, "transactions") @@ -181,17 +202,27 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do acc_input_hash = Map.get(res.result, "accInputHash") state_root = Map.get(res.result, "stateRoot") - "0x" <> send_sequences_tx_hash = Map.get(res.result, "sendSequencesTxHash") - "0x" <> verify_batch_tx_hash = Map.get(res.result, "verifyBatchTxHash") + send_sequences_tx_hash = + case Map.get(res.result, "sendSequencesTxHash") do + "0x" <> send_sequences_tx_hash -> send_sequences_tx_hash + nil -> "0000000000000000000000000000000000000000000000000000000000000000" + end + + verify_batch_tx_hash = + case Map.get(res.result, "verifyBatchTxHash") do + "0x" <> verify_batch_tx_hash -> verify_batch_tx_hash + nil -> "0000000000000000000000000000000000000000000000000000000000000000" + end {sequence_id, l1_txs, next_id, hash_to_id} = if send_sequences_tx_hash != "0000000000000000000000000000000000000000000000000000000000000000" do sequence_tx_hash = Base.decode16!(send_sequences_tx_hash, case: :mixed) - + id = Map.get(hash_to_id, sequence_tx_hash) if is_nil(id) do - {next_id, l1_txs ++ [%{id: next_id, hash: sequence_tx_hash, is_verify: false}], next_id + 1, Map.put(hash_to_id, sequence_tx_hash, next_id)} + {next_id, l1_txs ++ [%{id: next_id, hash: sequence_tx_hash, is_verify: false}], next_id + 1, + Map.put(hash_to_id, sequence_tx_hash, next_id)} else {id, l1_txs, next_id, hash_to_id} end @@ -202,11 +233,12 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do {verify_id, l1_txs, next_id, hash_to_id} = if verify_batch_tx_hash != "0000000000000000000000000000000000000000000000000000000000000000" do verify_tx_hash = Base.decode16!(verify_batch_tx_hash, case: :mixed) - + id = Map.get(hash_to_id, verify_tx_hash) if is_nil(id) do - {next_id, l1_txs ++ [%{id: next_id, hash: verify_tx_hash, is_verify: true}], next_id + 1, Map.put(hash_to_id, verify_tx_hash, next_id)} + {next_id, l1_txs ++ [%{id: next_id, hash: verify_tx_hash, is_verify: true}], next_id + 1, + Map.put(hash_to_id, verify_tx_hash, next_id)} else {id, l1_txs, next_id, hash_to_id} end @@ -217,7 +249,6 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do batch = %{ number: number, timestamp: timestamp, - l2_transaction_hashes: l2_transaction_hashes || [], global_exit_root: global_exit_root, acc_input_hash: acc_input_hash, state_root: state_root, @@ -225,13 +256,24 @@ defmodule Indexer.Fetcher.ZkevmTxnBatch do verify_id: verify_id } - {batches ++ [batch], l1_txs, next_id, hash_to_id} + l2_txs_append = + l2_transaction_hashes + |> Kernel.||([]) + |> Enum.map(fn l2_tx_hash -> + %{ + batch_number: number, + hash: l2_tx_hash + } + end) + + {batches ++ [batch], l2_txs ++ l2_txs_append, l1_txs, next_id, hash_to_id} end) {:ok, _} = Chain.import(%{ zkevm_lifecycle_txns: %{params: l1_txs_to_import}, zkevm_txn_batches: %{params: batches_to_import}, + zkevm_batch_txns: %{params: l2_txs_to_import}, timeout: :infinity }) end diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index 0a16c674b6..14ae27a23c 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -133,7 +133,8 @@ defmodule Indexer.Supervisor do {Indexer.Fetcher.PolygonEdge.Withdrawal.Supervisor, [[memory_monitor: memory_monitor, json_rpc_named_arguments: json_rpc_named_arguments]]}, {Indexer.Fetcher.PolygonEdge.WithdrawalExit.Supervisor, [[memory_monitor: memory_monitor]]}, - {ZkevmTxnBatch.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, + {ZkevmTxnBatch.Supervisor, + [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, # Out-of-band fetchers {EmptyBlocksSanitizer.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments]]},