diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index 3ba010e6be..28d62ee454 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -19,6 +19,7 @@ defmodule Explorer.Chain.Import do Import.Logs, Import.Tokens, Import.TokenTransfers, + Import.Address.CurrentTokenBalances, Import.Address.TokenBalances ] diff --git a/apps/explorer/lib/explorer/chain/import/address/current_token_balances.ex b/apps/explorer/lib/explorer/chain/import/address/current_token_balances.ex new file mode 100644 index 0000000000..11cf8cf460 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/address/current_token_balances.ex @@ -0,0 +1,124 @@ +defmodule Explorer.Chain.Import.Address.CurrentTokenBalances do + @moduledoc """ + Bulk imports `t:Explorer.Chain.Address.CurrentTokenBalance.t/0`. + """ + + require Ecto.Query + + import Ecto.Query, only: [from: 2] + + alias Ecto.{Changeset, Multi} + alias Explorer.Chain.Address.CurrentTokenBalance + alias Explorer.Chain.Import + + @behaviour Import.Runner + + # milliseconds + @timeout 60_000 + + @type imported :: [CurrentTokenBalance.t()] + + @impl Import.Runner + def ecto_schema_module, do: CurrentTokenBalance + + @impl Import.Runner + def option_key, do: :address_current_token_balances + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } + end + + @impl Import.Runner + def run(multi, changes_list, %{timestamps: timestamps} = options) do + insert_options = + options + |> Map.get(option_key(), %{}) + |> Map.take(~w(on_conflict timeout)a) + |> Map.put_new(:timeout, @timeout) + |> Map.put(:timestamps, timestamps) + + Multi.run(multi, :address_current_token_balances, fn _ -> + insert(changes_list, insert_options) + end) + end + + @impl Import.Runner + def timeout, do: @timeout + + @spec insert([map()], %{ + optional(:on_conflict) => Import.Runner.on_conflict(), + required(:timeout) => timeout(), + required(:timestamps) => Import.timestamps() + }) :: + {:ok, [CurrentTokenBalance.t()]} + | {:error, [Changeset.t()]} + def insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + + {:ok, _} = + Import.insert_changes_list( + unique_token_balances(changes_list), + conflict_target: ~w(address_hash token_contract_address_hash)a, + on_conflict: on_conflict, + for: CurrentTokenBalance, + returning: true, + timeout: timeout, + timestamps: timestamps + ) + end + + # Remove duplicated token balances based on `{address_hash, token_hash}` considering the last block + # to avoid `cardinality_violation` error in Postgres. This error happens when there are duplicated + # rows being inserted. + defp unique_token_balances(changes_list) do + changes_list + |> Enum.sort(&(&1.block_number > &2.block_number)) + |> Enum.uniq_by(fn %{address_hash: address_hash, token_contract_address_hash: token_hash} -> + {address_hash, token_hash} + end) + end + + defp default_on_conflict do + from( + current_token_balance in CurrentTokenBalance, + update: [ + set: [ + block_number: + fragment( + "CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.block_number ELSE ? END", + current_token_balance.block_number, + current_token_balance.block_number + ), + inserted_at: + fragment( + "CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.inserted_at ELSE ? END", + current_token_balance.block_number, + current_token_balance.inserted_at + ), + updated_at: + fragment( + "CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.updated_at ELSE ? END", + current_token_balance.block_number, + current_token_balance.updated_at + ), + value: + fragment( + "CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.value ELSE ? END", + current_token_balance.block_number, + current_token_balance.value + ), + value_fetched_at: + fragment( + "CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.value_fetched_at ELSE ? END", + current_token_balance.block_number, + current_token_balance.value_fetched_at + ) + ] + ] + ) + end +end diff --git a/apps/explorer/test/explorer/chain/import/address/address/current_token_balances_test.exs b/apps/explorer/test/explorer/chain/import/address/address/current_token_balances_test.exs new file mode 100644 index 0000000000..a78b7046ef --- /dev/null +++ b/apps/explorer/test/explorer/chain/import/address/address/current_token_balances_test.exs @@ -0,0 +1,95 @@ +defmodule Explorer.Chain.Import.Address.CurrentTokenBalancesTest do + use Explorer.DataCase + + alias Explorer.Chain.Import.Address.CurrentTokenBalances + + alias Explorer.Chain.{Address.CurrentTokenBalance} + + describe "insert/2" do + setup do + address = insert(:address, hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca") + token = insert(:token) + + insert_options = %{ + timeout: :infinity, + timestamps: %{inserted_at: DateTime.utc_now(), updated_at: DateTime.utc_now()} + } + + %{address: address, token: token, insert_options: insert_options} + end + + test "inserts in the current token balances", %{address: address, token: token, insert_options: insert_options} do + changes = [ + %{ + address_hash: address.hash, + block_number: 1, + token_contract_address_hash: token.contract_address_hash, + value: Decimal.new(100) + } + ] + + CurrentTokenBalances.insert(changes, insert_options) + + current_token_balances = + CurrentTokenBalance + |> Explorer.Repo.all() + |> Enum.count() + + assert current_token_balances == 1 + end + + test "considers the last block upserting", %{address: address, token: token, insert_options: insert_options} do + insert( + :address_current_token_balance, + address: address, + block_number: 1, + token_contract_address_hash: token.contract_address_hash, + value: 100 + ) + + changes = [ + %{ + address_hash: address.hash, + block_number: 2, + token_contract_address_hash: token.contract_address_hash, + value: Decimal.new(200) + } + ] + + CurrentTokenBalances.insert(changes, insert_options) + + current_token_balance = Explorer.Repo.get_by(CurrentTokenBalance, address_hash: address.hash) + + assert current_token_balance.block_number == 2 + assert current_token_balance.value == Decimal.new(200) + end + + test "considers the last block when there are duplicated params", %{ + address: address, + token: token, + insert_options: insert_options + } do + changes = [ + %{ + address_hash: address.hash, + block_number: 4, + token_contract_address_hash: token.contract_address_hash, + value: Decimal.new(200) + }, + %{ + address_hash: address.hash, + block_number: 1, + token_contract_address_hash: token.contract_address_hash, + value: Decimal.new(100) + } + ] + + CurrentTokenBalances.insert(changes, insert_options) + + current_token_balance = Explorer.Repo.get_by(CurrentTokenBalance, address_hash: address.hash) + + assert current_token_balance.block_number == 4 + assert current_token_balance.value == Decimal.new(200) + end + end +end diff --git a/apps/explorer/test/explorer/chain/import_test.exs b/apps/explorer/test/explorer/chain/import_test.exs index ea4a485f6f..89c1c4f3eb 100644 --- a/apps/explorer/test/explorer/chain/import_test.exs +++ b/apps/explorer/test/explorer/chain/import_test.exs @@ -6,6 +6,7 @@ defmodule Explorer.Chain.ImportTest do alias Explorer.Chain.{ Address, Address.TokenBalance, + Address.CurrentTokenBalance, Block, Data, Log, @@ -395,6 +396,55 @@ defmodule Explorer.Chain.ImportTest do assert 3 == count end + test "inserts a current_token_balance" do + params = %{ + addresses: %{ + params: [ + %{hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca"}, + %{hash: "0x515c09c5bba1ed566b02a5b0599ec5d5d0aee73d"}, + %{hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b"} + ], + timeout: 5 + }, + tokens: %{ + on_conflict: :nothing, + params: [ + %{ + contract_address_hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b", + type: "ERC-20" + } + ], + timeout: 5 + }, + address_current_token_balances: %{ + params: [ + %{ + address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca", + token_contract_address_hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b", + block_number: "37", + value: 200 + }, + %{ + address_hash: "0x515c09c5bba1ed566b02a5b0599ec5d5d0aee73d", + token_contract_address_hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b", + block_number: "37", + value: 100 + } + ], + timeout: 5 + } + } + + Import.all(params) + + count = + CurrentTokenBalance + |> Explorer.Repo.all() + |> Enum.count() + + assert count == 2 + end + test "with empty map" do assert {:ok, %{}} == Import.all(%{}) end diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index f84e19ffc1..88682c149a 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -107,6 +107,7 @@ defmodule Indexer.Block.Realtime.Fetcher do |> put_in([:addresses, :params], balances_addresses_params) |> put_in([:blocks, :params, Access.all(), :consensus], true) |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params) + |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_token_balances) |> put_in([Access.key(:address_token_balances), :params], address_token_balances) |> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params), {:ok, imported} = ok <- Chain.import(chain_import_options) do diff --git a/apps/indexer/lib/indexer/token_balance/fetcher.ex b/apps/indexer/lib/indexer/token_balance/fetcher.ex index 19df1853f9..f8cf8e65a6 100644 --- a/apps/indexer/lib/indexer/token_balance/fetcher.ex +++ b/apps/indexer/lib/indexer/token_balance/fetcher.ex @@ -80,7 +80,13 @@ defmodule Indexer.TokenBalance.Fetcher do end def import_token_balances(token_balances_params) do - case Chain.import(%{address_token_balances: %{params: token_balances_params}, timeout: :infinity}) do + import_params = %{ + address_token_balances: %{params: token_balances_params}, + address_current_token_balances: %{params: token_balances_params}, + timeout: :infinity + } + + case Chain.import(import_params) do {:ok, _} -> :ok