Prepare for filling and scanning zkevm_bridge_l1_tokens table

pull/9098/head
POA 11 months ago
parent a285285b82
commit 19ef37c398
  1. 2
      apps/explorer/lib/explorer/chain/import/runner/zkevm/bridge_l1_tokens.ex
  2. 138
      apps/indexer/lib/indexer/fetcher/zkevm/bridge_l1.ex
  3. 117
      apps/indexer/lib/indexer/helper.ex
  4. 6
      apps/indexer/lib/indexer/supervisor.ex
  5. 2
      config/runtime.exs

@ -62,7 +62,7 @@ defmodule Explorer.Chain.Import.Runner.Zkevm.BridgeL1Tokens do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# Enforce BridgeL1Token ShareLocks order (see docs: sharelock.md)
ordered_changes_list = Enum.sort_by(changes_list, &{&1.id})
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address})
{:ok, inserted} =
Import.insert_changes_list(

@ -18,6 +18,8 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
request: 1
]
import Explorer.Chain.SmartContract, only: [burn_address_hash_string: 0]
import Explorer.Helper, only: [parse_integer: 1, decode_data: 2]
alias EthereumJSONRPC.Block.ByNumber
@ -31,7 +33,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
@block_check_interval_range_size 100
@eth_get_logs_range_size 1000
@fetcher_name :zkevm_bridge_l1
@empty_hash "0x0000000000000000000000000000000000000000000000000000000000000000"
# @empty_hash "0x0000000000000000000000000000000000000000000000000000000000000000"
# 32-byte signature of the event BridgeEvent(uint8 leafType, uint32 originNetwork, address originAddress, uint32 destinationNetwork, address destinationAddress, uint256 amount, bytes metadata, uint32 depositCount)
@bridge_event "0x501781209a1f8899323b96b4ef08b168df93e0a90c673d1e4cce39366cb62f9b"
@ -95,7 +97,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
with {:start_block_undefined, false} <- {:start_block_undefined, is_nil(env[:start_block])},
rpc = env[:rpc],
{:rpc_undefined, false} <- {:rpc_undefined, is_nil(rpc)},
{:bridge_contract_address_is_valid, true} <- {:bridge_contract_address_is_valid, Helper.is_address_correct?(env[:bridge_contract])},
{:bridge_contract_address_is_valid, true} <- {:bridge_contract_address_is_valid, Helper.address_correct?(env[:bridge_contract])},
start_block = parse_integer(env[:start_block]),
false <- is_nil(start_block),
true <- start_block > 0,
@ -294,7 +296,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
defp get_last_l1_item do
query =
from(b in Bridge,
select: {b.l1_block_number, b.l1_transaction_hash},
select: {b.block_number, b.l1_transaction_hash},
where: b.type == :deposit and not is_nil(b.block_number),
order_by: [desc: b.index],
limit: 1
@ -352,12 +354,12 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
end
end
defp get_token_data(token_addresses) do
defp get_token_data(token_addresses, json_rpc_named_arguments) do
# first, we're trying to read token data from the DB.
# if tokens are not in the DB, read them through RPC.
token_addresses
|> get_token_data_from_db()
|> get_token_data_from_rpc()
|> get_token_data_from_rpc(json_rpc_named_arguments)
end
defp get_token_data_from_db(token_addresses) do
@ -386,8 +388,8 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
{token_data, token_addresses_for_rpc}
end
defp get_token_data_from_rpc({token_data, token_addresses}) do
{requests, responses} = get_token_data_request_symbol_decimals(token_addresses)
defp get_token_data_from_rpc({token_data, token_addresses}, json_rpc_named_arguments) do
{requests, responses} = get_token_data_request_symbol_decimals(token_addresses, json_rpc_named_arguments)
requests
|> Enum.zip(responses)
@ -415,13 +417,13 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
defp get_new_data(data, request, response) do
if atomized_key(request.method_id) == :symbol do
%{data | symbol: response}
Map.put(data, :symbol, response)
else
%{data | decimals: response}
Map.put(data, :decimals, response)
end
end
defp get_token_data_request_symbol_decimals(token_addresses) do
defp get_token_data_request_symbol_decimals(token_addresses, json_rpc_named_arguments) do
requests =
token_addresses
|> Enum.map(fn address ->
@ -436,7 +438,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
end)
|> List.flatten()
{responses, error_messages} = read_contracts_with_retries(requests, @erc20_abi, 3)
{responses, error_messages} = read_contracts_with_retries(requests, @erc20_abi, json_rpc_named_arguments, 3)
if !Enum.empty?(error_messages) or Enum.count(requests) != Enum.count(responses) do
Logger.warning(
@ -476,67 +478,87 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
Map.put(acc, block_number, timestamp)
end)
bridge_event_params = [{:uint, 8}, {:uint, 32}, :address, {:uint, 32}, :address, {:uint, 256}, :bytes, {:uint, 32}]
claim_event_params = [{:uint, 32}, {:uint, 32}, :address, :address, {:uint, 256}]
token_data =
deposit_events
|> Enum.reduce(%{}, fn event, acc ->
[leaf_type, _origin_network, origin_address, _destination_network, _destination_address, _amount, _metadata, _deposit_count] = decode_data(event["data"], [{:uint, 8}, {:uint, 32}, :address, {:uint, 32}, :address, {:uint, 256}, :bytes, {:uint, 32}])
[leaf_type, _origin_network, origin_address, _destination_network, _destination_address, _amount, _metadata, _deposit_count] = decode_data(event["data"], bridge_event_params)
origin_address = "0x" <> Base.encode16(origin_address, case: :lower)
if leaf_type != 1 do
if leaf_type != 1 and origin_address != burn_address_hash_string() do
Map.put(acc, origin_address, true)
else
acc
end
end)
|> Map.values()
|> get_token_data()
|> Map.keys()
|> get_token_data(json_rpc_named_arguments)
tokens =
token_data
|> Enum.map(fn {address, data} ->
Map.put(data, :address, address)
end)
# todo: select known tokens from zkevm_bridge_l1_tokens table
Logger.warn("tokens: #{inspect(tokens)}")
# todo: insert only unknown tokens
{:ok, tokens_inserted} = Chain.import(%{
zkevm_bridge_l1_tokens: %{params: tokens},
timeout: :infinity
})
# todo: select remaining tokens from zkevm_bridge_l1_tokens table if they are not in `tokens_inserted`
Logger.warn("tokens_inserted: #{inspect(tokens_inserted)}")
events
|> Enum.map(fn event ->
topic0 = Enum.at(event["topics"], 0)
# user = get_op_user(topic0, event)
# {amounts_or_ids, operation_id} = get_op_amounts(topic0, event)
# {erc1155_ids, erc1155_amounts} = get_op_erc1155_data(topic0, event)
# l1_block_number = quantity_to_integer(event["blockNumber"])
# {operation_type, timestamp} =
# if is_deposit(topic0) do
# {:deposit, Map.get(timestamps, l1_block_number)}
# else
# {:withdrawal, nil}
# end
# token_type =
# cond do
# Enum.member?([@new_deposit_block_event, @withdraw_event], topic0) ->
# "bone"
# Enum.member?([@locked_ether_event, @exited_ether_event], topic0) ->
# "eth"
# true ->
# "other"
# end
# %{
# user: user,
# amount_or_id: amount_or_id,
# erc1155_ids: if(Enum.empty?(erc1155_ids), do: nil, else: erc1155_ids),
# erc1155_amounts: if(Enum.empty?(erc1155_amounts), do: nil, else: erc1155_amounts),
# l1_transaction_hash: event["transactionHash"],
# l1_block_number: l1_block_number,
# l2_transaction_hash: @empty_hash,
# operation_hash: calc_operation_hash(user, amount_or_id, erc1155_ids, erc1155_amounts, operation_id),
# operation_type: operation_type,
# token_type: token_type,
# timestamp: timestamp
# }
{type, index, amount, block_number, block_timestamp} =
if Enum.at(event["topics"], 0) == @bridge_event do
[_leaf_type, _origin_network, _origin_address, _destination_network, _destination_address, amount, _metadata, deposit_count] = decode_data(event["data"], bridge_event_params)
block_number = quantity_to_integer(event["blockNumber"])
block_timestamp = Map.get(timestamps, block_number)
{:deposit, deposit_count, amount, block_number, block_timestamp}
else
[index, _origin_network, _origin_address, _destination_address, amount] = decode_data(event["data"], claim_event_params)
{:withdrawal, index, amount, nil, nil}
end
result =
%{
type: type,
index: index,
l1_transaction_hash: event["transactionHash"],
# l1_token_id: ...,
amount: amount
}
result =
if not is_nil(block_number) do
Map.put(result, :block_number, block_number)
else
result
end
if not is_nil(block_timestamp) do
Map.put(result, :block_timestamp, block_timestamp)
else
result
end
end)
end
defp read_contracts_with_retries(requests, abi, retries_left) when retries_left > 0 do
responses = Reader.query_contracts(requests, abi)
defp read_contracts_with_retries(requests, abi, json_rpc_named_arguments, retries_left) when retries_left > 0 do
responses = Reader.query_contracts(requests, abi, json_rpc_named_arguments: json_rpc_named_arguments)
error_messages =
Enum.reduce(responses, [], fn {status, error_message}, acc ->
@ -556,7 +578,7 @@ defmodule Indexer.Fetcher.Zkevm.BridgeL1 do
if retries_left == 0 do
{responses, Enum.uniq(error_messages)}
else
read_contracts_with_retries(requests, abi, retries_left)
read_contracts_with_retries(requests, abi, json_rpc_named_arguments, retries_left)
end
end
end

@ -16,6 +16,15 @@ defmodule Indexer.Helper do
alias EthereumJSONRPC.Block.ByNumber
alias Explorer.Chain.Hash
@spec address_correct?(binary()) :: boolean()
def address_correct?(address) when is_binary(address) do
String.match?(address, ~r/^0x[[:xdigit:]]{40}$/i)
end
def address_correct?(_address) do
false
end
@spec address_hash_to_string(binary(), boolean()) :: binary()
def address_hash_to_string(hash, downcase \\ false)
@ -35,13 +44,65 @@ defmodule Indexer.Helper do
end
end
@spec address_correct?(binary()) :: boolean()
def address_correct?(address) when is_binary(address) do
String.match?(address, ~r/^0x[[:xdigit:]]{40}$/i)
@spec get_block_number_by_tag(binary(), list(), integer()) :: {:ok, non_neg_integer()} | {:error, atom()}
def get_block_number_by_tag(tag, json_rpc_named_arguments, retries \\ 3) do
error_message = &"Cannot fetch #{tag} block number. Error: #{inspect(&1)}"
repeated_call(&fetch_block_number_by_tag/2, [tag, json_rpc_named_arguments], error_message, retries)
end
def address_correct?(_address) do
false
def get_transaction_by_hash(hash, json_rpc_named_arguments, retries_left \\ 3)
def get_transaction_by_hash(hash, _json_rpc_named_arguments, _retries_left) when is_nil(hash), do: {:ok, nil}
def get_transaction_by_hash(hash, json_rpc_named_arguments, retries) do
req =
request(%{
id: 0,
method: "eth_getTransactionByHash",
params: [hash]
})
error_message = &"eth_getTransactionByHash failed. Error: #{inspect(&1)}"
repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries)
end
def log_blocks_chunk_handling(chunk_start, chunk_end, start_block, end_block, items_count, layer) do
is_start = is_nil(items_count)
{type, found} =
if is_start do
{"Start", ""}
else
{"Finish", " Found #{items_count}."}
end
target_range =
if chunk_start != start_block or chunk_end != end_block do
progress =
if is_start do
""
else
percentage =
(chunk_end - start_block + 1)
|> Decimal.div(end_block - start_block + 1)
|> Decimal.mult(100)
|> Decimal.round(2)
|> Decimal.to_string()
" Progress: #{percentage}%"
end
" Target range: #{start_block}..#{end_block}.#{progress}"
else
""
end
if chunk_start == chunk_end do
Logger.info("#{type} handling #{layer} block ##{chunk_start}.#{found}#{target_range}")
else
Logger.info("#{type} handling #{layer} block range #{chunk_start}..#{chunk_end}.#{found}#{target_range}")
end
end
@doc """
@ -195,4 +256,50 @@ defmodule Indexer.Helper do
Hash.to_string(topic)
end
end
def repeated_call(func, args, error_message, retries_left) do
case apply(func, args) do
{:ok, _} = res ->
res
{:error, message} = err ->
retries_left = retries_left - 1
if retries_left <= 0 do
Logger.error(error_message.(message))
err
else
Logger.error("#{error_message.(message)} Retrying...")
:timer.sleep(3000)
repeated_call(func, args, error_message, retries_left)
end
end
end
def get_block_timestamp_by_number(number, json_rpc_named_arguments, retries \\ 3) do
func = &get_block_timestamp_by_number_inner/2
args = [number, json_rpc_named_arguments]
error_message = &"Cannot fetch block ##{number} or its timestamp. Error: #{inspect(&1)}"
repeated_call(func, args, error_message, retries)
end
defp get_block_timestamp_by_number_inner(number, json_rpc_named_arguments) do
result =
%{id: 0, number: number}
|> ByNumber.request(false)
|> json_rpc(json_rpc_named_arguments)
with {:ok, block} <- result,
false <- is_nil(block),
timestamp <- Map.get(block, "timestamp"),
false <- is_nil(timestamp) do
{:ok, quantity_to_integer(timestamp)}
else
{:error, message} ->
{:error, message}
true ->
{:error, "RPC returned nil."}
end
end
end

@ -146,9 +146,9 @@ defmodule Indexer.Supervisor do
]),
configure(Indexer.Fetcher.Shibarium.L1.Supervisor, [[memory_monitor: memory_monitor]]),
configure(Indexer.Fetcher.Zkevm.BridgeL1.Supervisor, [[memory_monitor: memory_monitor]]),
configure(Indexer.Fetcher.Zkevm.BridgeL2.Supervisor, [
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]
]),
# configure(Indexer.Fetcher.Zkevm.BridgeL2.Supervisor, [
# [json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]
# ]),
configure(Indexer.Fetcher.Zkevm.TransactionBatch.Supervisor, [
[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]
]),

@ -738,7 +738,7 @@ config :indexer, Indexer.Fetcher.Zkevm.BridgeL1.Supervisor, enabled: ConfigHelpe
config :indexer, Indexer.Fetcher.Zkevm.BridgeL2,
start_block: System.get_env("INDEXER_POLYGON_ZKEVM_L2_BRIDGE_START_BLOCK"),
bridge_contract: System.get_env("INDEXER_POLYGON_ZKEVM_L2_BRIDGE_CONTRACT"),
bridge_contract: System.get_env("INDEXER_POLYGON_ZKEVM_L2_BRIDGE_CONTRACT")
config :indexer, Indexer.Fetcher.Zkevm.BridgeL2.Supervisor, enabled: ConfigHelper.chain_type() == "polygon_zkevm"

Loading…
Cancel
Save