|
|
|
@ -26,7 +26,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do |
|
|
|
|
def to_holder_address_hash_set_by_token_contract_address_hash(address_current_token_balances) |
|
|
|
|
when is_list(address_current_token_balances) do |
|
|
|
|
address_current_token_balances |
|
|
|
|
|> Stream.filter(fn %{value: value} -> not is_nil(value) && Decimal.cmp(value, 0) == :gt end) |
|
|
|
|
|> Stream.filter(fn %{value: value} -> valid_holder?(value) end) |
|
|
|
|
|> Enum.reduce(%{}, fn %{token_contract_address_hash: token_contract_address_hash, address_hash: address_hash}, |
|
|
|
|
acc_holder_address_hash_set_by_token_contract_address_hash -> |
|
|
|
|
updated_holder_address_hash_set = |
|
|
|
@ -106,26 +106,19 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do |
|
|
|
|
|> Map.put_new(:timeout, @timeout) |
|
|
|
|
|> Map.put(:timestamps, timestamps) |
|
|
|
|
|
|
|
|
|
timeout = insert_options.timeout |
|
|
|
|
|
|
|
|
|
# order so that row ShareLocks are grabbed in a consistent order |
|
|
|
|
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.token_contract_address_hash}) |
|
|
|
|
|
|
|
|
|
multi |
|
|
|
|
|> Multi.run(:deleted_address_current_token_balances, fn repo, _ -> |
|
|
|
|
deleted_address_current_token_balances(repo, ordered_changes_list, %{timeout: timeout}) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:address_current_token_balances, fn repo, _ -> |
|
|
|
|
insert(repo, ordered_changes_list, insert_options) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:address_current_token_balances_update_token_holder_counts, fn repo, |
|
|
|
|
%{ |
|
|
|
|
deleted_address_current_token_balances: |
|
|
|
|
deleted, |
|
|
|
|
address_current_token_balances: |
|
|
|
|
inserted |
|
|
|
|
upserted_balances |
|
|
|
|
} -> |
|
|
|
|
token_holder_count_deltas = token_holder_count_deltas(%{deleted: deleted, inserted: inserted}) |
|
|
|
|
token_holder_count_deltas = upserted_balances_to_holder_count_deltas(upserted_balances) |
|
|
|
|
|
|
|
|
|
Tokens.update_holder_counts_with_deltas( |
|
|
|
|
repo, |
|
|
|
@ -138,41 +131,29 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do |
|
|
|
|
@impl Import.Runner |
|
|
|
|
def timeout, do: @timeout |
|
|
|
|
|
|
|
|
|
@spec deleted_address_current_token_balances(Repo.t(), [map()], %{timeout: timeout()}) :: |
|
|
|
|
{:ok, [CurrentTokenBalance.t()]} |
|
|
|
|
defp deleted_address_current_token_balances(_, [], _), do: {:ok, []} |
|
|
|
|
|
|
|
|
|
defp deleted_address_current_token_balances(repo, changes_list, %{timeout: timeout}) |
|
|
|
|
when is_atom(repo) and is_list(changes_list) do |
|
|
|
|
initial_query = |
|
|
|
|
from(current_token_balance in CurrentTokenBalance, |
|
|
|
|
select: |
|
|
|
|
map(current_token_balance, [ |
|
|
|
|
:address_hash, |
|
|
|
|
:token_contract_address_hash, |
|
|
|
|
# to determine if a holder for `update_token_holder_counts` |
|
|
|
|
:value |
|
|
|
|
]), |
|
|
|
|
# to maintain order of lock for `address_current_token_balances` |
|
|
|
|
lock: "FOR UPDATE" |
|
|
|
|
) |
|
|
|
|
defp valid_holder?(value) do |
|
|
|
|
not is_nil(value) and Decimal.cmp(value, 0) == :gt |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
final_query = |
|
|
|
|
Enum.reduce(changes_list, initial_query, fn %{ |
|
|
|
|
address_hash: address_hash, |
|
|
|
|
token_contract_address_hash: token_contract_address_hash, |
|
|
|
|
block_number: block_number |
|
|
|
|
}, |
|
|
|
|
acc_query -> |
|
|
|
|
from(current_token_balance in acc_query, |
|
|
|
|
or_where: |
|
|
|
|
current_token_balance.address_hash == ^address_hash and |
|
|
|
|
current_token_balance.token_contract_address_hash == ^token_contract_address_hash and |
|
|
|
|
current_token_balance.block_number < ^block_number |
|
|
|
|
) |
|
|
|
|
end) |
|
|
|
|
# Assumes existence of old_value field with previous value or nil |
|
|
|
|
defp upserted_balances_to_holder_count_deltas(upserted_balances) do |
|
|
|
|
upserted_balances |
|
|
|
|
|> Enum.map(fn %{token_contract_address_hash: contract_address_hash, value: value, old_value: old_value} -> |
|
|
|
|
delta = |
|
|
|
|
cond do |
|
|
|
|
not valid_holder?(old_value) and valid_holder?(value) -> 1 |
|
|
|
|
valid_holder?(old_value) and not valid_holder?(value) -> -1 |
|
|
|
|
true -> 0 |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
{:ok, repo.all(final_query, timeout: timeout)} |
|
|
|
|
%{contract_address_hash: contract_address_hash, delta: delta} |
|
|
|
|
end) |
|
|
|
|
|> Enum.group_by(& &1.contract_address_hash, & &1.delta) |
|
|
|
|
|> Enum.map(fn {contract_address_hash, deltas} -> |
|
|
|
|
%{contract_address_hash: contract_address_hash, delta: Enum.sum(deltas)} |
|
|
|
|
end) |
|
|
|
|
|> Enum.filter(fn %{delta: delta} -> delta != 0 end) |
|
|
|
|
|> Enum.sort_by(& &1.contract_address_hash) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp holder_count_delta(%{ |
|
|
|
@ -236,6 +217,7 @@ defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do |
|
|
|
|
block_number: fragment("EXCLUDED.block_number"), |
|
|
|
|
value: fragment("EXCLUDED.value"), |
|
|
|
|
value_fetched_at: fragment("EXCLUDED.value_fetched_at"), |
|
|
|
|
old_value: current_token_balance.value, |
|
|
|
|
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", current_token_balance.inserted_at), |
|
|
|
|
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", current_token_balance.updated_at) |
|
|
|
|
] |
|
|
|
|