Move uniquifiation of address current token balances out of runner

For all other runners, it is the responsibility of the caller to ensure
unique entries.  This ensures that the least amount of data as possible
is sent to `Explorer.Chain.Import.all`, which speeds up the whole import
process.
pull/1296/head
Luke Imhoff 6 years ago
parent cb2e17e373
commit 6165f6757f
  1. 13
      apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex
  2. 28
      apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs
  3. 3
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  4. 2
      apps/indexer/lib/indexer/token_balance/fetcher.ex
  5. 10
      apps/indexer/lib/indexer/token_balances.ex

@ -63,7 +63,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
{:ok, _} = {:ok, _} =
Import.insert_changes_list( Import.insert_changes_list(
repo, repo,
unique_token_balances(changes_list), changes_list,
conflict_target: ~w(address_hash token_contract_address_hash)a, conflict_target: ~w(address_hash token_contract_address_hash)a,
on_conflict: on_conflict, on_conflict: on_conflict,
for: CurrentTokenBalance, for: CurrentTokenBalance,
@ -73,17 +73,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
) )
end end
# Remove duplicated token balances based on `{address_hash, token_hash}` considering the last block
# to avoid `cardinality_violation` error in Postgres. This error happens when there are duplicated
# rows being inserted.
defp unique_token_balances(changes_list) do
changes_list
|> Enum.sort(&(&1.block_number > &2.block_number))
|> Enum.uniq_by(fn %{address_hash: address_hash, token_contract_address_hash: token_hash} ->
{address_hash, token_hash}
end)
end
defp default_on_conflict do defp default_on_conflict do
from( from(
current_token_balance in CurrentTokenBalance, current_token_balance in CurrentTokenBalance,

@ -63,33 +63,5 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalancesTest do
assert current_token_balance.block_number == 2 assert current_token_balance.block_number == 2
assert current_token_balance.value == Decimal.new(200) assert current_token_balance.value == Decimal.new(200)
end end
test "considers the last block when there are duplicated params", %{
address: address,
token: token,
insert_options: insert_options
} do
changes = [
%{
address_hash: address.hash,
block_number: 4,
token_contract_address_hash: token.contract_address_hash,
value: Decimal.new(200)
},
%{
address_hash: address.hash,
block_number: 1,
token_contract_address_hash: token.contract_address_hash,
value: Decimal.new(100)
}
]
CurrentTokenBalances.insert(Repo, changes, insert_options)
current_token_balance = Repo.get_by(CurrentTokenBalance, address_hash: address.hash)
assert current_token_balance.block_number == 4
assert current_token_balance.value == Decimal.new(200)
end
end end
end end

@ -117,13 +117,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
})}, })},
{:address_token_balances, {:ok, address_token_balances}} <- {:address_token_balances, {:ok, address_token_balances}} <-
{:address_token_balances, fetch_token_balances(address_token_balances_params)}, {:address_token_balances, fetch_token_balances(address_token_balances_params)},
address_current_token_balances = TokenBalances.to_address_current_token_balances(address_token_balances),
chain_import_options = chain_import_options =
options options
|> Map.drop(@import_options) |> Map.drop(@import_options)
|> put_in([:addresses, :params], balances_addresses_params) |> put_in([:addresses, :params], balances_addresses_params)
|> put_in([:blocks, :params, Access.all(), :consensus], true) |> put_in([:blocks, :params, Access.all(), :consensus], true)
|> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params) |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params)
|> put_in([Access.key(:address_current_token_balances, %{}), :params], address_token_balances) |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_current_token_balances)
|> put_in([Access.key(:address_token_balances), :params], address_token_balances) |> put_in([Access.key(:address_token_balances), :params], address_token_balances)
|> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params), |> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params),
{:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do {:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do

@ -108,7 +108,7 @@ defmodule Indexer.TokenBalance.Fetcher do
import_params = %{ import_params = %{
addresses: %{params: addresses_params}, addresses: %{params: addresses_params},
address_token_balances: %{params: token_balances_params}, address_token_balances: %{params: token_balances_params},
address_current_token_balances: %{params: token_balances_params}, address_current_token_balances: %{params: TokenBalances.to_address_current_token_balances(token_balances_params)},
timeout: :infinity timeout: :infinity
} }

@ -60,6 +60,16 @@ defmodule Indexer.TokenBalances do
{:ok, fetched_token_balances} {:ok, fetched_token_balances}
end end
def to_address_current_token_balances(address_token_balances) when is_list(address_token_balances) do
address_token_balances
|> Enum.group_by(fn %{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} ->
{address_hash, token_contract_address_hash}
end)
|> Enum.map(fn {_, grouped_address_token_balances} ->
Enum.max_by(grouped_address_token_balances, fn %{block_number: block_number} -> block_number end)
end)
end
defp traced_fetch_token_balance_callback(%Spandex.Span{} = span) do defp traced_fetch_token_balance_callback(%Spandex.Span{} = span) do
fn balance -> fn balance ->
try do try do

Loading…
Cancel
Save