Merge pull request #619 from poanetwork/614

Fix uniqueness/on_conflict for address_token_balances
pull/613/head
Luke Imhoff 6 years ago committed by GitHub
commit dbeb871f27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      apps/explorer/lib/explorer/chain/import.ex
  2. 4
      apps/explorer/priv/repo/migrations/20180817021704_create_address_token_balances.exs
  3. 39
      apps/indexer/lib/indexer/address/token_balances.ex
  4. 32
      apps/indexer/lib/indexer/balances.ex
  5. 3
      apps/indexer/lib/indexer/block_fetcher.ex
  6. 38
      apps/indexer/test/indexer/address/token_balances_test.exs
  7. 30
      apps/indexer/test/indexer/balances_test.exs

@ -757,8 +757,43 @@ defmodule Explorer.Chain.Import do
{:ok, _} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:address_hash, :block_number],
on_conflict: :replace_all,
conflict_target: ~w(address_hash token_contract_address_hash block_number)a,
on_conflict:
from(
token_balance in TokenBalance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value_fetched_at
)
]
]
),
for: TokenBalance,
returning: true,
timeout: timeout,

@ -18,12 +18,12 @@ defmodule Explorer.Repo.Migrations.CreateAddressTokenBalances do
timestamps(null: false, type: :utc_datetime)
end
create(unique_index(:address_token_balances, [:address_hash, :block_number]))
create(unique_index(:address_token_balances, ~w(address_hash token_contract_address_hash block_number)a))
create(
unique_index(
:address_token_balances,
[:address_hash, :block_number],
~w(address_hash token_contract_address_hash block_number)a,
name: :unfetched_token_balances,
where: "value_fetched_at IS NULL"
)

@ -0,0 +1,39 @@
defmodule Indexer.Address.TokenBalances do
@moduledoc """
Extracts `Explorer.Address.TokenBalance` params from other schema's params.
"""
def params_set(%{} = import_options) do
Enum.reduce(import_options, MapSet.new(), &reducer/2)
end
defp reducer({:token_transfers_params, token_transfers_params}, initial) when is_list(token_transfers_params) do
Enum.reduce(token_transfers_params, initial, fn %{
block_number: block_number,
from_address_hash: from_address_hash,
to_address_hash: to_address_hash,
token_contract_address_hash: token_contract_address_hash
},
acc
when is_integer(block_number) and is_binary(from_address_hash) and
is_binary(to_address_hash) and
is_binary(token_contract_address_hash) ->
acc
|> MapSet.put(%{
address_hash: from_address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
})
|> MapSet.put(%{
address_hash: to_address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
})
|> MapSet.put(%{
address_hash: token_contract_address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
})
end)
end
end

@ -1,6 +1,6 @@
defmodule Indexer.Balances do
@moduledoc """
Extracts `Explorer.Chain.Balance` params from other schema's params
Extracts `Explorer.Chain.Balance` params from other schema's params.
"""
def params_set(%{} = import_options) do
@ -28,36 +28,6 @@ defmodule Indexer.Balances do
end)
end
defp reducer({:token_transfers_params, token_transfers_params}, initial) when is_list(token_transfers_params) do
Enum.reduce(token_transfers_params, initial, fn %{
block_number: block_number,
from_address_hash: from_address_hash,
to_address_hash: to_address_hash,
token_contract_address_hash: token_contract_address_hash
},
acc
when is_integer(block_number) and is_binary(from_address_hash) and
is_binary(to_address_hash) and
is_binary(token_contract_address_hash) ->
acc
|> MapSet.put(%{
address_hash: from_address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
})
|> MapSet.put(%{
address_hash: to_address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
})
|> MapSet.put(%{
address_hash: token_contract_address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
})
end)
end
defp reducer({:transactions_params, transactions_params}, initial) when is_list(transactions_params) do
Enum.reduce(transactions_params, initial, &transactions_params_reducer/2)
end

@ -7,6 +7,7 @@ defmodule Indexer.BlockFetcher do
alias Explorer.Chain.{Block, Import}
alias Indexer.{AddressExtraction, Balances, TokenTransfers}
alias Indexer.Address.TokenBalances
alias Indexer.BlockFetcher.Receipts
@type address_hash_to_fetched_balance_block_number :: %{String.t() => Block.block_number()}
@ -108,7 +109,7 @@ defmodule Indexer.BlockFetcher do
logs_params: logs,
transactions_params: transactions_with_receipts
}),
token_balances = Balances.params_set(%{token_transfers_params: token_transfers}),
token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}),
{:ok, inserted} <-
import_range(
state,

@ -0,0 +1,38 @@
defmodule Indexer.Address.TokenBalancesTest do
use ExUnit.Case, async: true
alias Explorer.Factory
alias Indexer.Address.TokenBalances
describe "params_set/1" do
test "with token transfer extract from_address, to_address, and token_contract_address_hash" do
block_number = 1
from_address_hash =
Factory.address_hash()
|> to_string()
to_address_hash =
Factory.address_hash()
|> to_string()
token_contract_address_hash =
Factory.address_hash()
|> to_string()
token_transfer_params = %{
block_number: block_number,
from_address_hash: from_address_hash,
to_address_hash: to_address_hash,
token_contract_address_hash: token_contract_address_hash
}
params_set = TokenBalances.params_set(%{token_transfers_params: [token_transfer_params]})
assert MapSet.size(params_set) == 3
assert %{address_hash: from_address_hash, block_number: block_number}
assert %{address_hash: to_address_hash, block_number: block_number}
assert %{address_hash: token_contract_address_hash, block_number: block_number}
end
end
end

@ -107,36 +107,6 @@ defmodule Indexer.BalancesTest do
assert %{address_hash: address_hash, block_number: block_number}
end
test "with token transfer extract from_address, to_address, and token_contract_address_hash" do
block_number = 1
from_address_hash =
Factory.address_hash()
|> to_string()
to_address_hash =
Factory.address_hash()
|> to_string()
token_contract_address_hash =
Factory.address_hash()
|> to_string()
token_transfer_params = %{
block_number: block_number,
from_address_hash: from_address_hash,
to_address_hash: to_address_hash,
token_contract_address_hash: token_contract_address_hash
}
params_set = Balances.params_set(%{token_transfers_params: [token_transfer_params]})
assert MapSet.size(params_set) == 3
assert %{address_hash: from_address_hash, block_number: block_number}
assert %{address_hash: to_address_hash, block_number: block_number}
assert %{address_hash: token_contract_address_hash, block_number: block_number}
end
test "with transaction without to_address_hash extracts from_address_hash" do
block_number = 1

Loading…
Cancel
Save