Merge pull request #1296 from poanetwork/1295

Order current token balance inserts to prevent deadlock
pull/1292/head
Luke Imhoff 6 years ago committed by GitHub
commit e88fb5c999
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex
  2. 3
      apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex
  3. 28
      apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs
  4. 3
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  5. 2
      apps/indexer/lib/indexer/token_balance/fetcher.ex
  6. 10
      apps/indexer/lib/indexer/token_balances.ex

@ -60,10 +60,13 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
when is_atom(repo) and is_list(changes_list) do when is_atom(repo) and is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.token_contract_address_hash})
{:ok, _} = {:ok, _} =
Import.insert_changes_list( Import.insert_changes_list(
repo, repo,
unique_token_balances(changes_list), ordered_changes_list,
conflict_target: ~w(address_hash token_contract_address_hash)a, conflict_target: ~w(address_hash token_contract_address_hash)a,
on_conflict: on_conflict, on_conflict: on_conflict,
for: CurrentTokenBalance, for: CurrentTokenBalance,
@ -73,17 +76,6 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
) )
end end
# Remove duplicated token balances based on `{address_hash, token_hash}` considering the last block
# to avoid `cardinality_violation` error in Postgres. This error happens when there are duplicated
# rows being inserted.
defp unique_token_balances(changes_list) do
changes_list
|> Enum.sort(&(&1.block_number > &2.block_number))
|> Enum.uniq_by(fn %{address_hash: address_hash, token_contract_address_hash: token_hash} ->
{address_hash, token_hash}
end)
end
defp default_on_conflict do defp default_on_conflict do
from( from(
current_token_balance in CurrentTokenBalance, current_token_balance in CurrentTokenBalance,

@ -60,7 +60,8 @@ defmodule Explorer.Chain.Import.Runner.Address.TokenBalances do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# order so that row ShareLocks are grabbed in a consistent order # order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number}) ordered_changes_list =
Enum.sort_by(changes_list, &{&1.address_hash, &1.token_contract_address_hash, &1.block_number})
{:ok, _} = {:ok, _} =
Import.insert_changes_list( Import.insert_changes_list(

@ -63,33 +63,5 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalancesTest do
assert current_token_balance.block_number == 2 assert current_token_balance.block_number == 2
assert current_token_balance.value == Decimal.new(200) assert current_token_balance.value == Decimal.new(200)
end end
test "considers the last block when there are duplicated params", %{
address: address,
token: token,
insert_options: insert_options
} do
changes = [
%{
address_hash: address.hash,
block_number: 4,
token_contract_address_hash: token.contract_address_hash,
value: Decimal.new(200)
},
%{
address_hash: address.hash,
block_number: 1,
token_contract_address_hash: token.contract_address_hash,
value: Decimal.new(100)
}
]
CurrentTokenBalances.insert(Repo, changes, insert_options)
current_token_balance = Repo.get_by(CurrentTokenBalance, address_hash: address.hash)
assert current_token_balance.block_number == 4
assert current_token_balance.value == Decimal.new(200)
end
end end
end end

@ -117,13 +117,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
})}, })},
{:address_token_balances, {:ok, address_token_balances}} <- {:address_token_balances, {:ok, address_token_balances}} <-
{:address_token_balances, fetch_token_balances(address_token_balances_params)}, {:address_token_balances, fetch_token_balances(address_token_balances_params)},
address_current_token_balances = TokenBalances.to_address_current_token_balances(address_token_balances),
chain_import_options = chain_import_options =
options options
|> Map.drop(@import_options) |> Map.drop(@import_options)
|> put_in([:addresses, :params], balances_addresses_params) |> put_in([:addresses, :params], balances_addresses_params)
|> put_in([:blocks, :params, Access.all(), :consensus], true) |> put_in([:blocks, :params, Access.all(), :consensus], true)
|> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params) |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params)
|> put_in([Access.key(:address_current_token_balances, %{}), :params], address_token_balances) |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_current_token_balances)
|> put_in([Access.key(:address_token_balances), :params], address_token_balances) |> put_in([Access.key(:address_token_balances), :params], address_token_balances)
|> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params), |> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params),
{:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do {:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do

@ -108,7 +108,7 @@ defmodule Indexer.TokenBalance.Fetcher do
import_params = %{ import_params = %{
addresses: %{params: addresses_params}, addresses: %{params: addresses_params},
address_token_balances: %{params: token_balances_params}, address_token_balances: %{params: token_balances_params},
address_current_token_balances: %{params: token_balances_params}, address_current_token_balances: %{params: TokenBalances.to_address_current_token_balances(token_balances_params)},
timeout: :infinity timeout: :infinity
} }

@ -60,6 +60,16 @@ defmodule Indexer.TokenBalances do
{:ok, fetched_token_balances} {:ok, fetched_token_balances}
end end
def to_address_current_token_balances(address_token_balances) when is_list(address_token_balances) do
address_token_balances
|> Enum.group_by(fn %{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} ->
{address_hash, token_contract_address_hash}
end)
|> Enum.map(fn {_, grouped_address_token_balances} ->
Enum.max_by(grouped_address_token_balances, fn %{block_number: block_number} -> block_number end)
end)
end
defp traced_fetch_token_balance_callback(%Spandex.Span{} = span) do defp traced_fetch_token_balance_callback(%Spandex.Span{} = span) do
fn balance -> fn balance ->
try do try do

Loading…
Cancel
Save