Extract Explorer.Chain.Address.CoinBalances

Standardize on `:address_coin_balances` instead of `:balances` for
option names.
pull/859/head
Luke Imhoff 6 years ago
parent 31ae4f51fe
commit 03557f4200
  1. 2
      .credo.exs
  2. 10
      apps/explorer/lib/explorer/chain.ex
  3. 127
      apps/explorer/lib/explorer/chain/import.ex
  4. 114
      apps/explorer/lib/explorer/chain/import/address/coin_balances.ex
  5. 4
      apps/explorer/lib/explorer/chain/import/addresses.ex
  6. 8
      apps/explorer/test/explorer/chain/import_test.exs
  7. 6
      apps/indexer/lib/indexer/address_extraction.ex
  8. 6
      apps/indexer/lib/indexer/block/fetcher.ex
  9. 8
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  10. 2
      apps/indexer/lib/indexer/coin_balance/fetcher.ex
  11. 2
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  12. 2
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs

@ -75,7 +75,7 @@
# Priority values are: `low, normal, high, higher`
#
{Credo.Check.Design.AliasUsage,
excluded_namespaces: ~w(Socket Task),
excluded_namespaces: ~w(Import Socket Task),
excluded_lastnames: ~w(Address DateTime Exporter Fetcher Full Instrumenter Name Number Repo Time Unit),
priority: :low},

@ -50,7 +50,13 @@ defmodule Explorer.Chain do
Event type where data is broadcasted whenever data is inserted from chain indexing.
"""
@type chain_event ::
:addresses | :balances | :blocks | :exchange_rate | :internal_transactions | :logs | :transactions
:addresses
| :address_coin_balances
| :blocks
| :exchange_rate
| :internal_transactions
| :logs
| :transactions
@type direction :: :from | :to
@ -1443,7 +1449,7 @@ defmodule Explorer.Chain do
"""
@spec subscribe_to_events(chain_event()) :: :ok
def subscribe_to_events(event_type)
when event_type in ~w(addresses balances blocks exchange_rate internal_transactions logs transactions)a do
when event_type in ~w(addresses address_coin_balances blocks exchange_rate internal_transactions logs transactions)a do
Registry.register(Registry.ChainEvents, event_type, [])
:ok
end

@ -14,24 +14,19 @@ defmodule Explorer.Chain.Import do
Address.TokenBalance,
Block,
Hash,
Import,
InternalTransaction,
Log,
Token,
TokenTransfer,
Transaction,
Wei
Transaction
}
alias Explorer.Chain.Import.Addresses
alias Explorer.Repo
@type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all
@type params :: [map()]
@type balances_options :: %{
required(:params) => params,
optional(:timeout) => timeout
}
@type blocks_options :: %{
required(:params) => params,
optional(:timeout) => timeout
@ -76,8 +71,8 @@ defmodule Explorer.Chain.Import do
optional(:timeout) => timeout
}
@type all_options :: %{
optional(:addresses) => Addresses.options,
optional(:balances) => balances_options,
optional(:addresses) => Import.Addresses.options(),
optional(:address_coin_balances) => Import.Address.CoinBalances.options(),
optional(:blocks) => blocks_options,
optional(:block_second_degree_relations) => block_second_degree_relations_options,
optional(:broadcast) => boolean,
@ -94,10 +89,8 @@ defmodule Explorer.Chain.Import do
@type all_result ::
{:ok,
%{
optional(:addresses) => Addresses.imported(),
optional(:balances) => [
%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}
],
optional(:addresses) => Import.Addresses.imported(),
optional(:address_coin_balances) => Import.Address.CoinBalances.imported(),
optional(:blocks) => [Block.t()],
optional(:block_second_degree_relations) => [
%{required(:nephew_hash) => Hash.Full.t(), required(:uncle_hash) => Hash.Full.t()}
@ -125,7 +118,6 @@ defmodule Explorer.Chain.Import do
@transaction_timeout 120_000
@insert_balances_timeout 60_000
@insert_blocks_timeout 60_000
@insert_block_second_degree_relations_timeout 60_000
@insert_internal_transactions_timeout 60_000
@ -144,11 +136,11 @@ defmodule Explorer.Chain.Import do
| Key | Value Type | Value Description |
|----------------------------------|-------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------|
| `:addresses` | `[Explorer.Chain.Address.t()]` | List of `t:Explorer.Chain.Address.t/0`s |
| `:balances` | `[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]` | List of `t:Explorer.Chain.Address.t/0`s |
| `:address_coin_balances` | `[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]` | List of maps of the `t:Explorer.Chain.Address.CoinBalance.t/0` `address_hash` and `block_number` |
| `:blocks` | `[Explorer.Chain.Block.t()]` | List of `t:Explorer.Chain.Block.t/0`s |
| `:internal_transactions` | `[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash` |
| `:logs` | `[Explorer.Chain.Log.t()]` | List of `t:Explorer.Chain.Log.t/0`s |
| `:token_transfers` | `[Explorer.Chain.TokenTransfer.t()]` | List of `t:Explorer.Chain.TokenTransfer.t/0`s |
| `:token_transfers` | `[Explorer.Chain.TokenTransfer.t()]` | List of `t:Explorer.Chain.TokenTransfer.t/0`s |
| `:tokens` | `[Explorer.Chain.Token.t()]` | List of `t:Explorer.Chain.token.t/0`s |
| `:transactions` | `[Explorer.Chain.Hash.t()]` | List of `t:Explorer.Chain.Transaction.t/0` `hash` |
| `:transaction_forks` | `[%{uncle_hash: Explorer.Chain.Hash.t(), hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.Transaction.Fork.t/0` `uncle_hash` and `hash` |
@ -173,11 +165,12 @@ defmodule Explorer.Chain.Import do
* `:addresses`
* `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`.
* `:timeout` - the timeout for inserting all addresses. Defaults to `#{Addresses.timeout()}` milliseconds.
* `:timeout` - the timeout for inserting all addresses. Defaults to `#{Import.Addresses.timeout()}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Address` to use validate `:params`.
* `:balances`
* `:address_coin_balances`
* `:params` - `list` of params for `Explorer.Chain.Address.CoinBalance.changeset/2`.
* `:timeout` - the timeout for inserting all balances. Defaults to `#{@insert_balances_timeout}` milliseconds.
* `:timeout` - the timeout for inserting all balances. Defaults to `#{Import.Address.CoinBalances.timeout()}`
milliseconds.
* `:blocks`
* `:params` - `list` of params for `Explorer.Chain.Block.changeset/2`.
* `:timeout` - the timeout for inserting all blocks. Defaults to `#{@insert_blocks_timeout}` milliseconds.
@ -236,7 +229,7 @@ defmodule Explorer.Chain.Import do
defp broadcast_events(data) do
for {event_type, event_data} <- data,
event_type in ~w(addresses balances blocks internal_transactions logs transactions)a do
event_type in ~w(addresses address_coin_balances blocks internal_transactions logs transactions)a do
broadcast_event_data(event_type, event_data)
end
end
@ -298,7 +291,7 @@ defmodule Explorer.Chain.Import do
@import_option_key_to_ecto_schema_module %{
addresses: Address,
balances: CoinBalance,
address_coin_balances: CoinBalance,
blocks: Block,
block_second_degree_relations: Block.SecondDegreeRelation,
internal_transactions: InternalTransaction,
@ -316,8 +309,8 @@ defmodule Explorer.Chain.Import do
full_options = Map.put(options, :timestamps, timestamps)
Multi.new()
|> Addresses.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_balances(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Addresses.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Address.CoinBalances.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_blocks(ecto_schema_module_to_changes_list_map, full_options)
|> run_block_second_degree_relations(ecto_schema_module_to_changes_list_map, full_options)
|> run_transactions(ecto_schema_module_to_changes_list_map, full_options)
@ -329,27 +322,6 @@ defmodule Explorer.Chain.Import do
|> run_token_balances(ecto_schema_module_to_changes_list_map, full_options)
end
defp run_balances(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{CoinBalance => balances_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :balances, fn _ ->
insert_balances(
balances_changes,
%{
timeout: options[:balances][:timeout] || @insert_balances_timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
defp run_blocks(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
@ -576,73 +548,6 @@ defmodule Explorer.Chain.Import do
end
end
@spec insert_balances(
[
%{
required(:address_hash) => Hash.Address.t(),
required(:block_number) => Block.block_number(),
required(:value) => Wei.t()
}
],
%{
required(:timeout) => timeout,
required(:timestamps) => timestamps
}
) ::
{:ok, [%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}]}
| {:error, [Changeset.t()]}
defp insert_balances(changes_list, %{timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number})
{:ok, _} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:address_hash, :block_number],
on_conflict:
from(
balance in CoinBalance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
balance.value_fetched_at,
balance.value_fetched_at,
balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
balance.value_fetched_at,
balance.value_fetched_at,
balance.value_fetched_at
)
]
]
),
for: CoinBalance,
timeout: timeout,
timestamps: timestamps
)
{:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash block_number)a))}
end
@spec insert_blocks([map()], %{required(:timeout) => timeout, required(:timestamps) => timestamps}) ::
{:ok, [Block.t()]} | {:error, [Changeset.t()]}
defp insert_blocks(changes_list, %{timeout: timeout, timestamps: timestamps})

@ -0,0 +1,114 @@
defmodule Explorer.Chain.Import.Address.CoinBalances do
@moduledoc """
Bulk imports `t:Explorer.Chain.Address.CoinBalance.t/0`.
"""
require Ecto.Query
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.Address.CoinBalance
alias Explorer.Chain.{Block, Hash, Import, Wei}
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [
%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}
]
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{CoinBalance => balances_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :address_coin_balances, fn _ ->
insert(
balances_changes,
%{
timeout: options[:address_coin_balances][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
def timeout, do: @timeout
@spec insert(
[
%{
required(:address_hash) => Hash.Address.t(),
required(:block_number) => Block.block_number(),
required(:value) => Wei.t()
}
],
%{
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}
) ::
{:ok, [%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}]}
| {:error, [Changeset.t()]}
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number})
{:ok, _} =
Import.insert_changes_list(
ordered_changes_list,
conflict_target: [:address_hash, :block_number],
on_conflict:
from(
balance in CoinBalance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value
ELSE
?
END
""",
balance.value_fetched_at,
balance.value_fetched_at,
balance.value
),
value_fetched_at:
fragment(
"""
CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN
EXCLUDED.value_fetched_at
ELSE
?
END
""",
balance.value_fetched_at,
balance.value_fetched_at,
balance.value_fetched_at
)
]
]
),
for: CoinBalance,
timeout: timeout,
timestamps: timestamps
)
{:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash block_number)a))}
end
end

@ -6,7 +6,7 @@ defmodule Explorer.Chain.Import.Addresses do
require Ecto.Query
alias Ecto.Multi
alias Explorer.Chain.{Address, Import}
alias Explorer.Chain.{Address, Hash, Import}
import Ecto.Query, only: [from: 2]
@ -21,7 +21,7 @@ defmodule Explorer.Chain.Import.Addresses do
}
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{Address => addresses_changes} ->
timestamps = Map.fetch!(options, :timestamps)

@ -779,7 +779,7 @@ defmodule Explorer.Chain.ImportTest do
addresses: %{
params: [%{hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b"}]
},
balances: %{
address_coin_balances: %{
params: [%{address_hash: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b", block_number: 1}],
timeout: 5
}
@ -895,7 +895,7 @@ defmodule Explorer.Chain.ImportTest do
%{hash: "0xfa52274dd61e1643d2205169732f29114bc240b3"}
]
},
balances: %{
address_coin_balances: %{
params: [
%{
address_hash: "0x1c0fa194a9d3b44313dcd849f3c6be6ad270a0a4",
@ -1474,7 +1474,7 @@ defmodule Explorer.Chain.ImportTest do
assert {:ok,
%{
addresses: _,
balances: _,
address_coin_balances: _,
blocks: _,
block_second_degree_relations: _,
internal_transactions: _,
@ -1496,7 +1496,7 @@ defmodule Explorer.Chain.ImportTest do
],
timeout: 1
},
balances: %{
address_coin_balances: %{
params: [
%{address_hash: miner_hash, block_number: block_number, value: nil},
%{address_hash: uncle_miner_hash, block_number: block_number, value: nil}

@ -49,7 +49,7 @@ defmodule Indexer.AddressExtraction do
"""
@entity_to_address_map %{
balances: [
address_coin_balances: [
[
%{from: :address_hash, to: :hash},
%{from: :block_number, to: :fetched_coin_balance_block_number},
@ -320,7 +320,7 @@ defmodule Indexer.AddressExtraction do
cause an error as something has gone terribly wrong with the chain if different code is written to the same address.
"""
@spec extract_addresses(%{
optional(:balances) => [
optional(:address_coin_balances) => [
%{
required(:address_hash) => String.t(),
required(:block_number) => non_neg_integer(),
@ -409,7 +409,7 @@ defmodule Indexer.AddressExtraction do
end
end
# Ensure that when `:addresses` or `:balances` are present, their :fetched_coin_balance will win
# Ensure that when `:addresses` or `:address_coin_balances` are present, their :fetched_coin_balance will win
defp merge_addresses(%{hash: hash} = first, %{hash: hash} = second) do
case {first[:fetched_coin_balance], second[:fetched_coin_balance]} do
{nil, nil} ->

@ -23,8 +23,8 @@ defmodule Indexer.Block.Fetcher do
%{
address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number,
transaction_hash_to_block_number_option: transaction_hash_to_block_number,
addresses: Import.addresses_options(),
balances: Import.balances_options(),
addresses: Import.Addresses.options(),
address_coin_balances: Import.Address.CoinBalances.options(),
blocks: Import.blocks_options(),
block_second_degree_relations: Import.block_second_degree_relations_options(),
broadcast: boolean,
@ -120,7 +120,7 @@ defmodule Indexer.Block.Fetcher do
state,
%{
addresses: %{params: addresses},
balances: %{params: coin_balances_params_set},
address_coin_balances: %{params: coin_balances_params_set},
token_balances: %{params: token_balances},
blocks: %{params: blocks},
block_second_degree_relations: %{params: block_second_degree_relations},

@ -78,7 +78,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
block_fetcher,
%{
address_hash_to_fetched_balance_block_number: address_hash_to_block_number,
balances: %{params: balance_params},
address_coin_balances: %{params: address_coin_balances_params},
addresses: %{params: addresses_params},
transactions: %{params: transactions_params},
token_balances: %{params: token_balances_params}
@ -97,7 +97,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
balances(block_fetcher, %{
address_hash_to_block_number: address_hash_to_block_number,
addresses_params: internal_transactions_addresses_params,
balances_params: balance_params
balances_params: address_coin_balances_params
}),
{:ok, token_balances} <- TokenBalances.fetch_token_balances_from_blockchain(token_balances_params),
chain_import_options =
@ -105,7 +105,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
|> Map.drop(@import_options)
|> put_in([:addresses, :params], balances_addresses_params)
|> put_in([:blocks, :params, Access.all(), :consensus], true)
|> put_in([Access.key(:balances, %{}), :params], balances_params)
|> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params)
|> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params)
|> put_in([Access.key(:token_balances), :params], token_balances),
{:ok, imported} = ok <- Chain.import(chain_import_options) do
@ -247,7 +247,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
|> fetch_balances_params_list()
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) do
merged_addresses_params =
%{balances: fetched_balances_params}
%{address_coin_balances: fetched_balances_params}
|> AddressExtraction.extract_addresses()
|> Kernel.++(addresses_params)
|> AddressExtraction.merge_addresses()

@ -83,7 +83,7 @@ defmodule Indexer.CoinBalance.Fetcher do
{:ok, _} =
Chain.import(%{
addresses: %{params: addresses_params, with: :balance_changeset},
balances: %{params: importable_balances_params}
address_coin_balances: %{params: importable_balances_params}
})
:ok

@ -60,7 +60,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
]
},
address_hash_to_fetched_balance_block_number: %{miner_hash => block_number},
balances: %{
address_coin_balances: %{
params: [
%{
address_hash: miner_hash,

@ -371,7 +371,7 @@ defmodule Indexer.Block.Realtime.FetcherTest do
%Address{hash: fourth_address_hash, fetched_coin_balance_block_number: 3_946_080},
%Address{hash: fifth_address_hash, fetched_coin_balance_block_number: 3_946_079}
],
balances: [
address_coin_balances: [
%{
address_hash: first_address_hash,
block_number: 3_946_079

Loading…
Cancel
Save