From ea7d0d007f673ccd7286af67c4f37933499ca2b3 Mon Sep 17 00:00:00 2001 From: goodsoft Date: Fri, 22 Mar 2019 16:50:54 +0200 Subject: [PATCH] Optimize token holder count updates when importing address current balances Problem: realtime fetcher transactions takes too long and often fails with a timeout. The reason of timeout is deleted_address_current_token_balances step of import, which selects previous balances for the rows we are going to update. The SELECT query it makes is extremely slow. Solution: we incorporate fetching old balance value into the upsert itself. This is done by help of a transient old_value field, which is updated in ON CONFLICT clause of INSERT with the previous value and returned. Upgrading: migration 20190321185644_add_old_value_for_current_token_balances.exs --- CHANGELOG.md | 1 + .../chain/address/current_token_balance.ex | 3 + .../runner/address/current_token_balances.ex | 70 +++++++------------ ...d_old_value_for_current_token_balances.exs | 10 +++ .../address/current_token_balances_test.exs | 10 --- 5 files changed, 40 insertions(+), 54 deletions(-) create mode 100644 apps/explorer/priv/repo/migrations/20190321185644_add_old_value_for_current_token_balances.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fcd748491..c73a6d1367 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#1621](https://github.com/poanetwork/blockscout/pull/1621) - Modify query to fetch failed contract creations - [#1614](https://github.com/poanetwork/blockscout/pull/1614) - Do not fetch burn address token balance + - [#1639](https://github.com/poanetwork/blockscout/pull/1614) - Optimize token holder count updates when importing address current balances ### Chore diff --git a/apps/explorer/lib/explorer/chain/address/current_token_balance.ex b/apps/explorer/lib/explorer/chain/address/current_token_balance.ex index 41ab222fd7..1471c9fbec 100644 --- a/apps/explorer/lib/explorer/chain/address/current_token_balance.ex +++ b/apps/explorer/lib/explorer/chain/address/current_token_balance.ex @@ -40,6 +40,9 @@ defmodule Explorer.Chain.Address.CurrentTokenBalance do field(:block_number, :integer) field(:value_fetched_at, :utc_datetime_usec) + # A transient field for deriving token holder count deltas during address_current_token_balances upserts + field(:old_value, :decimal) + belongs_to(:address, Address, foreign_key: :address_hash, references: :hash, type: Hash.Address) belongs_to( diff --git a/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex b/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex index 78c24f491c..d6259baad8 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex @@ -26,7 +26,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do def to_holder_address_hash_set_by_token_contract_address_hash(address_current_token_balances) when is_list(address_current_token_balances) do address_current_token_balances - |> Stream.filter(fn %{value: value} -> not is_nil(value) && Decimal.cmp(value, 0) == :gt end) + |> Stream.filter(fn %{value: value} -> valid_holder?(value) end) |> Enum.reduce(%{}, fn %{token_contract_address_hash: token_contract_address_hash, address_hash: address_hash}, acc_holder_address_hash_set_by_token_contract_address_hash -> updated_holder_address_hash_set = @@ -106,26 +106,19 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do |> Map.put_new(:timeout, @timeout) |> Map.put(:timestamps, timestamps) - timeout = insert_options.timeout - # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.token_contract_address_hash}) multi - |> Multi.run(:deleted_address_current_token_balances, fn repo, _ -> - deleted_address_current_token_balances(repo, ordered_changes_list, %{timeout: timeout}) - end) |> Multi.run(:address_current_token_balances, fn repo, _ -> insert(repo, ordered_changes_list, insert_options) end) |> Multi.run(:address_current_token_balances_update_token_holder_counts, fn repo, %{ - deleted_address_current_token_balances: - deleted, address_current_token_balances: - inserted + upserted_balances } -> - token_holder_count_deltas = token_holder_count_deltas(%{deleted: deleted, inserted: inserted}) + token_holder_count_deltas = upserted_balances_to_holder_count_deltas(upserted_balances) Tokens.update_holder_counts_with_deltas( repo, @@ -138,41 +131,29 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do @impl Import.Runner def timeout, do: @timeout - @spec deleted_address_current_token_balances(Repo.t(), [map()], %{timeout: timeout()}) :: - {:ok, [CurrentTokenBalance.t()]} - defp deleted_address_current_token_balances(_, [], _), do: {:ok, []} - - defp deleted_address_current_token_balances(repo, changes_list, %{timeout: timeout}) - when is_atom(repo) and is_list(changes_list) do - initial_query = - from(current_token_balance in CurrentTokenBalance, - select: - map(current_token_balance, [ - :address_hash, - :token_contract_address_hash, - # to determine if a holder for `update_token_holder_counts` - :value - ]), - # to maintain order of lock for `address_current_token_balances` - lock: "FOR UPDATE" - ) + defp valid_holder?(value) do + not is_nil(value) and Decimal.cmp(value, 0) == :gt + end - final_query = - Enum.reduce(changes_list, initial_query, fn %{ - address_hash: address_hash, - token_contract_address_hash: token_contract_address_hash, - block_number: block_number - }, - acc_query -> - from(current_token_balance in acc_query, - or_where: - current_token_balance.address_hash == ^address_hash and - current_token_balance.token_contract_address_hash == ^token_contract_address_hash and - current_token_balance.block_number < ^block_number - ) - end) - - {:ok, repo.all(final_query, timeout: timeout)} + # Assumes existence of old_value field with previous value or nil + defp upserted_balances_to_holder_count_deltas(upserted_balances) do + upserted_balances + |> Enum.map(fn %{token_contract_address_hash: contract_address_hash, value: value, old_value: old_value} -> + delta = + cond do + not valid_holder?(old_value) and valid_holder?(value) -> 1 + valid_holder?(old_value) and not valid_holder?(value) -> -1 + true -> 0 + end + + %{contract_address_hash: contract_address_hash, delta: delta} + end) + |> Enum.group_by(& &1.contract_address_hash, & &1.delta) + |> Enum.map(fn {contract_address_hash, deltas} -> + %{contract_address_hash: contract_address_hash, delta: Enum.sum(deltas)} + end) + |> Enum.filter(fn %{delta: delta} -> delta != 0 end) + |> Enum.sort_by(& &1.contract_address_hash) end defp holder_count_delta(%{ @@ -236,6 +217,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do block_number: fragment("EXCLUDED.block_number"), value: fragment("EXCLUDED.value"), value_fetched_at: fragment("EXCLUDED.value_fetched_at"), + old_value: current_token_balance.value, inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", current_token_balance.inserted_at), updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", current_token_balance.updated_at) ] diff --git a/apps/explorer/priv/repo/migrations/20190321185644_add_old_value_for_current_token_balances.exs b/apps/explorer/priv/repo/migrations/20190321185644_add_old_value_for_current_token_balances.exs new file mode 100644 index 0000000000..45cc50d7e4 --- /dev/null +++ b/apps/explorer/priv/repo/migrations/20190321185644_add_old_value_for_current_token_balances.exs @@ -0,0 +1,10 @@ +defmodule Explorer.Repo.Migrations.AddOldValueForCurrentTokenBalances do + use Ecto.Migration + + def change do + alter table(:address_current_token_balances) do + # A transient field for deriving token holder count deltas during address_current_token_balances upserts + add(:old_value, :decimal, null: true) + end + end +end diff --git a/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs b/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs index 123d7e964e..e7dcfe4b06 100644 --- a/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs @@ -247,16 +247,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalancesTest do assert {:ok, %{ - deleted_address_current_token_balances: [ - %{ - address_hash: ^non_holder_becomes_holder_address_hash, - token_contract_address_hash: ^token_contract_address_hash - }, - %{ - address_hash: ^holder_becomes_non_holder_address_hash, - token_contract_address_hash: ^token_contract_address_hash - } - ], address_current_token_balances: [ %{ address_hash: ^non_holder_becomes_holder_address_hash,