Merge branch 'master' into feature/#1476-add-styles-for-POSDAO-network

* master:
  Optimize token holder count updates when importing address current balances
pull/1704/head
Gabriel Rodriguez Alsina 6 years ago
commit 50eed5812a
  1. 1
      CHANGELOG.md
  2. 3
      apps/explorer/lib/explorer/chain/address/current_token_balance.ex
  3. 68
      apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex
  4. 10
      apps/explorer/priv/repo/migrations/20190321185644_add_old_value_for_current_token_balances.exs
  5. 10
      apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs

@ -11,6 +11,7 @@
- [#1630](https://github.com/poanetwork/blockscout/pull/1630) - (Fix) colour for release link in the footer - [#1630](https://github.com/poanetwork/blockscout/pull/1630) - (Fix) colour for release link in the footer
- [#1621](https://github.com/poanetwork/blockscout/pull/1621) - Modify query to fetch failed contract creations - [#1621](https://github.com/poanetwork/blockscout/pull/1621) - Modify query to fetch failed contract creations
- [#1614](https://github.com/poanetwork/blockscout/pull/1614) - Do not fetch burn address token balance - [#1614](https://github.com/poanetwork/blockscout/pull/1614) - Do not fetch burn address token balance
- [#1639](https://github.com/poanetwork/blockscout/pull/1614) - Optimize token holder count updates when importing address current balances
- [#1643](https://github.com/poanetwork/blockscout/pull/1643) - Set internal_transactions_indexed_at for empty blocks - [#1643](https://github.com/poanetwork/blockscout/pull/1643) - Set internal_transactions_indexed_at for empty blocks
- [#1647](https://github.com/poanetwork/blockscout/pull/1647) - Fix typo in view - [#1647](https://github.com/poanetwork/blockscout/pull/1647) - Fix typo in view

@ -40,6 +40,9 @@ defmodule Explorer.Chain.Address.CurrentTokenBalance do
field(:block_number, :integer) field(:block_number, :integer)
field(:value_fetched_at, :utc_datetime_usec) field(:value_fetched_at, :utc_datetime_usec)
# A transient field for deriving token holder count deltas during address_current_token_balances upserts
field(:old_value, :decimal)
belongs_to(:address, Address, foreign_key: :address_hash, references: :hash, type: Hash.Address) belongs_to(:address, Address, foreign_key: :address_hash, references: :hash, type: Hash.Address)
belongs_to( belongs_to(

@ -26,7 +26,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
def to_holder_address_hash_set_by_token_contract_address_hash(address_current_token_balances) def to_holder_address_hash_set_by_token_contract_address_hash(address_current_token_balances)
when is_list(address_current_token_balances) do when is_list(address_current_token_balances) do
address_current_token_balances address_current_token_balances
|> Stream.filter(fn %{value: value} -> not is_nil(value) && Decimal.cmp(value, 0) == :gt end) |> Stream.filter(fn %{value: value} -> valid_holder?(value) end)
|> Enum.reduce(%{}, fn %{token_contract_address_hash: token_contract_address_hash, address_hash: address_hash}, |> Enum.reduce(%{}, fn %{token_contract_address_hash: token_contract_address_hash, address_hash: address_hash},
acc_holder_address_hash_set_by_token_contract_address_hash -> acc_holder_address_hash_set_by_token_contract_address_hash ->
updated_holder_address_hash_set = updated_holder_address_hash_set =
@ -106,26 +106,19 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
|> Map.put_new(:timeout, @timeout) |> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps) |> Map.put(:timestamps, timestamps)
timeout = insert_options.timeout
# order so that row ShareLocks are grabbed in a consistent order # order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.token_contract_address_hash}) ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.token_contract_address_hash})
multi multi
|> Multi.run(:deleted_address_current_token_balances, fn repo, _ ->
deleted_address_current_token_balances(repo, ordered_changes_list, %{timeout: timeout})
end)
|> Multi.run(:address_current_token_balances, fn repo, _ -> |> Multi.run(:address_current_token_balances, fn repo, _ ->
insert(repo, ordered_changes_list, insert_options) insert(repo, ordered_changes_list, insert_options)
end) end)
|> Multi.run(:address_current_token_balances_update_token_holder_counts, fn repo, |> Multi.run(:address_current_token_balances_update_token_holder_counts, fn repo,
%{ %{
deleted_address_current_token_balances:
deleted,
address_current_token_balances: address_current_token_balances:
inserted upserted_balances
} -> } ->
token_holder_count_deltas = token_holder_count_deltas(%{deleted: deleted, inserted: inserted}) token_holder_count_deltas = upserted_balances_to_holder_count_deltas(upserted_balances)
Tokens.update_holder_counts_with_deltas( Tokens.update_holder_counts_with_deltas(
repo, repo,
@ -138,41 +131,29 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
@impl Import.Runner @impl Import.Runner
def timeout, do: @timeout def timeout, do: @timeout
@spec deleted_address_current_token_balances(Repo.t(), [map()], %{timeout: timeout()}) :: defp valid_holder?(value) do
{:ok, [CurrentTokenBalance.t()]} not is_nil(value) and Decimal.cmp(value, 0) == :gt
defp deleted_address_current_token_balances(_, [], _), do: {:ok, []} end
defp deleted_address_current_token_balances(repo, changes_list, %{timeout: timeout})
when is_atom(repo) and is_list(changes_list) do
initial_query =
from(current_token_balance in CurrentTokenBalance,
select:
map(current_token_balance, [
:address_hash,
:token_contract_address_hash,
# to determine if a holder for `update_token_holder_counts`
:value
]),
# to maintain order of lock for `address_current_token_balances`
lock: "FOR UPDATE"
)
final_query = # Assumes existence of old_value field with previous value or nil
Enum.reduce(changes_list, initial_query, fn %{ defp upserted_balances_to_holder_count_deltas(upserted_balances) do
address_hash: address_hash, upserted_balances
token_contract_address_hash: token_contract_address_hash, |> Enum.map(fn %{token_contract_address_hash: contract_address_hash, value: value, old_value: old_value} ->
block_number: block_number delta =
}, cond do
acc_query -> not valid_holder?(old_value) and valid_holder?(value) -> 1
from(current_token_balance in acc_query, valid_holder?(old_value) and not valid_holder?(value) -> -1
or_where: true -> 0
current_token_balance.address_hash == ^address_hash and end
current_token_balance.token_contract_address_hash == ^token_contract_address_hash and
current_token_balance.block_number < ^block_number
)
end)
{:ok, repo.all(final_query, timeout: timeout)} %{contract_address_hash: contract_address_hash, delta: delta}
end)
|> Enum.group_by(& &1.contract_address_hash, & &1.delta)
|> Enum.map(fn {contract_address_hash, deltas} ->
%{contract_address_hash: contract_address_hash, delta: Enum.sum(deltas)}
end)
|> Enum.filter(fn %{delta: delta} -> delta != 0 end)
|> Enum.sort_by(& &1.contract_address_hash)
end end
defp holder_count_delta(%{ defp holder_count_delta(%{
@ -236,6 +217,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
block_number: fragment("EXCLUDED.block_number"), block_number: fragment("EXCLUDED.block_number"),
value: fragment("EXCLUDED.value"), value: fragment("EXCLUDED.value"),
value_fetched_at: fragment("EXCLUDED.value_fetched_at"), value_fetched_at: fragment("EXCLUDED.value_fetched_at"),
old_value: current_token_balance.value,
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", current_token_balance.inserted_at), inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", current_token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", current_token_balance.updated_at) updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", current_token_balance.updated_at)
] ]

@ -0,0 +1,10 @@
defmodule Explorer.Repo.Migrations.AddOldValueForCurrentTokenBalances do
use Ecto.Migration
def change do
alter table(:address_current_token_balances) do
# A transient field for deriving token holder count deltas during address_current_token_balances upserts
add(:old_value, :decimal, null: true)
end
end
end

@ -247,16 +247,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalancesTest do
assert {:ok, assert {:ok,
%{ %{
deleted_address_current_token_balances: [
%{
address_hash: ^non_holder_becomes_holder_address_hash,
token_contract_address_hash: ^token_contract_address_hash
},
%{
address_hash: ^holder_becomes_non_holder_address_hash,
token_contract_address_hash: ^token_contract_address_hash
}
],
address_current_token_balances: [ address_current_token_balances: [
%{ %{
address_hash: ^non_holder_becomes_holder_address_hash, address_hash: ^non_holder_becomes_holder_address_hash,

Loading…
Cancel
Save