Optimize token holder count updates when importing address current balances

Problem: realtime fetcher transactions takes too long and often fails with
a timeout. The reason of timeout is deleted_address_current_token_balances
step of import, which selects previous balances for the rows we are going
to update. The SELECT query it makes is extremely slow.

Solution: we incorporate fetching old balance value into the upsert itself.
This is done by help of a transient old_value field, which is updated in
ON CONFLICT clause of INSERT with the previous value and returned.

Upgrading: migration 20190321185644_add_old_value_for_current_token_balances.exs
pull/1639/head
goodsoft 6 years ago
parent 8f60e68d31
commit ea7d0d007f
No known key found for this signature in database
GPG Key ID: DF5159A3A5F09D21
  1. 1
      CHANGELOG.md
  2. 3
      apps/explorer/lib/explorer/chain/address/current_token_balance.ex
  3. 70
      apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex
  4. 10
      apps/explorer/priv/repo/migrations/20190321185644_add_old_value_for_current_token_balances.exs
  5. 10
      apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs

@ -8,6 +8,7 @@
- [#1621](https://github.com/poanetwork/blockscout/pull/1621) - Modify query to fetch failed contract creations
- [#1614](https://github.com/poanetwork/blockscout/pull/1614) - Do not fetch burn address token balance
- [#1639](https://github.com/poanetwork/blockscout/pull/1614) - Optimize token holder count updates when importing address current balances
### Chore

@ -40,6 +40,9 @@ defmodule Explorer.Chain.Address.CurrentTokenBalance do
field(:block_number, :integer)
field(:value_fetched_at, :utc_datetime_usec)
# A transient field for deriving token holder count deltas during address_current_token_balances upserts
field(:old_value, :decimal)
belongs_to(:address, Address, foreign_key: :address_hash, references: :hash, type: Hash.Address)
belongs_to(

@ -26,7 +26,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
def to_holder_address_hash_set_by_token_contract_address_hash(address_current_token_balances)
when is_list(address_current_token_balances) do
address_current_token_balances
|> Stream.filter(fn %{value: value} -> not is_nil(value) && Decimal.cmp(value, 0) == :gt end)
|> Stream.filter(fn %{value: value} -> valid_holder?(value) end)
|> Enum.reduce(%{}, fn %{token_contract_address_hash: token_contract_address_hash, address_hash: address_hash},
acc_holder_address_hash_set_by_token_contract_address_hash ->
updated_holder_address_hash_set =
@ -106,26 +106,19 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
timeout = insert_options.timeout
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.token_contract_address_hash})
multi
|> Multi.run(:deleted_address_current_token_balances, fn repo, _ ->
deleted_address_current_token_balances(repo, ordered_changes_list, %{timeout: timeout})
end)
|> Multi.run(:address_current_token_balances, fn repo, _ ->
insert(repo, ordered_changes_list, insert_options)
end)
|> Multi.run(:address_current_token_balances_update_token_holder_counts, fn repo,
%{
deleted_address_current_token_balances:
deleted,
address_current_token_balances:
inserted
upserted_balances
} ->
token_holder_count_deltas = token_holder_count_deltas(%{deleted: deleted, inserted: inserted})
token_holder_count_deltas = upserted_balances_to_holder_count_deltas(upserted_balances)
Tokens.update_holder_counts_with_deltas(
repo,
@ -138,41 +131,29 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
@impl Import.Runner
def timeout, do: @timeout
@spec deleted_address_current_token_balances(Repo.t(), [map()], %{timeout: timeout()}) ::
{:ok, [CurrentTokenBalance.t()]}
defp deleted_address_current_token_balances(_, [], _), do: {:ok, []}
defp deleted_address_current_token_balances(repo, changes_list, %{timeout: timeout})
when is_atom(repo) and is_list(changes_list) do
initial_query =
from(current_token_balance in CurrentTokenBalance,
select:
map(current_token_balance, [
:address_hash,
:token_contract_address_hash,
# to determine if a holder for `update_token_holder_counts`
:value
]),
# to maintain order of lock for `address_current_token_balances`
lock: "FOR UPDATE"
)
defp valid_holder?(value) do
not is_nil(value) and Decimal.cmp(value, 0) == :gt
end
final_query =
Enum.reduce(changes_list, initial_query, fn %{
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
},
acc_query ->
from(current_token_balance in acc_query,
or_where:
current_token_balance.address_hash == ^address_hash and
current_token_balance.token_contract_address_hash == ^token_contract_address_hash and
current_token_balance.block_number < ^block_number
)
end)
{:ok, repo.all(final_query, timeout: timeout)}
# Assumes existence of old_value field with previous value or nil
defp upserted_balances_to_holder_count_deltas(upserted_balances) do
upserted_balances
|> Enum.map(fn %{token_contract_address_hash: contract_address_hash, value: value, old_value: old_value} ->
delta =
cond do
not valid_holder?(old_value) and valid_holder?(value) -> 1
valid_holder?(old_value) and not valid_holder?(value) -> -1
true -> 0
end
%{contract_address_hash: contract_address_hash, delta: delta}
end)
|> Enum.group_by(& &1.contract_address_hash, & &1.delta)
|> Enum.map(fn {contract_address_hash, deltas} ->
%{contract_address_hash: contract_address_hash, delta: Enum.sum(deltas)}
end)
|> Enum.filter(fn %{delta: delta} -> delta != 0 end)
|> Enum.sort_by(& &1.contract_address_hash)
end
defp holder_count_delta(%{
@ -236,6 +217,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
block_number: fragment("EXCLUDED.block_number"),
value: fragment("EXCLUDED.value"),
value_fetched_at: fragment("EXCLUDED.value_fetched_at"),
old_value: current_token_balance.value,
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", current_token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", current_token_balance.updated_at)
]

@ -0,0 +1,10 @@
defmodule Explorer.Repo.Migrations.AddOldValueForCurrentTokenBalances do
use Ecto.Migration
def change do
alter table(:address_current_token_balances) do
# A transient field for deriving token holder count deltas during address_current_token_balances upserts
add(:old_value, :decimal, null: true)
end
end
end

@ -247,16 +247,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalancesTest do
assert {:ok,
%{
deleted_address_current_token_balances: [
%{
address_hash: ^non_holder_becomes_holder_address_hash,
token_contract_address_hash: ^token_contract_address_hash
},
%{
address_hash: ^holder_becomes_non_holder_address_hash,
token_contract_address_hash: ^token_contract_address_hash
}
],
address_current_token_balances: [
%{
address_hash: ^non_holder_becomes_holder_address_hash,

Loading…
Cancel
Save