diff --git a/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex b/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex index 1af76ea9b8..737a224268 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex @@ -63,7 +63,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do {:ok, _} = Import.insert_changes_list( repo, - unique_token_balances(changes_list), + changes_list, conflict_target: ~w(address_hash token_contract_address_hash)a, on_conflict: on_conflict, for: CurrentTokenBalance, @@ -73,17 +73,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do ) end - # Remove duplicated token balances based on `{address_hash, token_hash}` considering the last block - # to avoid `cardinality_violation` error in Postgres. This error happens when there are duplicated - # rows being inserted. - defp unique_token_balances(changes_list) do - changes_list - |> Enum.sort(&(&1.block_number > &2.block_number)) - |> Enum.uniq_by(fn %{address_hash: address_hash, token_contract_address_hash: token_hash} -> - {address_hash, token_hash} - end) - end - defp default_on_conflict do from( current_token_balance in CurrentTokenBalance, diff --git a/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs b/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs index ad49a64261..13de1c3ada 100644 --- a/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs @@ -63,33 +63,5 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalancesTest do assert current_token_balance.block_number == 2 assert current_token_balance.value == Decimal.new(200) end - - test "considers the last block when there are duplicated params", %{ - address: address, - token: token, - insert_options: insert_options - } do - changes = [ - %{ - address_hash: address.hash, - block_number: 4, - token_contract_address_hash: token.contract_address_hash, - value: Decimal.new(200) - }, - %{ - address_hash: address.hash, - block_number: 1, - token_contract_address_hash: token.contract_address_hash, - value: Decimal.new(100) - } - ] - - CurrentTokenBalances.insert(Repo, changes, insert_options) - - current_token_balance = Repo.get_by(CurrentTokenBalance, address_hash: address.hash) - - assert current_token_balance.block_number == 4 - assert current_token_balance.value == Decimal.new(200) - end end end diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index f9a6d83cc3..688a0fe9ba 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -117,13 +117,14 @@ defmodule Indexer.Block.Realtime.Fetcher do })}, {:address_token_balances, {:ok, address_token_balances}} <- {:address_token_balances, fetch_token_balances(address_token_balances_params)}, + address_current_token_balances = TokenBalances.to_address_current_token_balances(address_token_balances), chain_import_options = options |> Map.drop(@import_options) |> put_in([:addresses, :params], balances_addresses_params) |> put_in([:blocks, :params, Access.all(), :consensus], true) |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params) - |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_token_balances) + |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_current_token_balances) |> put_in([Access.key(:address_token_balances), :params], address_token_balances) |> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params), {:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do diff --git a/apps/indexer/lib/indexer/token_balance/fetcher.ex b/apps/indexer/lib/indexer/token_balance/fetcher.ex index 69a53d8a30..2cc348199f 100644 --- a/apps/indexer/lib/indexer/token_balance/fetcher.ex +++ b/apps/indexer/lib/indexer/token_balance/fetcher.ex @@ -108,7 +108,7 @@ defmodule Indexer.TokenBalance.Fetcher do import_params = %{ addresses: %{params: addresses_params}, address_token_balances: %{params: token_balances_params}, - address_current_token_balances: %{params: token_balances_params}, + address_current_token_balances: %{params: TokenBalances.to_address_current_token_balances(token_balances_params)}, timeout: :infinity } diff --git a/apps/indexer/lib/indexer/token_balances.ex b/apps/indexer/lib/indexer/token_balances.ex index 9371d17d81..4e39997bb0 100644 --- a/apps/indexer/lib/indexer/token_balances.ex +++ b/apps/indexer/lib/indexer/token_balances.ex @@ -60,6 +60,16 @@ defmodule Indexer.TokenBalances do {:ok, fetched_token_balances} end + def to_address_current_token_balances(address_token_balances) when is_list(address_token_balances) do + address_token_balances + |> Enum.group_by(fn %{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} -> + {address_hash, token_contract_address_hash} + end) + |> Enum.map(fn {_, grouped_address_token_balances} -> + Enum.max_by(grouped_address_token_balances, fn %{block_number: block_number} -> block_number end) + end) + end + defp traced_fetch_token_balance_callback(%Spandex.Span{} = span) do fn balance -> try do