Shibarium Bridge indexer and API v2 extension (#8929)
* Define shibarium_bridge table * Add ShibariumBridgeOperations runner * Add init for Indexer.Fetcher.Shibarium.L1 * Draft for Indexer.Fetcher.Shibarium.L1 * mix format for Indexer.Fetcher.Shibarium.L1 * Fix Indexer.Fetcher.Shibarium.L1 * Refactor Indexer.Fetcher.Shibarium.L1 * Add draft (incomplete) for Indexer.Fetcher.Shibarium.L2 * Extend draft (incomplete) for Indexer.Fetcher.Shibarium.L2 * Extend draft (incomplete) for Indexer.Fetcher.Shibarium.L2 * Complete unrefactored Indexer.Fetcher.Shibarium.L2 * Improve Indexer.Fetcher.Shibarium.L2 * Prepare Indexer.Fetcher.Shibarium.L2 for transformer * Add unrefactored Indexer.Transform.Shibarium.Bridge * Small refactoring of Shibarium modules * Refactoring * Add API v2 for Shibarium Deposits and Withdrawals * Small fixes * Update changelog * Fixes and refactoring * Partially add specs and docs * Partially add specs and docs * Log topic type changed to bytea * Small improvements * Cache deposits and withdrawals counters for Shibarium * Fixes for credo * Fixes for Chain.import related to CHAIN_TYPE * Reset GA cache * Small refactoring * Update mix.exs * Remove unnecessary credo ignore --------- Co-authored-by: POA <33550681+poa@users.noreply.github.com> Co-authored-by: Viktor Baranov <baranov.viktor.27@gmail.com>pull/9178/head
parent
875858a40c
commit
c7724f51df
@ -0,0 +1,79 @@ |
||||
defmodule BlockScoutWeb.API.V2.ShibariumController do |
||||
use BlockScoutWeb, :controller |
||||
|
||||
import BlockScoutWeb.Chain, |
||||
only: [ |
||||
next_page_params: 3, |
||||
paging_options: 1, |
||||
split_list_by_page: 1 |
||||
] |
||||
|
||||
alias Explorer.Chain.Cache.ShibariumCounter |
||||
alias Explorer.Chain.Shibarium.Reader |
||||
|
||||
action_fallback(BlockScoutWeb.API.V2.FallbackController) |
||||
|
||||
@spec deposits(Plug.Conn.t(), map()) :: Plug.Conn.t() |
||||
def deposits(conn, params) do |
||||
{deposits, next_page} = |
||||
params |
||||
|> paging_options() |
||||
|> Keyword.put(:api?, true) |
||||
|> Reader.deposits() |
||||
|> split_list_by_page() |
||||
|
||||
next_page_params = next_page_params(next_page, deposits, params) |
||||
|
||||
conn |
||||
|> put_status(200) |
||||
|> render(:shibarium_deposits, %{ |
||||
deposits: deposits, |
||||
next_page_params: next_page_params |
||||
}) |
||||
end |
||||
|
||||
@spec deposits_count(Plug.Conn.t(), map()) :: Plug.Conn.t() |
||||
def deposits_count(conn, _params) do |
||||
count = |
||||
case ShibariumCounter.deposits_count(api?: true) do |
||||
0 -> Reader.deposits_count(api?: true) |
||||
value -> value |
||||
end |
||||
|
||||
conn |
||||
|> put_status(200) |
||||
|> render(:shibarium_items_count, %{count: count}) |
||||
end |
||||
|
||||
@spec withdrawals(Plug.Conn.t(), map()) :: Plug.Conn.t() |
||||
def withdrawals(conn, params) do |
||||
{withdrawals, next_page} = |
||||
params |
||||
|> paging_options() |
||||
|> Keyword.put(:api?, true) |
||||
|> Reader.withdrawals() |
||||
|> split_list_by_page() |
||||
|
||||
next_page_params = next_page_params(next_page, withdrawals, params) |
||||
|
||||
conn |
||||
|> put_status(200) |
||||
|> render(:shibarium_withdrawals, %{ |
||||
withdrawals: withdrawals, |
||||
next_page_params: next_page_params |
||||
}) |
||||
end |
||||
|
||||
@spec withdrawals_count(Plug.Conn.t(), map()) :: Plug.Conn.t() |
||||
def withdrawals_count(conn, _params) do |
||||
count = |
||||
case ShibariumCounter.withdrawals_count(api?: true) do |
||||
0 -> Reader.withdrawals_count(api?: true) |
||||
value -> value |
||||
end |
||||
|
||||
conn |
||||
|> put_status(200) |
||||
|> render(:shibarium_items_count, %{count: count}) |
||||
end |
||||
end |
@ -0,0 +1,46 @@ |
||||
defmodule BlockScoutWeb.API.V2.ShibariumView do |
||||
use BlockScoutWeb, :view |
||||
|
||||
@spec render(String.t(), map()) :: map() |
||||
def render("shibarium_deposits.json", %{ |
||||
deposits: deposits, |
||||
next_page_params: next_page_params |
||||
}) do |
||||
%{ |
||||
items: |
||||
Enum.map(deposits, fn deposit -> |
||||
%{ |
||||
"l1_block_number" => deposit.l1_block_number, |
||||
"l1_transaction_hash" => deposit.l1_transaction_hash, |
||||
"l2_transaction_hash" => deposit.l2_transaction_hash, |
||||
"user" => deposit.user, |
||||
"timestamp" => deposit.timestamp |
||||
} |
||||
end), |
||||
next_page_params: next_page_params |
||||
} |
||||
end |
||||
|
||||
def render("shibarium_withdrawals.json", %{ |
||||
withdrawals: withdrawals, |
||||
next_page_params: next_page_params |
||||
}) do |
||||
%{ |
||||
items: |
||||
Enum.map(withdrawals, fn withdrawal -> |
||||
%{ |
||||
"l2_block_number" => withdrawal.l2_block_number, |
||||
"l2_transaction_hash" => withdrawal.l2_transaction_hash, |
||||
"l1_transaction_hash" => withdrawal.l1_transaction_hash, |
||||
"user" => withdrawal.user, |
||||
"timestamp" => withdrawal.timestamp |
||||
} |
||||
end), |
||||
next_page_params: next_page_params |
||||
} |
||||
end |
||||
|
||||
def render("shibarium_items_count.json", %{count: count}) do |
||||
count |
||||
end |
||||
end |
@ -0,0 +1,58 @@ |
||||
defmodule Explorer.Chain.Cache.ShibariumCounter do |
||||
@moduledoc """ |
||||
Caches the number of deposits and withdrawals for Shibarium Bridge. |
||||
""" |
||||
|
||||
alias Explorer.Chain |
||||
|
||||
@deposits_counter_type "shibarium_deposits_counter" |
||||
@withdrawals_counter_type "shibarium_withdrawals_counter" |
||||
|
||||
@doc """ |
||||
Fetches the cached deposits count from the `last_fetched_counters` table. |
||||
""" |
||||
def deposits_count(options \\ []) do |
||||
Chain.get_last_fetched_counter(@deposits_counter_type, options) |
||||
end |
||||
|
||||
@doc """ |
||||
Fetches the cached withdrawals count from the `last_fetched_counters` table. |
||||
""" |
||||
def withdrawals_count(options \\ []) do |
||||
Chain.get_last_fetched_counter(@withdrawals_counter_type, options) |
||||
end |
||||
|
||||
@doc """ |
||||
Stores or increments the current deposits count in the `last_fetched_counters` table. |
||||
""" |
||||
def deposits_count_save(count, just_increment \\ false) do |
||||
if just_increment do |
||||
Chain.increment_last_fetched_counter( |
||||
@deposits_counter_type, |
||||
count |
||||
) |
||||
else |
||||
Chain.upsert_last_fetched_counter(%{ |
||||
counter_type: @deposits_counter_type, |
||||
value: count |
||||
}) |
||||
end |
||||
end |
||||
|
||||
@doc """ |
||||
Stores or increments the current withdrawals count in the `last_fetched_counters` table. |
||||
""" |
||||
def withdrawals_count_save(count, just_increment \\ false) do |
||||
if just_increment do |
||||
Chain.increment_last_fetched_counter( |
||||
@withdrawals_counter_type, |
||||
count |
||||
) |
||||
else |
||||
Chain.upsert_last_fetched_counter(%{ |
||||
counter_type: @withdrawals_counter_type, |
||||
value: count |
||||
}) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,119 @@ |
||||
defmodule Explorer.Chain.Import.Runner.Shibarium.BridgeOperations do |
||||
@moduledoc """ |
||||
Bulk imports `t:Explorer.Chain.Shibarium.Bridge.t/0`. |
||||
""" |
||||
|
||||
require Ecto.Query |
||||
|
||||
import Ecto.Query, only: [from: 2] |
||||
|
||||
alias Ecto.{Changeset, Multi, Repo} |
||||
alias Explorer.Chain.Import |
||||
alias Explorer.Chain.Shibarium.Bridge, as: ShibariumBridge |
||||
alias Explorer.Prometheus.Instrumenter |
||||
|
||||
@behaviour Import.Runner |
||||
|
||||
# milliseconds |
||||
@timeout 60_000 |
||||
|
||||
@type imported :: [ShibariumBridge.t()] |
||||
|
||||
@impl Import.Runner |
||||
def ecto_schema_module, do: ShibariumBridge |
||||
|
||||
@impl Import.Runner |
||||
def option_key, do: :shibarium_bridge_operations |
||||
|
||||
@impl Import.Runner |
||||
def imported_table_row do |
||||
%{ |
||||
value_type: "[#{ecto_schema_module()}.t()]", |
||||
value_description: "List of `t:#{ecto_schema_module()}.t/0`s" |
||||
} |
||||
end |
||||
|
||||
@impl Import.Runner |
||||
def run(multi, changes_list, %{timestamps: timestamps} = options) do |
||||
insert_options = |
||||
options |
||||
|> Map.get(option_key(), %{}) |
||||
|> Map.take(~w(on_conflict timeout)a) |
||||
|> Map.put_new(:timeout, @timeout) |
||||
|> Map.put(:timestamps, timestamps) |
||||
|
||||
Multi.run(multi, :insert_shibarium_bridge_operations, fn repo, _ -> |
||||
Instrumenter.block_import_stage_runner( |
||||
fn -> insert(repo, changes_list, insert_options) end, |
||||
:block_referencing, |
||||
:shibarium_bridge_operations, |
||||
:shibarium_bridge_operations |
||||
) |
||||
end) |
||||
end |
||||
|
||||
@impl Import.Runner |
||||
def timeout, do: @timeout |
||||
|
||||
@spec insert(Repo.t(), [map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) :: |
||||
{:ok, [ShibariumBridge.t()]} |
||||
| {:error, [Changeset.t()]} |
||||
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do |
||||
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) |
||||
|
||||
# Enforce ShibariumBridge ShareLocks order (see docs: sharelock.md) |
||||
ordered_changes_list = |
||||
Enum.sort_by(changes_list, &{&1.operation_hash, &1.l1_transaction_hash, &1.l2_transaction_hash}) |
||||
|
||||
{:ok, inserted} = |
||||
Import.insert_changes_list( |
||||
repo, |
||||
ordered_changes_list, |
||||
conflict_target: [:operation_hash, :l1_transaction_hash, :l2_transaction_hash], |
||||
on_conflict: on_conflict, |
||||
for: ShibariumBridge, |
||||
returning: true, |
||||
timeout: timeout, |
||||
timestamps: timestamps |
||||
) |
||||
|
||||
{:ok, inserted} |
||||
end |
||||
|
||||
defp default_on_conflict do |
||||
from( |
||||
op in ShibariumBridge, |
||||
update: [ |
||||
set: [ |
||||
# Don't update `operation_hash` as it is part of the composite primary key and used for the conflict target |
||||
# Don't update `l1_transaction_hash` as it is part of the composite primary key and used for the conflict target |
||||
# Don't update `l2_transaction_hash` as it is part of the composite primary key and used for the conflict target |
||||
# Don't update `operation_type` as it is not changed |
||||
user: fragment("EXCLUDED.user"), |
||||
amount_or_id: fragment("EXCLUDED.amount_or_id"), |
||||
erc1155_ids: fragment("EXCLUDED.erc1155_ids"), |
||||
erc1155_amounts: fragment("EXCLUDED.erc1155_amounts"), |
||||
l1_block_number: fragment("EXCLUDED.l1_block_number"), |
||||
l2_block_number: fragment("EXCLUDED.l2_block_number"), |
||||
token_type: fragment("EXCLUDED.token_type"), |
||||
timestamp: fragment("EXCLUDED.timestamp"), |
||||
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", op.inserted_at), |
||||
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", op.updated_at) |
||||
] |
||||
], |
||||
where: |
||||
fragment( |
||||
"(EXCLUDED.user, EXCLUDED.amount_or_id, EXCLUDED.erc1155_ids, EXCLUDED.erc1155_amounts, EXCLUDED.operation_type, EXCLUDED.l1_block_number, EXCLUDED.l2_block_number, EXCLUDED.token_type, EXCLUDED.timestamp) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?, ?, ?)", |
||||
op.user, |
||||
op.amount_or_id, |
||||
op.erc1155_ids, |
||||
op.erc1155_amounts, |
||||
op.operation_type, |
||||
op.l1_block_number, |
||||
op.l2_block_number, |
||||
op.token_type, |
||||
op.timestamp |
||||
) |
||||
) |
||||
end |
||||
end |
@ -0,0 +1,85 @@ |
||||
defmodule Explorer.Chain.Shibarium.Bridge do |
||||
@moduledoc "Models Shibarium Bridge operation." |
||||
|
||||
use Explorer.Schema |
||||
|
||||
alias Explorer.Chain.{ |
||||
Address, |
||||
Hash, |
||||
Transaction |
||||
} |
||||
|
||||
@optional_attrs ~w(amount_or_id erc1155_ids erc1155_amounts l1_transaction_hash l1_block_number l2_transaction_hash l2_block_number timestamp)a |
||||
|
||||
@required_attrs ~w(user operation_hash operation_type token_type)a |
||||
|
||||
@allowed_attrs @optional_attrs ++ @required_attrs |
||||
|
||||
@typedoc """ |
||||
* `user_address` - address of the user that initiated operation |
||||
* `user` - foreign key of `user_address` |
||||
* `amount_or_id` - amount of the operation or NTF id (in case of ERC-721 token) |
||||
* `erc1155_ids` - an array of ERC-1155 token ids (when batch ERC-1155 token transfer) |
||||
* `erc1155_amounts` - an array of corresponding ERC-1155 token amounts (when batch ERC-1155 token transfer) |
||||
* `l1_transaction_hash` - transaction hash for L1 side |
||||
* `l1_block_number` - block number of `l1_transaction` |
||||
* `l2_transaction` - transaction hash for L2 side |
||||
* `l2_transaction_hash` - foreign key of `l2_transaction` |
||||
* `l2_block_number` - block number of `l2_transaction` |
||||
* `operation_hash` - keccak256 hash of the operation calculated as follows: ExKeccak.hash_256(user, amount_or_id, erc1155_ids, erc1155_amounts, operation_id) |
||||
* `operation_type` - `deposit` or `withdrawal` |
||||
* `token_type` - `bone` or `eth` or `other` |
||||
* `timestamp` - timestamp of the operation block (L1 block for deposit, L2 block - for withdrawal) |
||||
""" |
||||
@type t :: %__MODULE__{ |
||||
user_address: %Ecto.Association.NotLoaded{} | Address.t(), |
||||
user: Hash.Address.t(), |
||||
amount_or_id: Decimal.t() | nil, |
||||
erc1155_ids: [non_neg_integer()] | nil, |
||||
erc1155_amounts: [Decimal.t()] | nil, |
||||
l1_transaction_hash: Hash.t(), |
||||
l1_block_number: non_neg_integer() | nil, |
||||
l2_transaction: %Ecto.Association.NotLoaded{} | Transaction.t() | nil, |
||||
l2_transaction_hash: Hash.t(), |
||||
l2_block_number: non_neg_integer() | nil, |
||||
operation_hash: Hash.t(), |
||||
operation_type: String.t(), |
||||
token_type: String.t(), |
||||
timestamp: DateTime.t(), |
||||
inserted_at: DateTime.t(), |
||||
updated_at: DateTime.t() |
||||
} |
||||
|
||||
@primary_key false |
||||
schema "shibarium_bridge" do |
||||
belongs_to(:user_address, Address, foreign_key: :user, references: :hash, type: Hash.Address) |
||||
field(:amount_or_id, :decimal) |
||||
field(:erc1155_ids, {:array, :decimal}) |
||||
field(:erc1155_amounts, {:array, :decimal}) |
||||
field(:operation_hash, Hash.Full, primary_key: true) |
||||
field(:operation_type, Ecto.Enum, values: [:deposit, :withdrawal]) |
||||
field(:l1_transaction_hash, Hash.Full, primary_key: true) |
||||
field(:l1_block_number, :integer) |
||||
|
||||
belongs_to(:l2_transaction, Transaction, |
||||
foreign_key: :l2_transaction_hash, |
||||
references: :hash, |
||||
type: Hash.Full, |
||||
primary_key: true |
||||
) |
||||
|
||||
field(:l2_block_number, :integer) |
||||
field(:token_type, Ecto.Enum, values: [:bone, :eth, :other]) |
||||
field(:timestamp, :utc_datetime_usec) |
||||
|
||||
timestamps() |
||||
end |
||||
|
||||
@spec changeset(Ecto.Schema.t(), map()) :: Ecto.Schema.t() |
||||
def changeset(%__MODULE__{} = module, attrs \\ %{}) do |
||||
module |
||||
|> cast(attrs, @allowed_attrs) |
||||
|> validate_required(@required_attrs) |
||||
|> unique_constraint([:operation_hash, :l1_transaction_hash, :l2_transaction_hash]) |
||||
end |
||||
end |
@ -0,0 +1,108 @@ |
||||
defmodule Explorer.Chain.Shibarium.Reader do |
||||
@moduledoc "Contains read functions for Shibarium modules." |
||||
|
||||
import Ecto.Query, |
||||
only: [ |
||||
from: 2, |
||||
limit: 2 |
||||
] |
||||
|
||||
import Explorer.Chain, only: [default_paging_options: 0, select_repo: 1] |
||||
|
||||
alias Explorer.Chain.Shibarium.Bridge |
||||
alias Explorer.PagingOptions |
||||
|
||||
@doc """ |
||||
Returns a list of completed Shibarium deposits to display them in UI. |
||||
""" |
||||
@spec deposits(list()) :: list() |
||||
def deposits(options \\ []) do |
||||
paging_options = Keyword.get(options, :paging_options, default_paging_options()) |
||||
|
||||
base_query = |
||||
from( |
||||
sb in Bridge, |
||||
where: sb.operation_type == :deposit and not is_nil(sb.l1_block_number) and not is_nil(sb.l2_block_number), |
||||
select: %{ |
||||
l1_block_number: sb.l1_block_number, |
||||
l1_transaction_hash: sb.l1_transaction_hash, |
||||
l2_transaction_hash: sb.l2_transaction_hash, |
||||
user: sb.user, |
||||
timestamp: sb.timestamp |
||||
}, |
||||
order_by: [desc: sb.l1_block_number] |
||||
) |
||||
|
||||
base_query |
||||
|> page_deposits(paging_options) |
||||
|> limit(^paging_options.page_size) |
||||
|> select_repo(options).all() |
||||
end |
||||
|
||||
@doc """ |
||||
Returns a total number of completed Shibarium deposits. |
||||
""" |
||||
@spec deposits_count(list()) :: term() | nil |
||||
def deposits_count(options \\ []) do |
||||
query = |
||||
from( |
||||
sb in Bridge, |
||||
where: sb.operation_type == :deposit and not is_nil(sb.l1_block_number) and not is_nil(sb.l2_block_number) |
||||
) |
||||
|
||||
select_repo(options).aggregate(query, :count, timeout: :infinity) |
||||
end |
||||
|
||||
@doc """ |
||||
Returns a list of completed Shibarium withdrawals to display them in UI. |
||||
""" |
||||
@spec withdrawals(list()) :: list() |
||||
def withdrawals(options \\ []) do |
||||
paging_options = Keyword.get(options, :paging_options, default_paging_options()) |
||||
|
||||
base_query = |
||||
from( |
||||
sb in Bridge, |
||||
where: sb.operation_type == :withdrawal and not is_nil(sb.l1_block_number) and not is_nil(sb.l2_block_number), |
||||
select: %{ |
||||
l2_block_number: sb.l2_block_number, |
||||
l2_transaction_hash: sb.l2_transaction_hash, |
||||
l1_transaction_hash: sb.l1_transaction_hash, |
||||
user: sb.user, |
||||
timestamp: sb.timestamp |
||||
}, |
||||
order_by: [desc: sb.l2_block_number] |
||||
) |
||||
|
||||
base_query |
||||
|> page_withdrawals(paging_options) |
||||
|> limit(^paging_options.page_size) |
||||
|> select_repo(options).all() |
||||
end |
||||
|
||||
@doc """ |
||||
Returns a total number of completed Shibarium withdrawals. |
||||
""" |
||||
@spec withdrawals_count(list()) :: term() | nil |
||||
def withdrawals_count(options \\ []) do |
||||
query = |
||||
from( |
||||
sb in Bridge, |
||||
where: sb.operation_type == :withdrawal and not is_nil(sb.l1_block_number) and not is_nil(sb.l2_block_number) |
||||
) |
||||
|
||||
select_repo(options).aggregate(query, :count, timeout: :infinity) |
||||
end |
||||
|
||||
defp page_deposits(query, %PagingOptions{key: nil}), do: query |
||||
|
||||
defp page_deposits(query, %PagingOptions{key: {block_number}}) do |
||||
from(item in query, where: item.l1_block_number < ^block_number) |
||||
end |
||||
|
||||
defp page_withdrawals(query, %PagingOptions{key: nil}), do: query |
||||
|
||||
defp page_withdrawals(query, %PagingOptions{key: {block_number}}) do |
||||
from(item in query, where: item.l2_block_number < ^block_number) |
||||
end |
||||
end |
@ -0,0 +1,34 @@ |
||||
defmodule Explorer.Repo.Shibarium.Migrations.AddBridgeTable do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
execute( |
||||
"CREATE TYPE shibarium_bridge_operation_type AS ENUM ('deposit', 'withdrawal')", |
||||
"DROP TYPE shibarium_bridge_operation_type" |
||||
) |
||||
|
||||
execute( |
||||
"CREATE TYPE shibarium_bridge_token_type AS ENUM ('bone', 'eth', 'other')", |
||||
"DROP TYPE shibarium_bridge_token_type" |
||||
) |
||||
|
||||
create table(:shibarium_bridge, primary_key: false) do |
||||
add(:user, :bytea, null: false) |
||||
add(:amount_or_id, :numeric, precision: 100, null: true) |
||||
add(:erc1155_ids, {:array, :numeric}, precision: 78, scale: 0, null: true) |
||||
add(:erc1155_amounts, {:array, :decimal}, null: true) |
||||
add(:operation_hash, :bytea, primary_key: true) |
||||
add(:operation_type, :shibarium_bridge_operation_type, null: false) |
||||
add(:l1_transaction_hash, :bytea, primary_key: true) |
||||
add(:l1_block_number, :bigint, null: true) |
||||
add(:l2_transaction_hash, :bytea, primary_key: true) |
||||
add(:l2_block_number, :bigint, null: true) |
||||
add(:token_type, :shibarium_bridge_token_type, null: false) |
||||
add(:timestamp, :"timestamp without time zone", null: true) |
||||
timestamps(null: false, type: :utc_datetime_usec) |
||||
end |
||||
|
||||
create(index(:shibarium_bridge, [:l1_block_number, :operation_type])) |
||||
create(index(:shibarium_bridge, [:l2_block_number, :operation_type])) |
||||
end |
||||
end |
@ -0,0 +1,136 @@ |
||||
defmodule Indexer.Fetcher.Shibarium.Helper do |
||||
@moduledoc """ |
||||
Common functions for Indexer.Fetcher.Shibarium.* modules. |
||||
""" |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.Chain.Cache.ShibariumCounter |
||||
alias Explorer.Chain.Shibarium.{Bridge, Reader} |
||||
alias Explorer.Repo |
||||
|
||||
@empty_hash "0x0000000000000000000000000000000000000000000000000000000000000000" |
||||
|
||||
@doc """ |
||||
Calculates Shibarium Bridge operation hash as hash_256(user_address, amount_or_id, erc1155_ids, erc1155_amounts, operation_id). |
||||
""" |
||||
@spec calc_operation_hash(binary(), non_neg_integer() | nil, list(), list(), non_neg_integer()) :: binary() |
||||
def calc_operation_hash(user, amount_or_id, erc1155_ids, erc1155_amounts, operation_id) do |
||||
user_binary = |
||||
user |
||||
|> String.trim_leading("0x") |
||||
|> Base.decode16!(case: :mixed) |
||||
|
||||
amount_or_id = |
||||
if is_nil(amount_or_id) and not Enum.empty?(erc1155_ids) do |
||||
0 |
||||
else |
||||
amount_or_id |
||||
end |
||||
|
||||
operation_encoded = |
||||
ABI.encode("(address,uint256,uint256[],uint256[],uint256)", [ |
||||
{ |
||||
user_binary, |
||||
amount_or_id, |
||||
erc1155_ids, |
||||
erc1155_amounts, |
||||
operation_id |
||||
} |
||||
]) |
||||
|
||||
"0x" <> |
||||
(operation_encoded |
||||
|> ExKeccak.hash_256() |
||||
|> Base.encode16(case: :lower)) |
||||
end |
||||
|
||||
@doc """ |
||||
Prepares a list of Shibarium Bridge operations to import them into database. |
||||
Tries to bind the given operations to the existing ones in DB first. |
||||
If they don't exist, prepares the insertion list and returns it. |
||||
""" |
||||
@spec prepare_insert_items(list(), module()) :: list() |
||||
def prepare_insert_items(operations, calling_module) do |
||||
operations |
||||
|> Enum.reduce([], fn op, acc -> |
||||
if bind_existing_operation_in_db(op, calling_module) == 0 do |
||||
[op | acc] |
||||
else |
||||
acc |
||||
end |
||||
end) |
||||
|> Enum.reverse() |
||||
|> Enum.reduce(%{}, fn item, acc -> |
||||
Map.put(acc, {item.operation_hash, item.l1_transaction_hash, item.l2_transaction_hash}, item) |
||||
end) |
||||
|> Map.values() |
||||
end |
||||
|
||||
@doc """ |
||||
Recalculate the cached count of complete rows for deposits and withdrawals. |
||||
""" |
||||
@spec recalculate_cached_count() :: no_return() |
||||
def recalculate_cached_count do |
||||
ShibariumCounter.deposits_count_save(Reader.deposits_count()) |
||||
ShibariumCounter.withdrawals_count_save(Reader.withdrawals_count()) |
||||
end |
||||
|
||||
defp bind_existing_operation_in_db(op, calling_module) do |
||||
{query, set} = make_query_for_bind(op, calling_module) |
||||
|
||||
{updated_count, _} = |
||||
Repo.update_all( |
||||
from(b in Bridge, |
||||
join: s in subquery(query), |
||||
on: |
||||
b.operation_hash == s.operation_hash and b.l1_transaction_hash == s.l1_transaction_hash and |
||||
b.l2_transaction_hash == s.l2_transaction_hash |
||||
), |
||||
set: set |
||||
) |
||||
|
||||
# increment the cached count of complete rows |
||||
case updated_count > 0 && op.operation_type do |
||||
:deposit -> ShibariumCounter.deposits_count_save(updated_count, true) |
||||
:withdrawal -> ShibariumCounter.withdrawals_count_save(updated_count, true) |
||||
false -> nil |
||||
end |
||||
|
||||
updated_count |
||||
end |
||||
|
||||
defp make_query_for_bind(op, calling_module) when calling_module == Indexer.Fetcher.Shibarium.L1 do |
||||
query = |
||||
from(sb in Bridge, |
||||
where: |
||||
sb.operation_hash == ^op.operation_hash and sb.operation_type == ^op.operation_type and |
||||
sb.l2_transaction_hash != ^@empty_hash and sb.l1_transaction_hash == ^@empty_hash, |
||||
order_by: [asc: sb.l2_block_number], |
||||
limit: 1 |
||||
) |
||||
|
||||
set = |
||||
[l1_transaction_hash: op.l1_transaction_hash, l1_block_number: op.l1_block_number] ++ |
||||
if(op.operation_type == :deposit, do: [timestamp: op.timestamp], else: []) |
||||
|
||||
{query, set} |
||||
end |
||||
|
||||
defp make_query_for_bind(op, calling_module) when calling_module == Indexer.Fetcher.Shibarium.L2 do |
||||
query = |
||||
from(sb in Bridge, |
||||
where: |
||||
sb.operation_hash == ^op.operation_hash and sb.operation_type == ^op.operation_type and |
||||
sb.l1_transaction_hash != ^@empty_hash and sb.l2_transaction_hash == ^@empty_hash, |
||||
order_by: [asc: sb.l1_block_number], |
||||
limit: 1 |
||||
) |
||||
|
||||
set = |
||||
[l2_transaction_hash: op.l2_transaction_hash, l2_block_number: op.l2_block_number] ++ |
||||
if(op.operation_type == :withdrawal, do: [timestamp: op.timestamp], else: []) |
||||
|
||||
{query, set} |
||||
end |
||||
end |
@ -0,0 +1,722 @@ |
||||
defmodule Indexer.Fetcher.Shibarium.L1 do |
||||
@moduledoc """ |
||||
Fills shibarium_bridge DB table. |
||||
""" |
||||
|
||||
use GenServer |
||||
use Indexer.Fetcher |
||||
|
||||
require Logger |
||||
|
||||
import Ecto.Query |
||||
|
||||
import EthereumJSONRPC, |
||||
only: [ |
||||
integer_to_quantity: 1, |
||||
json_rpc: 2, |
||||
quantity_to_integer: 1, |
||||
request: 1 |
||||
] |
||||
|
||||
import Explorer.Helper, only: [parse_integer: 1, decode_data: 2] |
||||
|
||||
import Indexer.Fetcher.Shibarium.Helper, |
||||
only: [calc_operation_hash: 5, prepare_insert_items: 2, recalculate_cached_count: 0] |
||||
|
||||
alias EthereumJSONRPC.Block.ByNumber |
||||
alias EthereumJSONRPC.Blocks |
||||
alias Explorer.Chain.Shibarium.Bridge |
||||
alias Explorer.{Chain, Repo} |
||||
alias Indexer.{BoundQueue, Helper} |
||||
|
||||
@block_check_interval_range_size 100 |
||||
@eth_get_logs_range_size 1000 |
||||
@fetcher_name :shibarium_bridge_l1 |
||||
@empty_hash "0x0000000000000000000000000000000000000000000000000000000000000000" |
||||
|
||||
# 32-byte signature of the event NewDepositBlock(address indexed owner, address indexed token, uint256 amountOrNFTId, uint256 depositBlockId) |
||||
@new_deposit_block_event "0x1dadc8d0683c6f9824e885935c1bec6f76816730dcec148dda8cf25a7b9f797b" |
||||
|
||||
# 32-byte signature of the event LockedEther(address indexed depositor, address indexed depositReceiver, uint256 amount) |
||||
@locked_ether_event "0x3e799b2d61372379e767ef8f04d65089179b7a6f63f9be3065806456c7309f1b" |
||||
|
||||
# 32-byte signature of the event LockedERC20(address indexed depositor, address indexed depositReceiver, address indexed rootToken, uint256 amount) |
||||
@locked_erc20_event "0x9b217a401a5ddf7c4d474074aff9958a18d48690d77cc2151c4706aa7348b401" |
||||
|
||||
# 32-byte signature of the event LockedERC721(address indexed depositor, address indexed depositReceiver, address indexed rootToken, uint256 tokenId) |
||||
@locked_erc721_event "0x8357472e13612a8c3d6f3e9d71fbba8a78ab77dbdcc7fcf3b7b645585f0bbbfc" |
||||
|
||||
# 32-byte signature of the event LockedERC721Batch(address indexed depositor, address indexed depositReceiver, address indexed rootToken, uint256[] tokenIds) |
||||
@locked_erc721_batch_event "0x5345c2beb0e49c805f42bb70c4ec5c3c3d9680ce45b8f4529c028d5f3e0f7a0d" |
||||
|
||||
# 32-byte signature of the event LockedBatchERC1155(address indexed depositor, address indexed depositReceiver, address indexed rootToken, uint256[] ids, uint256[] amounts) |
||||
@locked_batch_erc1155_event "0x5a921678b5779e4471b77219741a417a6ad6ec5d89fa5c8ce8cd7bd2d9f34186" |
||||
|
||||
# 32-byte signature of the event Withdraw(uint256 indexed exitId, address indexed user, address indexed token, uint256 amount) |
||||
@withdraw_event "0xfeb2000dca3e617cd6f3a8bbb63014bb54a124aac6ccbf73ee7229b4cd01f120" |
||||
|
||||
# 32-byte signature of the event ExitedEther(address indexed exitor, uint256 amount) |
||||
@exited_ether_event "0x0fc0eed41f72d3da77d0f53b9594fc7073acd15ee9d7c536819a70a67c57ef3c" |
||||
|
||||
# 32-byte signature of the event Transfer(address indexed from, address indexed to, uint256 value) |
||||
@transfer_event "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" |
||||
|
||||
# 32-byte signature of the event TransferSingle(address indexed operator, address indexed from, address indexed to, uint256 id, uint256 value) |
||||
@transfer_single_event "0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62" |
||||
|
||||
# 32-byte signature of the event TransferBatch(address indexed operator, address indexed from, address indexed to, uint256[] ids, uint256[] values) |
||||
@transfer_batch_event "0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb" |
||||
|
||||
def child_spec(start_link_arguments) do |
||||
spec = %{ |
||||
id: __MODULE__, |
||||
start: {__MODULE__, :start_link, start_link_arguments}, |
||||
restart: :transient, |
||||
type: :worker |
||||
} |
||||
|
||||
Supervisor.child_spec(spec, []) |
||||
end |
||||
|
||||
def start_link(args, gen_server_options \\ []) do |
||||
GenServer.start_link(__MODULE__, args, Keyword.put_new(gen_server_options, :name, __MODULE__)) |
||||
end |
||||
|
||||
@impl GenServer |
||||
def init(_args) do |
||||
{:ok, %{}, {:continue, :ok}} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_continue(_, state) do |
||||
Logger.metadata(fetcher: @fetcher_name) |
||||
# two seconds pause needed to avoid exceeding Supervisor restart intensity when DB issues |
||||
Process.send_after(self(), :wait_for_l2, 2000) |
||||
{:noreply, state} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_info(:wait_for_l2, state) do |
||||
if is_nil(Process.whereis(Indexer.Fetcher.Shibarium.L2)) do |
||||
Process.send(self(), :init_with_delay, []) |
||||
else |
||||
Process.send_after(self(), :wait_for_l2, 2000) |
||||
end |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_info(:init_with_delay, _state) do |
||||
env = Application.get_all_env(:indexer)[__MODULE__] |
||||
|
||||
with {:start_block_undefined, false} <- {:start_block_undefined, is_nil(env[:start_block])}, |
||||
rpc = env[:rpc], |
||||
{:rpc_undefined, false} <- {:rpc_undefined, is_nil(rpc)}, |
||||
{:deposit_manager_address_is_valid, true} <- |
||||
{:deposit_manager_address_is_valid, Helper.address_correct?(env[:deposit_manager_proxy])}, |
||||
{:ether_predicate_address_is_valid, true} <- |
||||
{:ether_predicate_address_is_valid, Helper.address_correct?(env[:ether_predicate_proxy])}, |
||||
{:erc20_predicate_address_is_valid, true} <- |
||||
{:erc20_predicate_address_is_valid, Helper.address_correct?(env[:erc20_predicate_proxy])}, |
||||
{:erc721_predicate_address_is_valid, true} <- |
||||
{:erc721_predicate_address_is_valid, |
||||
is_nil(env[:erc721_predicate_proxy]) or Helper.address_correct?(env[:erc721_predicate_proxy])}, |
||||
{:erc1155_predicate_address_is_valid, true} <- |
||||
{:erc1155_predicate_address_is_valid, |
||||
is_nil(env[:erc1155_predicate_proxy]) or Helper.address_correct?(env[:erc1155_predicate_proxy])}, |
||||
{:withdraw_manager_address_is_valid, true} <- |
||||
{:withdraw_manager_address_is_valid, Helper.address_correct?(env[:withdraw_manager_proxy])}, |
||||
start_block = parse_integer(env[:start_block]), |
||||
false <- is_nil(start_block), |
||||
true <- start_block > 0, |
||||
{last_l1_block_number, last_l1_transaction_hash} <- get_last_l1_item(), |
||||
{:start_block_valid, true} <- |
||||
{:start_block_valid, start_block <= last_l1_block_number || last_l1_block_number == 0}, |
||||
json_rpc_named_arguments = json_rpc_named_arguments(rpc), |
||||
{:ok, last_l1_tx} <- Helper.get_transaction_by_hash(last_l1_transaction_hash, json_rpc_named_arguments), |
||||
{:l1_tx_not_found, false} <- {:l1_tx_not_found, !is_nil(last_l1_transaction_hash) && is_nil(last_l1_tx)}, |
||||
{:ok, block_check_interval, latest_block} <- get_block_check_interval(json_rpc_named_arguments), |
||||
{:start_block_valid, true} <- {:start_block_valid, start_block <= latest_block} do |
||||
recalculate_cached_count() |
||||
|
||||
Process.send(self(), :reorg_monitor, []) |
||||
Process.send(self(), :continue, []) |
||||
|
||||
{:noreply, |
||||
%{ |
||||
deposit_manager_proxy: env[:deposit_manager_proxy], |
||||
ether_predicate_proxy: env[:ether_predicate_proxy], |
||||
erc20_predicate_proxy: env[:erc20_predicate_proxy], |
||||
erc721_predicate_proxy: env[:erc721_predicate_proxy], |
||||
erc1155_predicate_proxy: env[:erc1155_predicate_proxy], |
||||
withdraw_manager_proxy: env[:withdraw_manager_proxy], |
||||
block_check_interval: block_check_interval, |
||||
start_block: max(start_block, last_l1_block_number), |
||||
end_block: latest_block, |
||||
json_rpc_named_arguments: json_rpc_named_arguments, |
||||
reorg_monitor_prev_latest: 0 |
||||
}} |
||||
else |
||||
{:start_block_undefined, true} -> |
||||
# the process shouldn't start if the start block is not defined |
||||
{:stop, :normal, %{}} |
||||
|
||||
{:rpc_undefined, true} -> |
||||
Logger.error("L1 RPC URL is not defined.") |
||||
{:stop, :normal, %{}} |
||||
|
||||
{:deposit_manager_address_is_valid, false} -> |
||||
Logger.error("DepositManagerProxy contract address is invalid or not defined.") |
||||
{:stop, :normal, %{}} |
||||
|
||||
{:ether_predicate_address_is_valid, false} -> |
||||
Logger.error("EtherPredicateProxy contract address is invalid or not defined.") |
||||
{:stop, :normal, %{}} |
||||
|
||||
{:erc20_predicate_address_is_valid, false} -> |
||||
Logger.error("ERC20PredicateProxy contract address is invalid or not defined.") |
||||
{:stop, :normal, %{}} |
||||
|
||||
{:erc721_predicate_address_is_valid, false} -> |
||||
Logger.error("ERC721PredicateProxy contract address is invalid.") |
||||
{:stop, :normal, %{}} |
||||
|
||||
{:erc1155_predicate_address_is_valid, false} -> |
||||
Logger.error("ERC1155PredicateProxy contract address is invalid.") |
||||
{:stop, :normal, %{}} |
||||
|
||||
{:withdraw_manager_address_is_valid, false} -> |
||||
Logger.error("WithdrawManagerProxy contract address is invalid or not defined.") |
||||
{:stop, :normal, %{}} |
||||
|
||||
{:start_block_valid, false} -> |
||||
Logger.error("Invalid L1 Start Block value. Please, check the value and shibarium_bridge table.") |
||||
{:stop, :normal, %{}} |
||||
|
||||
{:error, error_data} -> |
||||
Logger.error( |
||||
"Cannot get last L1 transaction from RPC by its hash, latest block, or block timestamp by its number due to RPC error: #{inspect(error_data)}" |
||||
) |
||||
|
||||
{:stop, :normal, %{}} |
||||
|
||||
{:l1_tx_not_found, true} -> |
||||
Logger.error( |
||||
"Cannot find last L1 transaction from RPC by its hash. Probably, there was a reorg on L1 chain. Please, check shibarium_bridge table." |
||||
) |
||||
|
||||
{:stop, :normal, %{}} |
||||
|
||||
_ -> |
||||
Logger.error("L1 Start Block is invalid or zero.") |
||||
{:stop, :normal, %{}} |
||||
end |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_info( |
||||
:reorg_monitor, |
||||
%{ |
||||
block_check_interval: block_check_interval, |
||||
json_rpc_named_arguments: json_rpc_named_arguments, |
||||
reorg_monitor_prev_latest: prev_latest |
||||
} = state |
||||
) do |
||||
{:ok, latest} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000) |
||||
|
||||
if latest < prev_latest do |
||||
Logger.warning("Reorg detected: previous latest block ##{prev_latest}, current latest block ##{latest}.") |
||||
reorg_block_push(latest) |
||||
end |
||||
|
||||
Process.send_after(self(), :reorg_monitor, block_check_interval) |
||||
|
||||
{:noreply, %{state | reorg_monitor_prev_latest: latest}} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_info( |
||||
:continue, |
||||
%{ |
||||
deposit_manager_proxy: deposit_manager_proxy, |
||||
ether_predicate_proxy: ether_predicate_proxy, |
||||
erc20_predicate_proxy: erc20_predicate_proxy, |
||||
erc721_predicate_proxy: erc721_predicate_proxy, |
||||
erc1155_predicate_proxy: erc1155_predicate_proxy, |
||||
withdraw_manager_proxy: withdraw_manager_proxy, |
||||
block_check_interval: block_check_interval, |
||||
start_block: start_block, |
||||
end_block: end_block, |
||||
json_rpc_named_arguments: json_rpc_named_arguments |
||||
} = state |
||||
) do |
||||
time_before = Timex.now() |
||||
|
||||
last_written_block = |
||||
start_block..end_block |
||||
|> Enum.chunk_every(@eth_get_logs_range_size) |
||||
|> Enum.reduce_while(start_block - 1, fn current_chunk, _ -> |
||||
chunk_start = List.first(current_chunk) |
||||
chunk_end = List.last(current_chunk) |
||||
|
||||
if chunk_start <= chunk_end do |
||||
Helper.log_blocks_chunk_handling(chunk_start, chunk_end, start_block, end_block, nil, "L1") |
||||
|
||||
operations = |
||||
{chunk_start, chunk_end} |
||||
|> get_logs_all( |
||||
deposit_manager_proxy, |
||||
ether_predicate_proxy, |
||||
erc20_predicate_proxy, |
||||
erc721_predicate_proxy, |
||||
erc1155_predicate_proxy, |
||||
withdraw_manager_proxy, |
||||
json_rpc_named_arguments |
||||
) |
||||
|> prepare_operations(json_rpc_named_arguments) |
||||
|
||||
{:ok, _} = |
||||
Chain.import(%{ |
||||
shibarium_bridge_operations: %{params: prepare_insert_items(operations, __MODULE__)}, |
||||
timeout: :infinity |
||||
}) |
||||
|
||||
Helper.log_blocks_chunk_handling( |
||||
chunk_start, |
||||
chunk_end, |
||||
start_block, |
||||
end_block, |
||||
"#{Enum.count(operations)} L1 operation(s)", |
||||
"L1" |
||||
) |
||||
end |
||||
|
||||
reorg_block = reorg_block_pop() |
||||
|
||||
if !is_nil(reorg_block) && reorg_block > 0 do |
||||
reorg_handle(reorg_block) |
||||
{:halt, if(reorg_block <= chunk_end, do: reorg_block - 1, else: chunk_end)} |
||||
else |
||||
{:cont, chunk_end} |
||||
end |
||||
end) |
||||
|
||||
new_start_block = last_written_block + 1 |
||||
{:ok, new_end_block} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments, 100_000_000) |
||||
|
||||
delay = |
||||
if new_end_block == last_written_block do |
||||
# there is no new block, so wait for some time to let the chain issue the new block |
||||
max(block_check_interval - Timex.diff(Timex.now(), time_before, :milliseconds), 0) |
||||
else |
||||
0 |
||||
end |
||||
|
||||
Process.send_after(self(), :continue, delay) |
||||
|
||||
{:noreply, %{state | start_block: new_start_block, end_block: new_end_block}} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_info({ref, _result}, state) do |
||||
Process.demonitor(ref, [:flush]) |
||||
{:noreply, state} |
||||
end |
||||
|
||||
defp filter_deposit_events(events) do |
||||
Enum.filter(events, fn event -> |
||||
topic0 = Enum.at(event["topics"], 0) |
||||
deposit?(topic0) |
||||
end) |
||||
end |
||||
|
||||
defp get_block_check_interval(json_rpc_named_arguments) do |
||||
with {:ok, latest_block} <- Helper.get_block_number_by_tag("latest", json_rpc_named_arguments), |
||||
first_block = max(latest_block - @block_check_interval_range_size, 1), |
||||
{:ok, first_block_timestamp} <- Helper.get_block_timestamp_by_number(first_block, json_rpc_named_arguments), |
||||
{:ok, last_safe_block_timestamp} <- |
||||
Helper.get_block_timestamp_by_number(latest_block, json_rpc_named_arguments) do |
||||
block_check_interval = |
||||
ceil((last_safe_block_timestamp - first_block_timestamp) / (latest_block - first_block) * 1000 / 2) |
||||
|
||||
Logger.info("Block check interval is calculated as #{block_check_interval} ms.") |
||||
{:ok, block_check_interval, latest_block} |
||||
else |
||||
{:error, error} -> |
||||
{:error, "Failed to calculate block check interval due to #{inspect(error)}"} |
||||
end |
||||
end |
||||
|
||||
defp get_blocks_by_events(events, json_rpc_named_arguments, retries) do |
||||
request = |
||||
events |
||||
|> Enum.reduce(%{}, fn event, acc -> |
||||
Map.put(acc, event["blockNumber"], 0) |
||||
end) |
||||
|> Stream.map(fn {block_number, _} -> %{number: block_number} end) |
||||
|> Stream.with_index() |
||||
|> Enum.into(%{}, fn {params, id} -> {id, params} end) |
||||
|> Blocks.requests(&ByNumber.request(&1, false, false)) |
||||
|
||||
error_message = &"Cannot fetch blocks with batch request. Error: #{inspect(&1)}. Request: #{inspect(request)}" |
||||
|
||||
case Helper.repeated_call(&json_rpc/2, [request, json_rpc_named_arguments], error_message, retries) do |
||||
{:ok, results} -> Enum.map(results, fn %{result: result} -> result end) |
||||
{:error, _} -> [] |
||||
end |
||||
end |
||||
|
||||
defp get_last_l1_item do |
||||
query = |
||||
from(sb in Bridge, |
||||
select: {sb.l1_block_number, sb.l1_transaction_hash}, |
||||
where: not is_nil(sb.l1_block_number), |
||||
order_by: [desc: sb.l1_block_number], |
||||
limit: 1 |
||||
) |
||||
|
||||
query |
||||
|> Repo.one() |
||||
|> Kernel.||({0, nil}) |
||||
end |
||||
|
||||
defp get_logs(from_block, to_block, address, topics, json_rpc_named_arguments, retries \\ 100_000_000) do |
||||
processed_from_block = integer_to_quantity(from_block) |
||||
processed_to_block = integer_to_quantity(to_block) |
||||
|
||||
req = |
||||
request(%{ |
||||
id: 0, |
||||
method: "eth_getLogs", |
||||
params: [ |
||||
%{ |
||||
:fromBlock => processed_from_block, |
||||
:toBlock => processed_to_block, |
||||
:address => address, |
||||
:topics => topics |
||||
} |
||||
] |
||||
}) |
||||
|
||||
error_message = &"Cannot fetch logs for the block range #{from_block}..#{to_block}. Error: #{inspect(&1)}" |
||||
|
||||
Helper.repeated_call(&json_rpc/2, [req, json_rpc_named_arguments], error_message, retries) |
||||
end |
||||
|
||||
defp get_logs_all( |
||||
{chunk_start, chunk_end}, |
||||
deposit_manager_proxy, |
||||
ether_predicate_proxy, |
||||
erc20_predicate_proxy, |
||||
erc721_predicate_proxy, |
||||
erc1155_predicate_proxy, |
||||
withdraw_manager_proxy, |
||||
json_rpc_named_arguments |
||||
) do |
||||
{:ok, known_tokens_result} = |
||||
get_logs( |
||||
chunk_start, |
||||
chunk_end, |
||||
[deposit_manager_proxy, ether_predicate_proxy, erc20_predicate_proxy, withdraw_manager_proxy], |
||||
[ |
||||
[ |
||||
@new_deposit_block_event, |
||||
@locked_ether_event, |
||||
@locked_erc20_event, |
||||
@locked_erc721_event, |
||||
@locked_erc721_batch_event, |
||||
@locked_batch_erc1155_event, |
||||
@withdraw_event, |
||||
@exited_ether_event |
||||
] |
||||
], |
||||
json_rpc_named_arguments |
||||
) |
||||
|
||||
contract_addresses = |
||||
if is_nil(erc721_predicate_proxy) do |
||||
[pad_address_hash(erc20_predicate_proxy)] |
||||
else |
||||
[pad_address_hash(erc20_predicate_proxy), pad_address_hash(erc721_predicate_proxy)] |
||||
end |
||||
|
||||
{:ok, unknown_erc20_erc721_tokens_result} = |
||||
get_logs( |
||||
chunk_start, |
||||
chunk_end, |
||||
nil, |
||||
[ |
||||
@transfer_event, |
||||
contract_addresses |
||||
], |
||||
json_rpc_named_arguments |
||||
) |
||||
|
||||
{:ok, unknown_erc1155_tokens_result} = |
||||
if is_nil(erc1155_predicate_proxy) do |
||||
{:ok, []} |
||||
else |
||||
get_logs( |
||||
chunk_start, |
||||
chunk_end, |
||||
nil, |
||||
[ |
||||
[@transfer_single_event, @transfer_batch_event], |
||||
nil, |
||||
pad_address_hash(erc1155_predicate_proxy) |
||||
], |
||||
json_rpc_named_arguments |
||||
) |
||||
end |
||||
|
||||
known_tokens_result ++ unknown_erc20_erc721_tokens_result ++ unknown_erc1155_tokens_result |
||||
end |
||||
|
||||
defp get_op_user(topic0, event) do |
||||
cond do |
||||
Enum.member?([@new_deposit_block_event, @exited_ether_event], topic0) -> |
||||
truncate_address_hash(Enum.at(event["topics"], 1)) |
||||
|
||||
Enum.member?( |
||||
[ |
||||
@locked_ether_event, |
||||
@locked_erc20_event, |
||||
@locked_erc721_event, |
||||
@locked_erc721_batch_event, |
||||
@locked_batch_erc1155_event, |
||||
@withdraw_event, |
||||
@transfer_event |
||||
], |
||||
topic0 |
||||
) -> |
||||
truncate_address_hash(Enum.at(event["topics"], 2)) |
||||
|
||||
Enum.member?([@transfer_single_event, @transfer_batch_event], topic0) -> |
||||
truncate_address_hash(Enum.at(event["topics"], 3)) |
||||
end |
||||
end |
||||
|
||||
defp get_op_amounts(topic0, event) do |
||||
cond do |
||||
topic0 == @new_deposit_block_event -> |
||||
[amount_or_nft_id, deposit_block_id] = decode_data(event["data"], [{:uint, 256}, {:uint, 256}]) |
||||
{[amount_or_nft_id], deposit_block_id} |
||||
|
||||
topic0 == @transfer_event -> |
||||
indexed_token_id = Enum.at(event["topics"], 3) |
||||
|
||||
if is_nil(indexed_token_id) do |
||||
{decode_data(event["data"], [{:uint, 256}]), 0} |
||||
else |
||||
{[quantity_to_integer(indexed_token_id)], 0} |
||||
end |
||||
|
||||
Enum.member?( |
||||
[ |
||||
@locked_ether_event, |
||||
@locked_erc20_event, |
||||
@locked_erc721_event, |
||||
@withdraw_event, |
||||
@exited_ether_event |
||||
], |
||||
topic0 |
||||
) -> |
||||
{decode_data(event["data"], [{:uint, 256}]), 0} |
||||
|
||||
topic0 == @locked_erc721_batch_event -> |
||||
[ids] = decode_data(event["data"], [{:array, {:uint, 256}}]) |
||||
{ids, 0} |
||||
|
||||
true -> |
||||
{[nil], 0} |
||||
end |
||||
end |
||||
|
||||
defp get_op_erc1155_data(topic0, event) do |
||||
cond do |
||||
Enum.member?([@locked_batch_erc1155_event, @transfer_batch_event], topic0) -> |
||||
[ids, amounts] = decode_data(event["data"], [{:array, {:uint, 256}}, {:array, {:uint, 256}}]) |
||||
{ids, amounts} |
||||
|
||||
Enum.member?([@transfer_single_event], topic0) -> |
||||
[id, amount] = decode_data(event["data"], [{:uint, 256}, {:uint, 256}]) |
||||
{[id], [amount]} |
||||
|
||||
true -> |
||||
{[], []} |
||||
end |
||||
end |
||||
|
||||
defp deposit?(topic0) do |
||||
Enum.member?( |
||||
[ |
||||
@new_deposit_block_event, |
||||
@locked_ether_event, |
||||
@locked_erc20_event, |
||||
@locked_erc721_event, |
||||
@locked_erc721_batch_event, |
||||
@locked_batch_erc1155_event |
||||
], |
||||
topic0 |
||||
) |
||||
end |
||||
|
||||
defp json_rpc_named_arguments(rpc_url) do |
||||
[ |
||||
transport: EthereumJSONRPC.HTTP, |
||||
transport_options: [ |
||||
http: EthereumJSONRPC.HTTP.HTTPoison, |
||||
url: rpc_url, |
||||
http_options: [ |
||||
recv_timeout: :timer.minutes(10), |
||||
timeout: :timer.minutes(10), |
||||
hackney: [pool: :ethereum_jsonrpc] |
||||
] |
||||
] |
||||
] |
||||
end |
||||
|
||||
defp prepare_operations(events, json_rpc_named_arguments) do |
||||
timestamps = |
||||
events |
||||
|> filter_deposit_events() |
||||
|> get_blocks_by_events(json_rpc_named_arguments, 100_000_000) |
||||
|> Enum.reduce(%{}, fn block, acc -> |
||||
block_number = quantity_to_integer(Map.get(block, "number")) |
||||
{:ok, timestamp} = DateTime.from_unix(quantity_to_integer(Map.get(block, "timestamp"))) |
||||
Map.put(acc, block_number, timestamp) |
||||
end) |
||||
|
||||
events |
||||
|> Enum.map(fn event -> |
||||
topic0 = Enum.at(event["topics"], 0) |
||||
|
||||
user = get_op_user(topic0, event) |
||||
{amounts_or_ids, operation_id} = get_op_amounts(topic0, event) |
||||
{erc1155_ids, erc1155_amounts} = get_op_erc1155_data(topic0, event) |
||||
|
||||
l1_block_number = quantity_to_integer(event["blockNumber"]) |
||||
|
||||
{operation_type, timestamp} = |
||||
if deposit?(topic0) do |
||||
{:deposit, Map.get(timestamps, l1_block_number)} |
||||
else |
||||
{:withdrawal, nil} |
||||
end |
||||
|
||||
token_type = |
||||
cond do |
||||
Enum.member?([@new_deposit_block_event, @withdraw_event], topic0) -> |
||||
"bone" |
||||
|
||||
Enum.member?([@locked_ether_event, @exited_ether_event], topic0) -> |
||||
"eth" |
||||
|
||||
true -> |
||||
"other" |
||||
end |
||||
|
||||
Enum.map(amounts_or_ids, fn amount_or_id -> |
||||
%{ |
||||
user: user, |
||||
amount_or_id: amount_or_id, |
||||
erc1155_ids: if(Enum.empty?(erc1155_ids), do: nil, else: erc1155_ids), |
||||
erc1155_amounts: if(Enum.empty?(erc1155_amounts), do: nil, else: erc1155_amounts), |
||||
l1_transaction_hash: event["transactionHash"], |
||||
l1_block_number: l1_block_number, |
||||
l2_transaction_hash: @empty_hash, |
||||
operation_hash: calc_operation_hash(user, amount_or_id, erc1155_ids, erc1155_amounts, operation_id), |
||||
operation_type: operation_type, |
||||
token_type: token_type, |
||||
timestamp: timestamp |
||||
} |
||||
end) |
||||
end) |
||||
|> List.flatten() |
||||
end |
||||
|
||||
defp pad_address_hash(address) do |
||||
"0x" <> |
||||
(address |
||||
|> String.trim_leading("0x") |
||||
|> String.pad_leading(64, "0")) |
||||
end |
||||
|
||||
defp truncate_address_hash("0x000000000000000000000000" <> truncated_hash) do |
||||
"0x#{truncated_hash}" |
||||
end |
||||
|
||||
defp reorg_block_pop do |
||||
table_name = reorg_table_name(@fetcher_name) |
||||
|
||||
case BoundQueue.pop_front(reorg_queue_get(table_name)) do |
||||
{:ok, {block_number, updated_queue}} -> |
||||
:ets.insert(table_name, {:queue, updated_queue}) |
||||
block_number |
||||
|
||||
{:error, :empty} -> |
||||
nil |
||||
end |
||||
end |
||||
|
||||
defp reorg_block_push(block_number) do |
||||
table_name = reorg_table_name(@fetcher_name) |
||||
{:ok, updated_queue} = BoundQueue.push_back(reorg_queue_get(table_name), block_number) |
||||
:ets.insert(table_name, {:queue, updated_queue}) |
||||
end |
||||
|
||||
defp reorg_handle(reorg_block) do |
||||
{deleted_count, _} = |
||||
Repo.delete_all(from(sb in Bridge, where: sb.l1_block_number >= ^reorg_block and is_nil(sb.l2_transaction_hash))) |
||||
|
||||
{updated_count1, _} = |
||||
Repo.update_all( |
||||
from(sb in Bridge, |
||||
where: |
||||
sb.l1_block_number >= ^reorg_block and not is_nil(sb.l2_transaction_hash) and |
||||
sb.operation_type == :deposit |
||||
), |
||||
set: [timestamp: nil] |
||||
) |
||||
|
||||
{updated_count2, _} = |
||||
Repo.update_all( |
||||
from(sb in Bridge, where: sb.l1_block_number >= ^reorg_block and not is_nil(sb.l2_transaction_hash)), |
||||
set: [l1_transaction_hash: nil, l1_block_number: nil] |
||||
) |
||||
|
||||
updated_count = max(updated_count1, updated_count2) |
||||
|
||||
if deleted_count > 0 or updated_count > 0 do |
||||
recalculate_cached_count() |
||||
|
||||
Logger.warning( |
||||
"As L1 reorg was detected, some rows with l1_block_number >= #{reorg_block} were affected (removed or updated) in the shibarium_bridge table. Number of removed rows: #{deleted_count}. Number of updated rows: >= #{updated_count}." |
||||
) |
||||
end |
||||
end |
||||
|
||||
defp reorg_queue_get(table_name) do |
||||
if :ets.whereis(table_name) == :undefined do |
||||
:ets.new(table_name, [ |
||||
:set, |
||||
:named_table, |
||||
:public, |
||||
read_concurrency: true, |
||||
write_concurrency: true |
||||
]) |
||||
end |
||||
|
||||
with info when info != :undefined <- :ets.info(table_name), |
||||
[{_, value}] <- :ets.lookup(table_name, :queue) do |
||||
value |
||||
else |
||||
_ -> %BoundQueue{} |
||||
end |
||||
end |
||||
|
||||
defp reorg_table_name(fetcher_name) do |
||||
:"#{fetcher_name}#{:_reorgs}" |
||||
end |
||||
end |
@ -0,0 +1,536 @@ |
||||
defmodule Indexer.Fetcher.Shibarium.L2 do |
||||
@moduledoc """ |
||||
Fills shibarium_bridge DB table. |
||||
""" |
||||
|
||||
use GenServer |
||||
use Indexer.Fetcher |
||||
|
||||
require Logger |
||||
|
||||
import Ecto.Query |
||||
|
||||
import EthereumJSONRPC, |
||||
only: [ |
||||
json_rpc: 2, |
||||
quantity_to_integer: 1, |
||||
request: 1 |
||||
] |
||||
|
||||
import Explorer.Chain.SmartContract, only: [burn_address_hash_string: 0] |
||||
|
||||
import Explorer.Helper, only: [decode_data: 2, parse_integer: 1] |
||||
|
||||
import Indexer.Fetcher.Shibarium.Helper, |
||||
only: [calc_operation_hash: 5, prepare_insert_items: 2, recalculate_cached_count: 0] |
||||
|
||||
alias EthereumJSONRPC.Block.ByNumber |
||||
alias EthereumJSONRPC.{Blocks, Logs, Receipt} |
||||
alias Explorer.{Chain, Repo} |
||||
alias Explorer.Chain.Shibarium.Bridge |
||||
alias Indexer.Helper |
||||
|
||||
@eth_get_logs_range_size 100 |
||||
@fetcher_name :shibarium_bridge_l2 |
||||
@empty_hash "0x0000000000000000000000000000000000000000000000000000000000000000" |
||||
|
||||
# 32-byte signature of the event TokenDeposited(address indexed rootToken, address indexed childToken, address indexed user, uint256 amount, uint256 depositCount) |
||||
@token_deposited_event "0xec3afb067bce33c5a294470ec5b29e6759301cd3928550490c6d48816cdc2f5d" |
||||
|
||||
# 32-byte signature of the event Transfer(address indexed from, address indexed to, uint256 value) |
||||
@transfer_event "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" |
||||
|
||||
# 32-byte signature of the event TransferSingle(address indexed operator, address indexed from, address indexed to, uint256 id, uint256 value) |
||||
@transfer_single_event "0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62" |
||||
|
||||
# 32-byte signature of the event TransferBatch(address indexed operator, address indexed from, address indexed to, uint256[] ids, uint256[] values) |
||||
@transfer_batch_event "0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb" |
||||
|
||||
# 32-byte signature of the event Withdraw(address indexed rootToken, address indexed from, uint256 amount, uint256, uint256) |
||||
@withdraw_event "0xebff2602b3f468259e1e99f613fed6691f3a6526effe6ef3e768ba7ae7a36c4f" |
||||
|
||||
# 4-byte signature of the method withdraw(uint256 amount) |
||||
@withdraw_method "0x2e1a7d4d" |
||||
|
||||
def child_spec(start_link_arguments) do |
||||
spec = %{ |
||||
id: __MODULE__, |
||||
start: {__MODULE__, :start_link, start_link_arguments}, |
||||
restart: :transient, |
||||
type: :worker |
||||
} |
||||
|
||||
Supervisor.child_spec(spec, []) |
||||
end |
||||
|
||||
def start_link(args, gen_server_options \\ []) do |
||||
GenServer.start_link(__MODULE__, args, Keyword.put_new(gen_server_options, :name, __MODULE__)) |
||||
end |
||||
|
||||
@impl GenServer |
||||
def init(args) do |
||||
json_rpc_named_arguments = args[:json_rpc_named_arguments] |
||||
{:ok, %{}, {:continue, json_rpc_named_arguments}} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_continue(json_rpc_named_arguments, _state) do |
||||
Logger.metadata(fetcher: @fetcher_name) |
||||
# two seconds pause needed to avoid exceeding Supervisor restart intensity when DB issues |
||||
Process.send_after(self(), :init_with_delay, 2000) |
||||
{:noreply, %{json_rpc_named_arguments: json_rpc_named_arguments}} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_info(:init_with_delay, %{json_rpc_named_arguments: json_rpc_named_arguments} = state) do |
||||
env = Application.get_all_env(:indexer)[__MODULE__] |
||||
|
||||
with {:start_block_undefined, false} <- {:start_block_undefined, is_nil(env[:start_block])}, |
||||
{:child_chain_address_is_valid, true} <- |
||||
{:child_chain_address_is_valid, Helper.address_correct?(env[:child_chain])}, |
||||
{:weth_address_is_valid, true} <- {:weth_address_is_valid, Helper.address_correct?(env[:weth])}, |
||||
{:bone_withdraw_address_is_valid, true} <- |
||||
{:bone_withdraw_address_is_valid, Helper.address_correct?(env[:bone_withdraw])}, |
||||
start_block = parse_integer(env[:start_block]), |
||||
false <- is_nil(start_block), |
||||
true <- start_block > 0, |
||||
{last_l2_block_number, last_l2_transaction_hash} <- get_last_l2_item(), |
||||
{:ok, latest_block} = Helper.get_block_number_by_tag("latest", json_rpc_named_arguments), |
||||
{:start_block_valid, true} <- |
||||
{:start_block_valid, |
||||
(start_block <= last_l2_block_number || last_l2_block_number == 0) && start_block <= latest_block}, |
||||
{:ok, last_l2_tx} <- Helper.get_transaction_by_hash(last_l2_transaction_hash, json_rpc_named_arguments), |
||||
{:l2_tx_not_found, false} <- {:l2_tx_not_found, !is_nil(last_l2_transaction_hash) && is_nil(last_l2_tx)} do |
||||
recalculate_cached_count() |
||||
|
||||
Process.send(self(), :continue, []) |
||||
|
||||
{:noreply, |
||||
%{ |
||||
start_block: max(start_block, last_l2_block_number), |
||||
latest_block: latest_block, |
||||
child_chain: String.downcase(env[:child_chain]), |
||||
weth: String.downcase(env[:weth]), |
||||
bone_withdraw: String.downcase(env[:bone_withdraw]), |
||||
json_rpc_named_arguments: json_rpc_named_arguments |
||||
}} |
||||
else |
||||
{:start_block_undefined, true} -> |
||||
# the process shouldn't start if the start block is not defined |
||||
{:stop, :normal, state} |
||||
|
||||
{:child_chain_address_is_valid, false} -> |
||||
Logger.error("ChildChain contract address is invalid or not defined.") |
||||
{:stop, :normal, state} |
||||
|
||||
{:weth_address_is_valid, false} -> |
||||
Logger.error("WETH contract address is invalid or not defined.") |
||||
{:stop, :normal, state} |
||||
|
||||
{:bone_withdraw_address_is_valid, false} -> |
||||
Logger.error("Bone Withdraw contract address is invalid or not defined.") |
||||
{:stop, :normal, state} |
||||
|
||||
{:start_block_valid, false} -> |
||||
Logger.error("Invalid L2 Start Block value. Please, check the value and shibarium_bridge table.") |
||||
{:stop, :normal, state} |
||||
|
||||
{:error, error_data} -> |
||||
Logger.error( |
||||
"Cannot get last L2 transaction by its hash or latest block from RPC due to RPC error: #{inspect(error_data)}" |
||||
) |
||||
|
||||
{:stop, :normal, state} |
||||
|
||||
{:l2_tx_not_found, true} -> |
||||
Logger.error( |
||||
"Cannot find last L2 transaction from RPC by its hash. Probably, there was a reorg on L2 chain. Please, check shibarium_bridge table." |
||||
) |
||||
|
||||
{:stop, :normal, state} |
||||
|
||||
_ -> |
||||
Logger.error("L2 Start Block is invalid or zero.") |
||||
{:stop, :normal, state} |
||||
end |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_info( |
||||
:continue, |
||||
%{ |
||||
start_block: start_block, |
||||
latest_block: end_block, |
||||
child_chain: child_chain, |
||||
weth: weth, |
||||
bone_withdraw: bone_withdraw, |
||||
json_rpc_named_arguments: json_rpc_named_arguments |
||||
} = state |
||||
) do |
||||
start_block..end_block |
||||
|> Enum.chunk_every(@eth_get_logs_range_size) |
||||
|> Enum.each(fn current_chunk -> |
||||
chunk_start = List.first(current_chunk) |
||||
chunk_end = List.last(current_chunk) |
||||
|
||||
Helper.log_blocks_chunk_handling(chunk_start, chunk_end, start_block, end_block, nil, "L2") |
||||
|
||||
operations = |
||||
chunk_start..chunk_end |
||||
|> get_logs_all(child_chain, bone_withdraw, json_rpc_named_arguments) |
||||
|> prepare_operations(weth) |
||||
|
||||
{:ok, _} = |
||||
Chain.import(%{ |
||||
shibarium_bridge_operations: %{params: prepare_insert_items(operations, __MODULE__)}, |
||||
timeout: :infinity |
||||
}) |
||||
|
||||
Helper.log_blocks_chunk_handling( |
||||
chunk_start, |
||||
chunk_end, |
||||
start_block, |
||||
end_block, |
||||
"#{Enum.count(operations)} L2 operation(s)", |
||||
"L2" |
||||
) |
||||
end) |
||||
|
||||
{:stop, :normal, state} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_info({ref, _result}, state) do |
||||
Process.demonitor(ref, [:flush]) |
||||
{:noreply, state} |
||||
end |
||||
|
||||
def filter_deposit_events(events, child_chain) do |
||||
Enum.filter(events, fn event -> |
||||
address = String.downcase(event.address_hash) |
||||
first_topic = Helper.log_topic_to_string(event.first_topic) |
||||
second_topic = Helper.log_topic_to_string(event.second_topic) |
||||
third_topic = Helper.log_topic_to_string(event.third_topic) |
||||
fourth_topic = Helper.log_topic_to_string(event.fourth_topic) |
||||
|
||||
(first_topic == @token_deposited_event and address == child_chain) or |
||||
(first_topic == @transfer_event and second_topic == @empty_hash and third_topic != @empty_hash) or |
||||
(Enum.member?([@transfer_single_event, @transfer_batch_event], first_topic) and |
||||
third_topic == @empty_hash and fourth_topic != @empty_hash) |
||||
end) |
||||
end |
||||
|
||||
def filter_withdrawal_events(events, bone_withdraw) do |
||||
Enum.filter(events, fn event -> |
||||
address = String.downcase(event.address_hash) |
||||
first_topic = Helper.log_topic_to_string(event.first_topic) |
||||
second_topic = Helper.log_topic_to_string(event.second_topic) |
||||
third_topic = Helper.log_topic_to_string(event.third_topic) |
||||
fourth_topic = Helper.log_topic_to_string(event.fourth_topic) |
||||
|
||||
(first_topic == @withdraw_event and address == bone_withdraw) or |
||||
(first_topic == @transfer_event and second_topic != @empty_hash and third_topic == @empty_hash) or |
||||
(Enum.member?([@transfer_single_event, @transfer_batch_event], first_topic) and |
||||
third_topic != @empty_hash and fourth_topic == @empty_hash) |
||||
end) |
||||
end |
||||
|
||||
def prepare_operations({events, timestamps}, weth) do |
||||
events |
||||
|> Enum.map(&prepare_operation(&1, timestamps, weth)) |
||||
|> List.flatten() |
||||
end |
||||
|
||||
def reorg_handle(reorg_block) do |
||||
{deleted_count, _} = |
||||
Repo.delete_all(from(sb in Bridge, where: sb.l2_block_number >= ^reorg_block and is_nil(sb.l1_transaction_hash))) |
||||
|
||||
{updated_count1, _} = |
||||
Repo.update_all( |
||||
from(sb in Bridge, |
||||
where: |
||||
sb.l2_block_number >= ^reorg_block and not is_nil(sb.l1_transaction_hash) and |
||||
sb.operation_type == :withdrawal |
||||
), |
||||
set: [timestamp: nil] |
||||
) |
||||
|
||||
{updated_count2, _} = |
||||
Repo.update_all( |
||||
from(sb in Bridge, where: sb.l2_block_number >= ^reorg_block and not is_nil(sb.l1_transaction_hash)), |
||||
set: [l2_transaction_hash: nil, l2_block_number: nil] |
||||
) |
||||
|
||||
updated_count = max(updated_count1, updated_count2) |
||||
|
||||
if deleted_count > 0 or updated_count > 0 do |
||||
recalculate_cached_count() |
||||
|
||||
Logger.warning( |
||||
"As L2 reorg was detected, some rows with l2_block_number >= #{reorg_block} were affected (removed or updated) in the shibarium_bridge table. Number of removed rows: #{deleted_count}. Number of updated rows: >= #{updated_count}." |
||||
) |
||||
end |
||||
end |
||||
|
||||
def withdraw_method_signature do |
||||
@withdraw_method |
||||
end |
||||
|
||||
defp get_blocks_by_range(range, json_rpc_named_arguments, retries) do |
||||
request = |
||||
range |
||||
|> Stream.map(fn block_number -> %{number: block_number} end) |
||||
|> Stream.with_index() |
||||
|> Enum.into(%{}, fn {params, id} -> {id, params} end) |
||||
|> Blocks.requests(&ByNumber.request(&1)) |
||||
|
||||
error_message = &"Cannot fetch blocks with batch request. Error: #{inspect(&1)}. Request: #{inspect(request)}" |
||||
|
||||
case Helper.repeated_call(&json_rpc/2, [request, json_rpc_named_arguments], error_message, retries) do |
||||
{:ok, results} -> Enum.map(results, fn %{result: result} -> result end) |
||||
{:error, _} -> [] |
||||
end |
||||
end |
||||
|
||||
defp get_last_l2_item do |
||||
query = |
||||
from(sb in Bridge, |
||||
select: {sb.l2_block_number, sb.l2_transaction_hash}, |
||||
where: not is_nil(sb.l2_block_number), |
||||
order_by: [desc: sb.l2_block_number], |
||||
limit: 1 |
||||
) |
||||
|
||||
query |
||||
|> Repo.one() |
||||
|> Kernel.||({0, nil}) |
||||
end |
||||
|
||||
defp get_logs_all(block_range, child_chain, bone_withdraw, json_rpc_named_arguments) do |
||||
blocks = get_blocks_by_range(block_range, json_rpc_named_arguments, 100_000_000) |
||||
|
||||
deposit_logs = get_deposit_logs_from_receipts(blocks, child_chain, json_rpc_named_arguments) |
||||
|
||||
withdrawal_logs = get_withdrawal_logs_from_receipts(blocks, bone_withdraw, json_rpc_named_arguments) |
||||
|
||||
timestamps = |
||||
blocks |
||||
|> Enum.reduce(%{}, fn block, acc -> |
||||
block_number = |
||||
block |
||||
|> Map.get("number") |
||||
|> quantity_to_integer() |
||||
|
||||
{:ok, timestamp} = |
||||
block |
||||
|> Map.get("timestamp") |
||||
|> quantity_to_integer() |
||||
|> DateTime.from_unix() |
||||
|
||||
Map.put(acc, block_number, timestamp) |
||||
end) |
||||
|
||||
{deposit_logs ++ withdrawal_logs, timestamps} |
||||
end |
||||
|
||||
defp get_deposit_logs_from_receipts(blocks, child_chain, json_rpc_named_arguments) do |
||||
blocks |
||||
|> Enum.reduce([], fn block, acc -> |
||||
hashes = |
||||
block |
||||
|> Map.get("transactions", []) |
||||
|> Enum.filter(fn t -> Map.get(t, "from") == burn_address_hash_string() end) |
||||
|> Enum.map(fn t -> Map.get(t, "hash") end) |
||||
|
||||
acc ++ hashes |
||||
end) |
||||
|> Enum.chunk_every(@eth_get_logs_range_size) |
||||
|> Enum.reduce([], fn hashes, acc -> |
||||
acc ++ get_receipt_logs(hashes, json_rpc_named_arguments, 100_000_000) |
||||
end) |
||||
|> filter_deposit_events(child_chain) |
||||
end |
||||
|
||||
defp get_withdrawal_logs_from_receipts(blocks, bone_withdraw, json_rpc_named_arguments) do |
||||
blocks |
||||
|> Enum.reduce([], fn block, acc -> |
||||
hashes = |
||||
block |
||||
|> Map.get("transactions", []) |
||||
|> Enum.filter(fn t -> |
||||
# filter by `withdraw(uint256 amount)` signature |
||||
String.downcase(String.slice(Map.get(t, "input", ""), 0..9)) == @withdraw_method |
||||
end) |
||||
|> Enum.map(fn t -> Map.get(t, "hash") end) |
||||
|
||||
acc ++ hashes |
||||
end) |
||||
|> Enum.chunk_every(@eth_get_logs_range_size) |
||||
|> Enum.reduce([], fn hashes, acc -> |
||||
acc ++ get_receipt_logs(hashes, json_rpc_named_arguments, 100_000_000) |
||||
end) |
||||
|> filter_withdrawal_events(bone_withdraw) |
||||
end |
||||
|
||||
defp get_op_amounts(event) do |
||||
cond do |
||||
event.first_topic == @token_deposited_event -> |
||||
[amount, deposit_count] = decode_data(event.data, [{:uint, 256}, {:uint, 256}]) |
||||
{[amount], deposit_count} |
||||
|
||||
event.first_topic == @transfer_event -> |
||||
indexed_token_id = event.fourth_topic |
||||
|
||||
if is_nil(indexed_token_id) do |
||||
{decode_data(event.data, [{:uint, 256}]), 0} |
||||
else |
||||
{[quantity_to_integer(indexed_token_id)], 0} |
||||
end |
||||
|
||||
event.first_topic == @withdraw_event -> |
||||
[amount, _arg3, _arg4] = decode_data(event.data, [{:uint, 256}, {:uint, 256}, {:uint, 256}]) |
||||
{[amount], 0} |
||||
|
||||
true -> |
||||
{[nil], 0} |
||||
end |
||||
end |
||||
|
||||
defp get_op_erc1155_data(event) do |
||||
cond do |
||||
event.first_topic == @transfer_single_event -> |
||||
[id, amount] = decode_data(event.data, [{:uint, 256}, {:uint, 256}]) |
||||
{[id], [amount]} |
||||
|
||||
event.first_topic == @transfer_batch_event -> |
||||
[ids, amounts] = decode_data(event.data, [{:array, {:uint, 256}}, {:array, {:uint, 256}}]) |
||||
{ids, amounts} |
||||
|
||||
true -> |
||||
{[], []} |
||||
end |
||||
end |
||||
|
||||
# credo:disable-for-next-line /Complexity/ |
||||
defp get_op_user(event) do |
||||
cond do |
||||
event.first_topic == @transfer_event and event.third_topic == @empty_hash -> |
||||
truncate_address_hash(event.second_topic) |
||||
|
||||
event.first_topic == @transfer_event and event.second_topic == @empty_hash -> |
||||
truncate_address_hash(event.third_topic) |
||||
|
||||
event.first_topic == @withdraw_event -> |
||||
truncate_address_hash(event.third_topic) |
||||
|
||||
Enum.member?([@transfer_single_event, @transfer_batch_event], event.first_topic) and |
||||
event.fourth_topic == @empty_hash -> |
||||
truncate_address_hash(event.third_topic) |
||||
|
||||
Enum.member?([@transfer_single_event, @transfer_batch_event], event.first_topic) and |
||||
event.third_topic == @empty_hash -> |
||||
truncate_address_hash(event.fourth_topic) |
||||
|
||||
event.first_topic == @token_deposited_event -> |
||||
truncate_address_hash(event.fourth_topic) |
||||
end |
||||
end |
||||
|
||||
defp get_receipt_logs(tx_hashes, json_rpc_named_arguments, retries) do |
||||
reqs = |
||||
tx_hashes |
||||
|> Enum.with_index() |
||||
|> Enum.map(fn {hash, id} -> |
||||
request(%{ |
||||
id: id, |
||||
method: "eth_getTransactionReceipt", |
||||
params: [hash] |
||||
}) |
||||
end) |
||||
|
||||
error_message = &"eth_getTransactionReceipt failed. Error: #{inspect(&1)}" |
||||
|
||||
{:ok, receipts} = Helper.repeated_call(&json_rpc/2, [reqs, json_rpc_named_arguments], error_message, retries) |
||||
|
||||
receipts |
||||
|> Enum.map(&Receipt.elixir_to_logs(&1.result)) |
||||
|> List.flatten() |
||||
|> Logs.elixir_to_params() |
||||
end |
||||
|
||||
defp withdrawal?(event) do |
||||
cond do |
||||
event.first_topic == @withdraw_event -> |
||||
true |
||||
|
||||
event.first_topic == @transfer_event and event.third_topic == @empty_hash -> |
||||
true |
||||
|
||||
Enum.member?([@transfer_single_event, @transfer_batch_event], event.first_topic) and |
||||
event.fourth_topic == @empty_hash -> |
||||
true |
||||
|
||||
true -> |
||||
false |
||||
end |
||||
end |
||||
|
||||
defp prepare_operation(event, timestamps, weth) do |
||||
event = |
||||
event |
||||
|> Map.put(:first_topic, Helper.log_topic_to_string(event.first_topic)) |
||||
|> Map.put(:second_topic, Helper.log_topic_to_string(event.second_topic)) |
||||
|> Map.put(:third_topic, Helper.log_topic_to_string(event.third_topic)) |
||||
|> Map.put(:fourth_topic, Helper.log_topic_to_string(event.fourth_topic)) |
||||
|
||||
user = get_op_user(event) |
||||
|
||||
if user == burn_address_hash_string() do |
||||
[] |
||||
else |
||||
{amounts_or_ids, operation_id} = get_op_amounts(event) |
||||
{erc1155_ids, erc1155_amounts} = get_op_erc1155_data(event) |
||||
|
||||
l2_block_number = quantity_to_integer(event.block_number) |
||||
|
||||
{operation_type, timestamp} = |
||||
if withdrawal?(event) do |
||||
{:withdrawal, Map.get(timestamps, l2_block_number)} |
||||
else |
||||
{:deposit, nil} |
||||
end |
||||
|
||||
token_type = |
||||
cond do |
||||
Enum.member?([@token_deposited_event, @withdraw_event], event.first_topic) -> |
||||
"bone" |
||||
|
||||
event.first_topic == @transfer_event and String.downcase(event.address_hash) == weth -> |
||||
"eth" |
||||
|
||||
true -> |
||||
"other" |
||||
end |
||||
|
||||
Enum.map(amounts_or_ids, fn amount_or_id -> |
||||
%{ |
||||
user: user, |
||||
amount_or_id: amount_or_id, |
||||
erc1155_ids: if(Enum.empty?(erc1155_ids), do: nil, else: erc1155_ids), |
||||
erc1155_amounts: if(Enum.empty?(erc1155_amounts), do: nil, else: erc1155_amounts), |
||||
l2_transaction_hash: event.transaction_hash, |
||||
l2_block_number: l2_block_number, |
||||
l1_transaction_hash: @empty_hash, |
||||
operation_hash: calc_operation_hash(user, amount_or_id, erc1155_ids, erc1155_amounts, operation_id), |
||||
operation_type: operation_type, |
||||
token_type: token_type, |
||||
timestamp: timestamp |
||||
} |
||||
end) |
||||
end |
||||
end |
||||
|
||||
defp truncate_address_hash("0x000000000000000000000000" <> truncated_hash) do |
||||
"0x#{truncated_hash}" |
||||
end |
||||
end |
@ -0,0 +1,99 @@ |
||||
defmodule Indexer.Transform.Shibarium.Bridge do |
||||
@moduledoc """ |
||||
Helper functions for transforming data for Shibarium Bridge operations. |
||||
""" |
||||
|
||||
require Logger |
||||
|
||||
import Explorer.Chain.SmartContract, only: [burn_address_hash_string: 0] |
||||
|
||||
import Indexer.Fetcher.Shibarium.Helper, only: [prepare_insert_items: 2] |
||||
|
||||
import Indexer.Fetcher.Shibarium.L2, only: [withdraw_method_signature: 0] |
||||
|
||||
alias Indexer.Fetcher.Shibarium.L2 |
||||
alias Indexer.Helper |
||||
|
||||
@doc """ |
||||
Returns a list of operations given a list of blocks and their transactions. |
||||
""" |
||||
@spec parse(list(), list(), list()) :: list() |
||||
def parse(blocks, transactions_with_receipts, logs) do |
||||
prev_metadata = Logger.metadata() |
||||
Logger.metadata(fetcher: :shibarium_bridge_l2_realtime) |
||||
|
||||
items = |
||||
with false <- is_nil(Application.get_env(:indexer, Indexer.Fetcher.Shibarium.L2)[:start_block]), |
||||
false <- System.get_env("CHAIN_TYPE") != "shibarium", |
||||
child_chain = Application.get_env(:indexer, Indexer.Fetcher.Shibarium.L2)[:child_chain], |
||||
weth = Application.get_env(:indexer, Indexer.Fetcher.Shibarium.L2)[:weth], |
||||
bone_withdraw = Application.get_env(:indexer, Indexer.Fetcher.Shibarium.L2)[:bone_withdraw], |
||||
true <- Helper.address_correct?(child_chain), |
||||
true <- Helper.address_correct?(weth), |
||||
true <- Helper.address_correct?(bone_withdraw) do |
||||
child_chain = String.downcase(child_chain) |
||||
weth = String.downcase(weth) |
||||
bone_withdraw = String.downcase(bone_withdraw) |
||||
|
||||
block_numbers = Enum.map(blocks, fn block -> block.number end) |
||||
start_block = Enum.min(block_numbers) |
||||
end_block = Enum.max(block_numbers) |
||||
|
||||
Helper.log_blocks_chunk_handling(start_block, end_block, start_block, end_block, nil, "L2") |
||||
|
||||
deposit_transaction_hashes = |
||||
transactions_with_receipts |
||||
|> Enum.filter(fn tx -> tx.from_address_hash == burn_address_hash_string() end) |
||||
|> Enum.map(fn tx -> tx.hash end) |
||||
|
||||
deposit_events = |
||||
logs |
||||
|> Enum.filter(&Enum.member?(deposit_transaction_hashes, &1.transaction_hash)) |
||||
|> L2.filter_deposit_events(child_chain) |
||||
|
||||
withdrawal_transaction_hashes = |
||||
transactions_with_receipts |
||||
|> Enum.filter(fn tx -> |
||||
# filter by `withdraw(uint256 amount)` signature |
||||
String.downcase(String.slice(tx.input, 0..9)) == withdraw_method_signature() |
||||
end) |
||||
|> Enum.map(fn tx -> tx.hash end) |
||||
|
||||
withdrawal_events = |
||||
logs |
||||
|> Enum.filter(&Enum.member?(withdrawal_transaction_hashes, &1.transaction_hash)) |
||||
|> L2.filter_withdrawal_events(bone_withdraw) |
||||
|
||||
events = deposit_events ++ withdrawal_events |
||||
timestamps = Enum.reduce(blocks, %{}, fn block, acc -> Map.put(acc, block.number, block.timestamp) end) |
||||
|
||||
operations = L2.prepare_operations({events, timestamps}, weth) |
||||
items = prepare_insert_items(operations, L2) |
||||
|
||||
Helper.log_blocks_chunk_handling( |
||||
start_block, |
||||
end_block, |
||||
start_block, |
||||
end_block, |
||||
"#{Enum.count(operations)} L2 operation(s)", |
||||
"L2" |
||||
) |
||||
|
||||
items |
||||
else |
||||
true -> |
||||
[] |
||||
|
||||
false -> |
||||
Logger.error( |
||||
"ChildChain or WETH or BoneWithdraw contract address is incorrect. Cannot use #{__MODULE__} for parsing logs." |
||||
) |
||||
|
||||
[] |
||||
end |
||||
|
||||
Logger.reset_metadata(prev_metadata) |
||||
|
||||
items |
||||
end |
||||
end |
Loading…
Reference in new issue