Import the current token balances

pull/1010/head
Felipe Renan 6 years ago
parent 233d539a00
commit ed183c988d
  1. 1
      apps/explorer/lib/explorer/chain/import.ex
  2. 124
      apps/explorer/lib/explorer/chain/import/address/current_token_balances.ex
  3. 95
      apps/explorer/test/explorer/chain/import/address/address/current_token_balances_test.exs
  4. 50
      apps/explorer/test/explorer/chain/import_test.exs
  5. 1
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  6. 8
      apps/indexer/lib/indexer/token_balance/fetcher.ex

@ -19,6 +19,7 @@ defmodule Explorer.Chain.Import do
Import.Logs,
Import.Tokens,
Import.TokenTransfers,
Import.Address.CurrentTokenBalances,
Import.Address.TokenBalances
]

@ -0,0 +1,124 @@
defmodule Explorer.Chain.Import.Address.CurrentTokenBalances do
@moduledoc """
Bulk imports `t:Explorer.Chain.Address.CurrentTokenBalance.t/0`.
"""
require Ecto.Query
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.Address.CurrentTokenBalance
alias Explorer.Chain.Import
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type imported :: [CurrentTokenBalance.t()]
@impl Import.Runner
def ecto_schema_module, do: CurrentTokenBalance
@impl Import.Runner
def option_key, do: :address_current_token_balances
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, %{timestamps: timestamps} = options) do
insert_options =
options
|> Map.get(option_key(), %{})
|> Map.take(~w(on_conflict timeout)a)
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
Multi.run(multi, :address_current_token_balances, fn _ ->
insert(changes_list, insert_options)
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert([map()], %{
optional(:on_conflict) => Import.Runner.on_conflict(),
required(:timeout) => timeout(),
required(:timestamps) => Import.timestamps()
}) ::
{:ok, [CurrentTokenBalance.t()]}
| {:error, [Changeset.t()]}
def insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
{:ok, _} =
Import.insert_changes_list(
unique_token_balances(changes_list),
conflict_target: ~w(address_hash token_contract_address_hash)a,
on_conflict: on_conflict,
for: CurrentTokenBalance,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
# Remove duplicated token balances based on `{address_hash, token_hash}` considering the last block
# to avoid `cardinality_violation` error in Postgres. This error happens when there are duplicated
# rows being inserted.
defp unique_token_balances(changes_list) do
changes_list
|> Enum.sort(&(&1.block_number > &2.block_number))
|> Enum.uniq_by(fn %{address_hash: address_hash, token_contract_address_hash: token_hash} ->
{address_hash, token_hash}
end)
end
defp default_on_conflict do
from(
current_token_balance in CurrentTokenBalance,
update: [
set: [
block_number:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.block_number ELSE ? END",
current_token_balance.block_number,
current_token_balance.block_number
),
inserted_at:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.inserted_at ELSE ? END",
current_token_balance.block_number,
current_token_balance.inserted_at
),
updated_at:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.updated_at ELSE ? END",
current_token_balance.block_number,
current_token_balance.updated_at
),
value:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.value ELSE ? END",
current_token_balance.block_number,
current_token_balance.value
),
value_fetched_at:
fragment(
"CASE WHEN EXCLUDED.block_number > ? THEN EXCLUDED.value_fetched_at ELSE ? END",
current_token_balance.block_number,
current_token_balance.value_fetched_at
)
]
]
)
end
end

@ -0,0 +1,95 @@
defmodule Explorer.Chain.Import.Address.CurrentTokenBalancesTest do
use Explorer.DataCase
alias Explorer.Chain.Import.Address.CurrentTokenBalances
alias Explorer.Chain.{Address.CurrentTokenBalance}
describe "insert/2" do
setup do
address = insert(:address, hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca")
token = insert(:token)
insert_options = %{
timeout: :infinity,
timestamps: %{inserted_at: DateTime.utc_now(), updated_at: DateTime.utc_now()}
}
%{address: address, token: token, insert_options: insert_options}
end
test "inserts in the current token balances", %{address: address, token: token, insert_options: insert_options} do
changes = [
%{
address_hash: address.hash,
block_number: 1,
token_contract_address_hash: token.contract_address_hash,
value: Decimal.new(100)
}
]
CurrentTokenBalances.insert(changes, insert_options)
current_token_balances =
CurrentTokenBalance
|> Explorer.Repo.all()
|> Enum.count()
assert current_token_balances == 1
end
test "considers the last block upserting", %{address: address, token: token, insert_options: insert_options} do
insert(
:address_current_token_balance,
address: address,
block_number: 1,
token_contract_address_hash: token.contract_address_hash,
value: 100
)
changes = [
%{
address_hash: address.hash,
block_number: 2,
token_contract_address_hash: token.contract_address_hash,
value: Decimal.new(200)
}
]
CurrentTokenBalances.insert(changes, insert_options)
current_token_balance = Explorer.Repo.get_by(CurrentTokenBalance, address_hash: address.hash)
assert current_token_balance.block_number == 2
assert current_token_balance.value == Decimal.new(200)
end
test "considers the last block when there are duplicated params", %{
address: address,
token: token,
insert_options: insert_options
} do
changes = [
%{
address_hash: address.hash,
block_number: 4,
token_contract_address_hash: token.contract_address_hash,
value: Decimal.new(200)
},
%{
address_hash: address.hash,
block_number: 1,
token_contract_address_hash: token.contract_address_hash,
value: Decimal.new(100)
}
]
CurrentTokenBalances.insert(changes, insert_options)
current_token_balance = Explorer.Repo.get_by(CurrentTokenBalance, address_hash: address.hash)
assert current_token_balance.block_number == 4
assert current_token_balance.value == Decimal.new(200)
end
end
end

@ -6,6 +6,7 @@ defmodule Explorer.Chain.ImportTest do
alias Explorer.Chain.{
Address,
Address.TokenBalance,
Address.CurrentTokenBalance,
Block,
Data,
Log,
@ -395,6 +396,55 @@ defmodule Explorer.Chain.ImportTest do
assert 3 == count
end
test "inserts a current_token_balance" do
params = %{
addresses: %{
params: [
%{hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca"},
%{hash: "0x515c09c5bba1ed566b02a5b0599ec5d5d0aee73d"},
%{hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b"}
],
timeout: 5
},
tokens: %{
on_conflict: :nothing,
params: [
%{
contract_address_hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b",
type: "ERC-20"
}
],
timeout: 5
},
address_current_token_balances: %{
params: [
%{
address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca",
token_contract_address_hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b",
block_number: "37",
value: 200
},
%{
address_hash: "0x515c09c5bba1ed566b02a5b0599ec5d5d0aee73d",
token_contract_address_hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b",
block_number: "37",
value: 100
}
],
timeout: 5
}
}
Import.all(params)
count =
CurrentTokenBalance
|> Explorer.Repo.all()
|> Enum.count()
assert count == 2
end
test "with empty map" do
assert {:ok, %{}} == Import.all(%{})
end

@ -107,6 +107,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
|> put_in([:addresses, :params], balances_addresses_params)
|> put_in([:blocks, :params, Access.all(), :consensus], true)
|> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params)
|> put_in([Access.key(:address_current_token_balances, %{}), :params], address_token_balances)
|> put_in([Access.key(:address_token_balances), :params], address_token_balances)
|> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params),
{:ok, imported} = ok <- Chain.import(chain_import_options) do

@ -80,7 +80,13 @@ defmodule Indexer.TokenBalance.Fetcher do
end
def import_token_balances(token_balances_params) do
case Chain.import(%{address_token_balances: %{params: token_balances_params}, timeout: :infinity}) do
import_params = %{
address_token_balances: %{params: token_balances_params},
address_current_token_balances: %{params: token_balances_params},
timeout: :infinity
}
case Chain.import(import_params) do
{:ok, _} ->
:ok

Loading…
Cancel
Save