Extract Explorer.Chain.Import.Address.TokenBalances

Standardized on :address_token_balances instead of :token_balances for
options.
pull/859/head
Luke Imhoff 6 years ago
parent 042eb418b2
commit 59c7c38921
  1. 102
      apps/explorer/lib/explorer/chain/import.ex
  2. 103
      apps/explorer/lib/explorer/chain/import/address/token_balances.ex
  3. 6
      apps/explorer/test/explorer/chain/import_test.exs
  4. 2
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  5. 6
      apps/indexer/lib/indexer/block/fetcher.ex
  6. 15
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  7. 2
      apps/indexer/lib/indexer/token_balance/fetcher.ex
  8. 2
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs

@ -3,8 +3,6 @@ defmodule Explorer.Chain.Import do
Bulk importing of data into `Explorer.Repo`
"""
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{
@ -25,13 +23,10 @@ defmodule Explorer.Chain.Import do
@type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all
@type params :: [map()]
@type token_balances_options :: %{
required(:params) => params,
optional(:timeout) => timeout
}
@type all_options :: %{
optional(:addresses) => Import.Addresses.options(),
optional(:address_coin_balances) => Import.Address.CoinBalances.options(),
optional(:address_token_balances) => Import.Address.TokenBalances.options(),
optional(:blocks) => Import.Blocks.options(),
optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.options(),
optional(:broadcast) => boolean,
@ -40,7 +35,6 @@ defmodule Explorer.Chain.Import do
optional(:timeout) => timeout,
optional(:token_transfers) => Import.TokenTransfers.options(),
optional(:tokens) => Import.Tokens.options(),
optional(:token_balances) => token_balances_options,
optional(:transactions) => Import.Transactions.options(),
optional(:transaction_forks) => Import.Transaction.Forks.options()
}
@ -49,13 +43,13 @@ defmodule Explorer.Chain.Import do
%{
optional(:addresses) => Import.Addresses.imported(),
optional(:address_coin_balances) => Import.Address.CoinBalances.imported(),
optional(:address_token_balances) => Import.Address.TokenBalances.imported(),
optional(:blocks) => Import.Blocks.imported(),
optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.imported(),
optional(:internal_transactions) => Import.InternalTransactions.imported(),
optional(:logs) => Import.Logs.imported(),
optional(:token_transfers) => Import.TokenTransfers.imported(),
optional(:tokens) => Import.Tokens.imported(),
optional(:token_balances) => [TokenBalance.t()],
optional(:transactions) => Import.Transactions.imported(),
optional(:transaction_forks) => Import.Transaction.Forks.imported()
}}
@ -65,12 +59,9 @@ defmodule Explorer.Chain.Import do
@type timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()}
# timeouts all in milliseconds
# milliseconds
@transaction_timeout 120_000
@insert_token_balances_timeout 60_000
@doc """
Bulk insert all data stored in the `Explorer`.
@ -114,6 +105,8 @@ defmodule Explorer.Chain.Import do
* `:params` - `list` of params for `Explorer.Chain.Address.CoinBalance.changeset/2`.
* `:timeout` - the timeout for inserting all balances. Defaults to `#{Import.Address.CoinBalances.timeout()}`
milliseconds.
* `:address_token_balances`
* `:params` - `list` of params for `Explorer.Chain.TokenBalance.changeset/2`
* `:blocks`
* `:params` - `list` of params for `Explorer.Chain.Block.changeset/2`.
* `:timeout` - the timeout for inserting all blocks. Defaults to `#{Import.Blocks.timeout()}` milliseconds.
@ -156,8 +149,6 @@ defmodule Explorer.Chain.Import do
* `:params` - `list` of params for `Explorer.Chain.Transaction.Fork.changeset/2`.
* `:timeout` - the timeout for inserting all transaction forks. Defaults to
`#{Import.Transaction.Forks.timeout()}` milliseconds.
* `:token_balances`
* `:params` - `list` of params for `Explorer.Chain.TokenBalance.changeset/2`
* `:timeout` - the timeout for `Repo.transaction`. Defaults to `#{@transaction_timeout}` milliseconds.
"""
@ -238,12 +229,12 @@ defmodule Explorer.Chain.Import do
@import_option_key_to_ecto_schema_module %{
addresses: Address,
address_coin_balances: CoinBalance,
address_token_balances: TokenBalance,
blocks: Block,
block_second_degree_relations: Block.SecondDegreeRelation,
internal_transactions: InternalTransaction,
logs: Log,
token_transfers: TokenTransfer,
token_balances: TokenBalance,
tokens: Token,
transactions: Transaction,
transaction_forks: Transaction.Fork
@ -265,86 +256,7 @@ defmodule Explorer.Chain.Import do
|> Import.Logs.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Tokens.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.TokenTransfers.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_token_balances(ecto_schema_module_to_changes_list_map, full_options)
end
defp run_token_balances(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{TokenBalance => token_balances_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :token_balances, fn _ ->
insert_token_balances(
token_balances_changes,
%{
timeout: options[:token_balances][:timeout] || @insert_token_balances_timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
@spec insert_token_balances([map()], %{
required(:timeout) => timeout(),
required(:timestamps) => timestamps()
}) ::
{:ok, [TokenBalance.t()]}
| {:error, [Changeset.t()]}
def insert_token_balances(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number})
{:ok, _} =
insert_changes_list(
ordered_changes_list,
conflict_target: ~w(address_hash token_contract_address_hash block_number)a,
on_conflict:
from(
token_balance in TokenBalance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value_fetched_at
)
]
]
),
for: TokenBalance,
returning: true,
timeout: timeout,
timestamps: timestamps
)
|> Import.Address.TokenBalances.run(ecto_schema_module_to_changes_list_map, full_options)
end
def insert_changes_list(changes_list, options) when is_list(changes_list) do

@ -0,0 +1,103 @@
defmodule Explorer.Chain.Import.Address.TokenBalances do
@moduledoc """
Bulk imports `t:Explorer.Chain.Address.TokenBalance.t/0`.
"""
require Ecto.Query
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.Address.TokenBalance
alias Explorer.Chain.Import
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [TokenBalance.t()]
def run(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{TokenBalance => token_balances_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :address_token_balances, fn _ ->
insert(
token_balances_changes,
%{
timeout: options[:address_token_balances][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
def timeout, do: @timeout
@spec insert([map()], %{
required(:timeout) => timeout(),
required(:timestamps) => Import.timestamps()
}) ::
{:ok, [TokenBalance.t()]}
| {:error, [Changeset.t()]}
def insert(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number})
{:ok, _} =
Import.insert_changes_list(
ordered_changes_list,
conflict_target: ~w(address_hash token_contract_address_hash block_number)a,
on_conflict:
from(
token_balance in TokenBalance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
token_balance.value_fetched_at,
token_balance.value_fetched_at,
token_balance.value_fetched_at
)
]
]
),
for: TokenBalance,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
end

@ -363,7 +363,7 @@ defmodule Explorer.Chain.ImportTest do
],
timeout: 5
},
token_balances: %{
address_token_balances: %{
params: [
%{
address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca",
@ -1483,7 +1483,7 @@ defmodule Explorer.Chain.ImportTest do
tokens: _,
transactions: _,
transaction_forks: _,
token_balances: _
address_token_balances: _
}} =
Import.all(%{
addresses: %{
@ -1570,7 +1570,7 @@ defmodule Explorer.Chain.ImportTest do
params: [%{uncle_hash: uncle_hash, hash: transaction_hash, index: 0}],
timeout: 1
},
token_balances: %{
address_token_balances: %{
params: [
params_for(
:token_balance,

@ -135,7 +135,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
defp async_import_internal_transactions(_, _), do: :ok
defp async_import_token_balances(%{token_balances: token_balances}) do
defp async_import_token_balances(%{address_token_balances: token_balances}) do
TokenBalance.Fetcher.async_fetch(token_balances)
end

@ -25,11 +25,11 @@ defmodule Indexer.Block.Fetcher do
transaction_hash_to_block_number_option: transaction_hash_to_block_number,
addresses: Import.Addresses.options(),
address_coin_balances: Import.Address.CoinBalances.options(),
address_token_balances: Import.Address.TokenBalances.options(),
blocks: Import.Blocks.options(),
block_second_degree_relations: Import.Block.SecondDegreeRelations.options(),
broadcast: boolean,
logs: Import.Logs.options(),
token_balances: Import.token_balances_options(),
token_transfers: Import.TokenTransfers.options(),
tokens: Import.Tokens.options(),
transactions: Import.Transactions.options()
@ -113,14 +113,14 @@ defmodule Indexer.Block.Fetcher do
logs_params: logs,
transactions_params: transactions_with_receipts
}),
token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}),
address_token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}),
{:ok, inserted} <-
__MODULE__.import(
state,
%{
addresses: %{params: addresses},
address_coin_balances: %{params: coin_balances_params_set},
token_balances: %{params: token_balances},
address_token_balances: %{params: address_token_balances},
blocks: %{params: blocks},
block_second_degree_relations: %{params: block_second_degree_relations},
logs: %{params: logs},

@ -77,11 +77,11 @@ defmodule Indexer.Block.Realtime.Fetcher do
def import(
block_fetcher,
%{
address_hash_to_fetched_balance_block_number: address_hash_to_block_number,
address_coin_balances: %{params: address_coin_balances_params},
address_hash_to_fetched_balance_block_number: address_hash_to_block_number,
address_token_balances: %{params: address_token_balances_params},
addresses: %{params: addresses_params},
transactions: %{params: transactions_params},
token_balances: %{params: token_balances_params}
transactions: %{params: transactions_params}
} = options
) do
with {:ok,
@ -99,17 +99,18 @@ defmodule Indexer.Block.Realtime.Fetcher do
addresses_params: internal_transactions_addresses_params,
balances_params: address_coin_balances_params
}),
{:ok, token_balances} <- TokenBalances.fetch_token_balances_from_blockchain(token_balances_params),
{:ok, address_token_balances} <-
TokenBalances.fetch_token_balances_from_blockchain(address_token_balances_params),
chain_import_options =
options
|> Map.drop(@import_options)
|> put_in([:addresses, :params], balances_addresses_params)
|> put_in([:blocks, :params, Access.all(), :consensus], true)
|> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params)
|> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params)
|> put_in([Access.key(:token_balances), :params], token_balances),
|> put_in([Access.key(:address_token_balances), :params], address_token_balances)
|> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params),
{:ok, imported} = ok <- Chain.import(chain_import_options) do
TokenBalances.log_fetching_errors(__MODULE__, token_balances)
TokenBalances.log_fetching_errors(__MODULE__, address_token_balances)
async_import_remaining_block_data(imported)
ok
end

@ -80,7 +80,7 @@ defmodule Indexer.TokenBalance.Fetcher do
end
def import_token_balances(token_balances_params) do
case Chain.import(%{token_balances: %{params: token_balances_params}, timeout: :infinity}) do
case Chain.import(%{address_token_balances: %{params: token_balances_params}, timeout: :infinity}) do
{:ok, _} ->
:ok

@ -99,7 +99,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
params: [],
on_conflict: :nothing
},
token_balances: %{
address_token_balances: %{
params: []
},
transactions: %{

Loading…
Cancel
Save