Move L1 RPC requests from realtime block handler to a separate GenServer

pull/9098/head
POA 10 months ago
parent ae812aa633
commit a6d2a80187
  1. 4
      apps/explorer/lib/explorer/chain/import/runner/zkevm/bridge_operations.ex
  2. 2
      apps/explorer/lib/explorer/chain/zkevm/bridge.ex
  3. 3
      apps/explorer/priv/polygon_zkevm/migrations/20231010093238_add_bridge_tables.exs
  4. 27
      apps/indexer/lib/indexer/block/fetcher.ex
  5. 2
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  6. 107
      apps/indexer/lib/indexer/fetcher/zkevm/bridge.ex
  7. 78
      apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1_tokens.ex
  8. 1
      apps/indexer/lib/indexer/supervisor.ex
  9. 1
      config/runtime.exs

@ -89,6 +89,7 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.BridgeOperations do
l1_transaction_hash: fragment("COALESCE(EXCLUDED.l1_transaction_hash, ?)", op.l1_transaction_hash),
l2_transaction_hash: fragment("COALESCE(EXCLUDED.l2_transaction_hash, ?)", op.l2_transaction_hash),
l1_token_id: fragment("COALESCE(EXCLUDED.l1_token_id, ?)", op.l1_token_id),
l1_token_address: fragment("COALESCE(EXCLUDED.l1_token_address, ?)", op.l1_token_address),
l2_token_address: fragment("COALESCE(EXCLUDED.l2_token_address, ?)", op.l2_token_address),
amount: fragment("EXCLUDED.amount"),
block_number: fragment("COALESCE(EXCLUDED.block_number, ?)", op.block_number),
@ -99,10 +100,11 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.BridgeOperations do
],
where:
fragment(
"(EXCLUDED.l1_transaction_hash, EXCLUDED.l2_transaction_hash, EXCLUDED.l1_token_id, EXCLUDED.l2_token_address, EXCLUDED.amount, EXCLUDED.block_number, EXCLUDED.block_timestamp) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?)",
"(EXCLUDED.l1_transaction_hash, EXCLUDED.l2_transaction_hash, EXCLUDED.l1_token_id, EXCLUDED.l1_token_address, EXCLUDED.l2_token_address, EXCLUDED.amount, EXCLUDED.block_number, EXCLUDED.block_timestamp) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?, ?)",
op.l1_transaction_hash,
op.l2_transaction_hash,
op.l1_token_id,
op.l1_token_address,
op.l2_token_address,
op.amount,
op.block_number,

@ -17,6 +17,7 @@ defmodule Explorer.Chain.Zkevm.Bridge do
l2_transaction_hash: Hash.t() | nil,
l1_token: %Ecto.Association.NotLoaded{} | BridgeL1Token.t() | nil,
l1_token_id: non_neg_integer() | nil,
l1_token_address: Hash.Address.t() | nil,
l2_token: %Ecto.Association.NotLoaded{} | Token.t() | nil,
l2_token_address: Hash.Address.t() | nil,
amount: Decimal.t(),
@ -31,6 +32,7 @@ defmodule Explorer.Chain.Zkevm.Bridge do
field(:l1_transaction_hash, Hash.Full)
field(:l2_transaction_hash, Hash.Full)
belongs_to(:l1_token, BridgeL1Token, foreign_key: :l1_token_id, references: :id, type: :integer)
field(:l1_token_address, Hash.Address)
belongs_to(:l2_token, Token, foreign_key: :l2_token_address, references: :contract_address_hash, type: Hash.Address)
field(:amount, :decimal)
field(:block_number, :integer)

@ -29,11 +29,14 @@ defmodule Explorer.Repo.PolygonZkevm.Migrations.AddBridgeTables do
null: true
)
add(:l1_token_address, :bytea, null: true)
add(:l2_token_address, :bytea, null: true)
add(:amount, :numeric, precision: 100, null: false)
add(:block_number, :bigint, null: true)
add(:block_timestamp, :"timestamp without time zone", null: true)
timestamps(null: false, type: :utc_datetime_usec)
end
create(index(:zkevm_bridge, :l1_token_address))
end
end

@ -17,6 +17,7 @@ defmodule Indexer.Block.Fetcher do
alias Explorer.Chain.Cache.{Accounts, BlockNumber, Transactions, Uncles}
alias Indexer.Block.Fetcher.Receipts
alias Indexer.Fetcher.TokenInstance.Realtime, as: TokenInstanceRealtime
alias Indexer.Fetcher.Zkevm.BridgeL1Tokens, as: ZkevmBridgeL1Tokens
alias Indexer.Fetcher.{
Beacon.Blob,
@ -326,6 +327,19 @@ defmodule Indexer.Block.Fetcher do
def async_import_token_instances(_), do: :ok
def async_import_blobs(%{blocks: blocks}) do
timestamps =
blocks
|> Enum.filter(fn block -> block |> Map.get(:blob_gas_used, 0) > 0 end)
|> Enum.map(&Map.get(&1, :timestamp))
if !Enum.empty?(timestamps) do
Blob.async_fetch(timestamps)
end
end
def async_import_blobs(_), do: :ok
def async_import_block_rewards([]), do: :ok
def async_import_block_rewards(errors) when is_list(errors) do
@ -408,18 +422,11 @@ defmodule Indexer.Block.Fetcher do
def async_import_replaced_transactions(_), do: :ok
def async_import_blobs(%{blocks: blocks}) do
timestamps =
blocks
|> Enum.filter(fn block -> block |> Map.get(:blob_gas_used, 0) > 0 end)
|> Enum.map(&Map.get(&1, :timestamp))
if !Enum.empty?(timestamps) do
Blob.async_fetch(timestamps)
end
def async_import_zkevm_bridge_l1_tokens(%{zkevm_bridge_operations: operations}) do
ZkevmBridgeL1Tokens.async_fetch(operations)
end
def async_import_blobs(_), do: :ok
def async_import_zkevm_bridge_l1_tokens(_), do: :ok
defp block_reward_errors_to_block_numbers(block_reward_errors) when is_list(block_reward_errors) do
Enum.map(block_reward_errors, &block_reward_error_to_block_number/1)

@ -22,6 +22,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
async_import_token_balances: 1,
async_import_token_instances: 1,
async_import_uncles: 1,
async_import_zkevm_bridge_l1_tokens: 1,
fetch_and_import_range: 2
]
@ -451,6 +452,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
async_import_uncles(imported)
async_import_replaced_transactions(imported)
async_import_blobs(imported)
async_import_zkevm_bridge_l1_tokens(imported)
end
defp balances(

@ -134,44 +134,54 @@ defmodule Indexer.Fetcher.Zkevm.Bridge do
@spec prepare_operations(list(), list() | nil, list(), map() | nil) :: list()
def prepare_operations(events, json_rpc_named_arguments, json_rpc_named_arguments_l1, block_to_timestamp \\ nil) do
bridge_events = Enum.filter(events, fn event -> event.first_topic == @bridge_event end)
block_to_timestamp =
{block_to_timestamp, token_address_to_id} =
if is_nil(block_to_timestamp) do
blocks_to_timestamps(bridge_events, json_rpc_named_arguments)
bridge_events = Enum.filter(events, fn event -> event.first_topic == @bridge_event end)
l1_token_addresses =
bridge_events
|> Enum.reduce(%MapSet{}, fn event, acc ->
case bridge_event_parse(event) do
{{nil, _}, _, _} -> acc
{{token_address, nil}, _, _} -> MapSet.put(acc, token_address)
end
end)
|> MapSet.to_list()
{
blocks_to_timestamps(bridge_events, json_rpc_named_arguments),
token_addresses_to_ids(l1_token_addresses, json_rpc_named_arguments_l1)
}
else
block_to_timestamp
# this is called in realtime
{block_to_timestamp, %{}}
end
token_address_to_id = token_addresses_to_ids(bridge_events, json_rpc_named_arguments_l1)
Enum.map(events, fn event ->
{index, l1_token_id, l2_token_address, amount, block_number, block_timestamp} =
{index, l1_token_id, l1_token_address, l2_token_address, amount, block_number, block_timestamp} =
if event.first_topic == @bridge_event do
[
leaf_type,
origin_network,
origin_address,
_destination_network,
_destination_address,
{
{l1_token_address, l2_token_address},
amount,
_metadata,
deposit_count
] = decode_data(event.data, @bridge_event_params)
{l1_token_address, l2_token_address} =
token_address_by_origin_address(origin_address, origin_network, leaf_type)
} = bridge_event_parse(event)
l1_token_id = Map.get(token_address_to_id, l1_token_address)
block_number = quantity_to_integer(event.block_number)
block_timestamp = Map.get(block_to_timestamp, block_number)
{deposit_count, l1_token_id, l2_token_address, amount, block_number, block_timestamp}
# credo:disable-for-lines:2 Credo.Check.Refactor.Nesting
l1_token_address =
if is_nil(l1_token_id) do
l1_token_address
end
{deposit_count, l1_token_id, l1_token_address, l2_token_address, amount, block_number, block_timestamp}
else
[index, _origin_network, _origin_address, _destination_address, amount] =
decode_data(event.data, @claim_event_params)
{index, nil, nil, amount, nil, nil}
{index, nil, nil, nil, amount, nil, nil}
end
is_l1 = json_rpc_named_arguments == json_rpc_named_arguments_l1
@ -192,6 +202,7 @@ defmodule Indexer.Fetcher.Zkevm.Bridge do
result
|> extend_result(transaction_hash_field, event.transaction_hash)
|> extend_result(:l1_token_id, l1_token_id)
|> extend_result(:l1_token_address, l1_token_address)
|> extend_result(:l2_token_address, l2_token_address)
|> extend_result(:block_number, block_number)
|> extend_result(:block_timestamp, block_timestamp)
@ -208,6 +219,21 @@ defmodule Indexer.Fetcher.Zkevm.Bridge do
end)
end
defp bridge_event_parse(event) do
[
leaf_type,
origin_network,
origin_address,
_destination_network,
_destination_address,
amount,
_metadata,
deposit_count
] = decode_data(event.data, @bridge_event_params)
{token_address_by_origin_address(origin_address, origin_network, leaf_type), amount, deposit_count}
end
defp operation_type(first_topic, is_l1) do
if first_topic == @bridge_event do
if is_l1, do: :deposit, else: :withdrawal
@ -216,27 +242,9 @@ defmodule Indexer.Fetcher.Zkevm.Bridge do
end
end
defp token_addresses_to_ids(events, json_rpc_named_arguments) do
def token_addresses_to_ids(l1_token_addresses, json_rpc_named_arguments) do
token_data =
events
|> Enum.reduce(%MapSet{}, fn event, acc ->
[
leaf_type,
origin_network,
origin_address,
_destination_network,
_destination_address,
_amount,
_metadata,
_deposit_count
] = decode_data(event.data, @bridge_event_params)
case token_address_by_origin_address(origin_address, origin_network, leaf_type) do
{nil, _} -> acc
{token_address, nil} -> MapSet.put(acc, token_address)
end
end)
|> MapSet.to_list()
l1_token_addresses
|> get_token_data(json_rpc_named_arguments)
tokens_existing =
@ -249,18 +257,11 @@ defmodule Indexer.Fetcher.Zkevm.Bridge do
|> Enum.reject(fn {address, _} -> Map.has_key?(tokens_existing, address) end)
|> Enum.map(fn {address, data} -> Map.put(data, :address, address) end)
# here we explicitly check CHAIN_TYPE as Dialyzer throws an error otherwise
import_options =
if System.get_env("CHAIN_TYPE") == "polygon_zkevm" do
%{
zkevm_bridge_l1_tokens: %{params: tokens_to_insert},
timeout: :infinity
}
else
%{}
end
{:ok, inserts} = Chain.import(import_options)
{:ok, inserts} =
Chain.import(%{
zkevm_bridge_l1_tokens: %{params: tokens_to_insert},
timeout: :infinity
})
tokens_inserted = Map.get(inserts, :insert_zkevm_bridge_l1_tokens, [])

@ -0,0 +1,78 @@
defmodule Indexer.Fetcher.Zkevm.BridgeL1Tokens do
@moduledoc """
Fetches information about L1 tokens for zkEVM bridge.
"""
use Indexer.Fetcher, restart: :permanent
use Spandex.Decorators
import Ecto.Query
alias Explorer.Repo
alias Indexer.BufferedTask
alias Indexer.Fetcher.Zkevm.{Bridge, BridgeL1}
@behaviour BufferedTask
@default_max_batch_size 1
@default_max_concurrency 10
@doc false
def child_spec([init_options, gen_server_options]) do
rpc = Application.get_all_env(:indexer)[BridgeL1][:rpc]
json_rpc_named_arguments = Bridge.json_rpc_named_arguments(rpc)
merged_init_opts =
defaults()
|> Keyword.merge(init_options)
|> Keyword.merge(state: json_rpc_named_arguments)
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__)
end
@impl BufferedTask
def init(_, _, _) do
{0, []}
end
@impl BufferedTask
def run(l1_token_addresses, json_rpc_named_arguments) when is_list(l1_token_addresses) do
l1_token_addresses
|> Bridge.token_addresses_to_ids(json_rpc_named_arguments)
|> Enum.each(fn {l1_token_address, l1_token_id} ->
Repo.update_all(
from(b in Explorer.Chain.Zkevm.Bridge, where: b.l1_token_address == ^l1_token_address),
set: [l1_token_id: l1_token_id, l1_token_address: nil]
)
end)
end
@doc """
Fetches L1 token data asynchronously.
"""
def async_fetch(data) do
async_fetch(data, Application.get_env(:indexer, __MODULE__.Supervisor)[:enabled])
end
def async_fetch(_data, false), do: :ok
def async_fetch(operations, _enabled) do
l1_token_addresses =
operations
|> Enum.reject(fn operation -> is_nil(operation.l1_token_address) end)
|> Enum.map(fn operation -> operation.l1_token_address end)
|> Enum.uniq()
BufferedTask.buffer(__MODULE__, l1_token_addresses)
end
defp defaults do
[
flush_interval: 100,
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
poll: false,
task_supervisor: __MODULE__.TaskSupervisor
]
end
end

@ -146,6 +146,7 @@ defmodule Indexer.Supervisor do
]),
configure(Indexer.Fetcher.Shibarium.L1.Supervisor, [[memory_monitor: memory_monitor]]),
configure(Indexer.Fetcher.Zkevm.BridgeL1.Supervisor, [[memory_monitor: memory_monitor]]),
configure(Indexer.Fetcher.Zkevm.BridgeL1Tokens.Supervisor, [[memory_monitor: memory_monitor]]),
configure(Indexer.Fetcher.Zkevm.BridgeL2.Supervisor, [
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]
]),

@ -735,6 +735,7 @@ config :indexer, Indexer.Fetcher.Zkevm.BridgeL1,
native_decimals: ConfigHelper.parse_integer_env_var("INDEXER_POLYGON_ZKEVM_L1_BRIDGE_NATIVE_DECIMALS", 18)
config :indexer, Indexer.Fetcher.Zkevm.BridgeL1.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_zkevm"
config :indexer, Indexer.Fetcher.Zkevm.BridgeL1Tokens.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_zkevm"
config :indexer, Indexer.Fetcher.Zkevm.BridgeL2,
start_block: System.get_env("INDEXER_POLYGON_ZKEVM_L2_BRIDGE_START_BLOCK"),

Loading…
Cancel
Save