From 6165f6757ff76c59707a4fc47e17533846bf7463 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 3 Jan 2019 09:20:17 -0600 Subject: [PATCH] Move uniquifiation of address current token balances out of runner For all other runners, it is the responsibility of the caller to ensure unique entries. This ensures that the least amount of data as possible is sent to `Explorer.Chain.Import.all`, which speeds up the whole import process. --- .../runner/address/current_token_balances.ex | 13 +-------- .../address/current_token_balances_test.exs | 28 ------------------- .../lib/indexer/block/realtime/fetcher.ex | 3 +- .../lib/indexer/token_balance/fetcher.ex | 2 +- apps/indexer/lib/indexer/token_balances.ex | 10 +++++++ 5 files changed, 14 insertions(+), 42 deletions(-) diff --git a/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex b/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex index 1af76ea9b8..737a224268 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex @@ -63,7 +63,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do {:ok, _} = Import.insert_changes_list( repo, - unique_token_balances(changes_list), + changes_list, conflict_target: ~w(address_hash token_contract_address_hash)a, on_conflict: on_conflict, for: CurrentTokenBalance, @@ -73,17 +73,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do ) end - # Remove duplicated token balances based on `{address_hash, token_hash}` considering the last block - # to avoid `cardinality_violation` error in Postgres. This error happens when there are duplicated - # rows being inserted. - defp unique_token_balances(changes_list) do - changes_list - |> Enum.sort(&(&1.block_number > &2.block_number)) - |> Enum.uniq_by(fn %{address_hash: address_hash, token_contract_address_hash: token_hash} -> - {address_hash, token_hash} - end) - end - defp default_on_conflict do from( current_token_balance in CurrentTokenBalance, diff --git a/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs b/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs index ad49a64261..13de1c3ada 100644 --- a/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs @@ -63,33 +63,5 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalancesTest do assert current_token_balance.block_number == 2 assert current_token_balance.value == Decimal.new(200) end - - test "considers the last block when there are duplicated params", %{ - address: address, - token: token, - insert_options: insert_options - } do - changes = [ - %{ - address_hash: address.hash, - block_number: 4, - token_contract_address_hash: token.contract_address_hash, - value: Decimal.new(200) - }, - %{ - address_hash: address.hash, - block_number: 1, - token_contract_address_hash: token.contract_address_hash, - value: Decimal.new(100) - } - ] - - CurrentTokenBalances.insert(Repo, changes, insert_options) - - current_token_balance = Repo.get_by(CurrentTokenBalance, address_hash: address.hash) - - assert current_token_balance.block_number == 4 - assert current_token_balance.value == Decimal.new(200) - end end end diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index f9a6d83cc3..688a0fe9ba 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -117,13 +117,14 @@ defmodule Indexer.Block.Realtime.Fetcher do })}, {:address_token_balances, {:ok, address_token_balances}} <- {:address_token_balances, fetch_token_balances(address_token_balances_params)}, + address_current_token_balances = TokenBalances.to_address_current_token_balances(address_token_balances), chain_import_options = options |> Map.drop(@import_options) |> put_in([:addresses, :params], balances_addresses_params) |> put_in([:blocks, :params, Access.all(), :consensus], true) |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params) - |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_token_balances) + |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_current_token_balances) |> put_in([Access.key(:address_token_balances), :params], address_token_balances) |> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params), {:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do diff --git a/apps/indexer/lib/indexer/token_balance/fetcher.ex b/apps/indexer/lib/indexer/token_balance/fetcher.ex index 69a53d8a30..2cc348199f 100644 --- a/apps/indexer/lib/indexer/token_balance/fetcher.ex +++ b/apps/indexer/lib/indexer/token_balance/fetcher.ex @@ -108,7 +108,7 @@ defmodule Indexer.TokenBalance.Fetcher do import_params = %{ addresses: %{params: addresses_params}, address_token_balances: %{params: token_balances_params}, - address_current_token_balances: %{params: token_balances_params}, + address_current_token_balances: %{params: TokenBalances.to_address_current_token_balances(token_balances_params)}, timeout: :infinity } diff --git a/apps/indexer/lib/indexer/token_balances.ex b/apps/indexer/lib/indexer/token_balances.ex index 9371d17d81..4e39997bb0 100644 --- a/apps/indexer/lib/indexer/token_balances.ex +++ b/apps/indexer/lib/indexer/token_balances.ex @@ -60,6 +60,16 @@ defmodule Indexer.TokenBalances do {:ok, fetched_token_balances} end + def to_address_current_token_balances(address_token_balances) when is_list(address_token_balances) do + address_token_balances + |> Enum.group_by(fn %{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} -> + {address_hash, token_contract_address_hash} + end) + |> Enum.map(fn {_, grouped_address_token_balances} -> + Enum.max_by(grouped_address_token_balances, fn %{block_number: block_number} -> block_number end) + end) + end + defp traced_fetch_token_balance_callback(%Spandex.Span{} = span) do fn balance -> try do