parent
95c5856347
commit
14fbf27606
@ -0,0 +1,66 @@ |
||||
defmodule Explorer.Chain.Address.CoinBalanceDaily do |
||||
@moduledoc """ |
||||
Maximum `t:Explorer.Chain.Wei.t/0` `value` of `t:Explorer.Chain.Address.t/0` at the day. |
||||
This table is used to display coinn balance history chart. |
||||
""" |
||||
|
||||
use Explorer.Schema |
||||
|
||||
alias Explorer.Chain.{Address, Hash, Wei} |
||||
alias Explorer.Chain.Address.CoinBalanceDaily |
||||
|
||||
@optional_fields ~w(value)a |
||||
@required_fields ~w(address_hash day)a |
||||
@allowed_fields @optional_fields ++ @required_fields |
||||
|
||||
@typedoc """ |
||||
* `address` - the `t:Explorer.Chain.Address.t/0`. |
||||
* `address_hash` - foreign key for `address`. |
||||
* `day` - the `t:Date.t/0`. |
||||
* `inserted_at` - When the balance was first inserted into the database. |
||||
* `updated_at` - When the balance was last updated. |
||||
* `value` - the max balance (`value`) of `address` during the `day`. |
||||
""" |
||||
@type t :: %__MODULE__{ |
||||
address: %Ecto.Association.NotLoaded{} | Address.t(), |
||||
address_hash: Hash.Address.t(), |
||||
day: Date.t(), |
||||
inserted_at: DateTime.t(), |
||||
updated_at: DateTime.t(), |
||||
value: Wei.t() | nil |
||||
} |
||||
|
||||
@primary_key false |
||||
schema "address_coin_balances_daily" do |
||||
field(:day, :date) |
||||
field(:value, Wei) |
||||
|
||||
timestamps() |
||||
|
||||
belongs_to(:address, Address, foreign_key: :address_hash, references: :hash, type: Hash.Address) |
||||
end |
||||
|
||||
@doc """ |
||||
Builds an `Ecto.Query` to fetch a series of balances by day for the given account. Each element in the series |
||||
corresponds to the maximum balance in that day. Only the last 90 days of data are used. |
||||
""" |
||||
def balances_by_day(address_hash) do |
||||
CoinBalanceDaily |
||||
|> where([cbd], cbd.address_hash == ^address_hash) |
||||
|> limit_time_interval() |
||||
|> order_by([cbd], cbd.day) |
||||
|> select([cbd], %{date: cbd.day, value: cbd.value}) |
||||
end |
||||
|
||||
def limit_time_interval(query) do |
||||
query |> where([cbd], cbd.day >= fragment("date_trunc('day', now()) - interval '90 days'")) |
||||
end |
||||
|
||||
def changeset(%__MODULE__{} = balance, params) do |
||||
balance |
||||
|> cast(params, @allowed_fields) |
||||
|> validate_required(@required_fields) |
||||
|> foreign_key_constraint(:address_hash) |
||||
|> unique_constraint(:day, name: :address_coin_balances_daily_address_hash_day_index) |
||||
end |
||||
end |
@ -0,0 +1,138 @@ |
||||
defmodule Explorer.Chain.Import.Runner.Address.CoinBalancesDaily do |
||||
@moduledoc """ |
||||
Bulk imports `t:Explorer.Chain.Address.CoinBalancesDaily.t/0`. |
||||
""" |
||||
|
||||
require Ecto.Query |
||||
|
||||
import Ecto.Query, only: [from: 2] |
||||
|
||||
alias Ecto.{Changeset, Multi, Repo} |
||||
alias Explorer.Chain.Address.CoinBalanceDaily |
||||
alias Explorer.Chain.{Hash, Import, Wei} |
||||
|
||||
@behaviour Import.Runner |
||||
|
||||
# milliseconds |
||||
@timeout 60_000 |
||||
|
||||
@type imported :: [ |
||||
%{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()} |
||||
] |
||||
|
||||
@impl Import.Runner |
||||
def ecto_schema_module, do: CoinBalanceDaily |
||||
|
||||
@impl Import.Runner |
||||
def option_key, do: :address_coin_balances_daily |
||||
|
||||
@impl Import.Runner |
||||
def imported_table_row do |
||||
%{ |
||||
value_type: "[%{address_hash: Explorer.Chain.Hash.t(), day: Date.t()}]", |
||||
value_description: "List of maps of the `t:#{ecto_schema_module()}.t/0` `address_hash` and `day`" |
||||
} |
||||
end |
||||
|
||||
@impl Import.Runner |
||||
def run(multi, changes_list, %{timestamps: timestamps} = options) do |
||||
insert_options = |
||||
options |
||||
|> Map.get(option_key(), %{}) |
||||
|> Map.take(~w(on_conflict timeout)a) |
||||
|> Map.put_new(:timeout, @timeout) |
||||
|> Map.put(:timestamps, timestamps) |
||||
|
||||
Multi.run(multi, :address_coin_balances_daily, fn repo, _ -> |
||||
insert(repo, changes_list, insert_options) |
||||
end) |
||||
end |
||||
|
||||
@impl Import.Runner |
||||
def timeout, do: @timeout |
||||
|
||||
@spec insert( |
||||
Repo.t(), |
||||
[ |
||||
%{ |
||||
required(:address_hash) => Hash.Address.t(), |
||||
required(:day) => Date.t(), |
||||
required(:value) => Wei.t() |
||||
} |
||||
], |
||||
%{ |
||||
optional(:on_conflict) => Import.Runner.on_conflict(), |
||||
required(:timeout) => timeout, |
||||
required(:timestamps) => Import.timestamps() |
||||
} |
||||
) :: |
||||
{:ok, [%{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()}]} |
||||
| {:error, [Changeset.t()]} |
||||
defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do |
||||
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) |
||||
|
||||
combined_changes_list = |
||||
changes_list |
||||
|> Enum.reduce([], fn change, acc -> |
||||
if Enum.empty?(acc) do |
||||
[change | acc] |
||||
else |
||||
target_item = |
||||
Enum.find(acc, fn item -> |
||||
item.day == change.day && item.address_hash == change.address_hash |
||||
end) |
||||
|
||||
if target_item do |
||||
if change.value > target_item.value do |
||||
acc_updated = List.delete(acc, target_item) |
||||
[change | acc_updated] |
||||
else |
||||
acc |
||||
end |
||||
else |
||||
[change | acc] |
||||
end |
||||
end |
||||
end) |
||||
|
||||
# Enforce CoinBalanceDaily ShareLocks order (see docs: sharelocks.md) |
||||
ordered_changes_list = Enum.sort_by(combined_changes_list, &{&1.address_hash, &1.day}) |
||||
|
||||
{:ok, _} = |
||||
Import.insert_changes_list( |
||||
repo, |
||||
ordered_changes_list, |
||||
conflict_target: [:address_hash, :day], |
||||
on_conflict: on_conflict, |
||||
for: CoinBalanceDaily, |
||||
timeout: timeout, |
||||
timestamps: timestamps |
||||
) |
||||
|
||||
{:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash day)a))} |
||||
end |
||||
|
||||
def default_on_conflict do |
||||
from( |
||||
balance in CoinBalanceDaily, |
||||
update: [ |
||||
set: [ |
||||
value: |
||||
fragment( |
||||
""" |
||||
CASE WHEN EXCLUDED.value IS NOT NULL THEN |
||||
EXCLUDED.value |
||||
ELSE |
||||
? |
||||
END |
||||
""", |
||||
balance.value |
||||
), |
||||
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at), |
||||
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at) |
||||
] |
||||
], |
||||
where: fragment("EXCLUDED.value IS NOT NULL") |
||||
) |
||||
end |
||||
end |
@ -0,0 +1,17 @@ |
||||
defmodule Explorer.Repo.Migrations.AddressCoinBalancesDaily do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
create table(:address_coin_balances_daily, primary_key: false) do |
||||
add(:address_hash, references(:addresses, column: :hash, type: :bytea), null: false) |
||||
add(:day, :date, null: false) |
||||
|
||||
# null until fetched |
||||
add(:value, :numeric, precision: 100, default: fragment("NULL"), null: true) |
||||
|
||||
timestamps(null: false, type: :utc_datetime_usec) |
||||
end |
||||
|
||||
create(unique_index(:address_coin_balances_daily, [:address_hash, :day])) |
||||
end |
||||
end |
@ -0,0 +1,226 @@ |
||||
defmodule Indexer.Fetcher.CoinBalanceDaily do |
||||
@moduledoc """ |
||||
Fetches `t:Explorer.Chain.Address.CoinBalanceDaily.t/0`. |
||||
""" |
||||
|
||||
use Indexer.Fetcher |
||||
use Spandex.Decorators |
||||
|
||||
require Logger |
||||
|
||||
import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1] |
||||
|
||||
alias EthereumJSONRPC.{Blocks, FetchedBalances} |
||||
alias Explorer.Chain |
||||
alias Explorer.Chain.Cache.Accounts |
||||
alias Explorer.Chain.Hash |
||||
alias Indexer.{BufferedTask, Tracer} |
||||
|
||||
@behaviour BufferedTask |
||||
|
||||
@defaults [ |
||||
flush_interval: :timer.seconds(3), |
||||
max_batch_size: 500, |
||||
max_concurrency: 4, |
||||
task_supervisor: Indexer.Fetcher.CoinBalanceDaily.TaskSupervisor, |
||||
metadata: [fetcher: :coin_balance_daily] |
||||
] |
||||
|
||||
@doc """ |
||||
Asynchronously fetches balances for each address `hash` at the `day`. |
||||
""" |
||||
@spec async_fetch_balances([ |
||||
%{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()} |
||||
]) :: :ok |
||||
def async_fetch_balances(balance_fields) when is_list(balance_fields) do |
||||
entries = Enum.map(balance_fields, &entry/1) |
||||
|
||||
BufferedTask.buffer(__MODULE__, entries) |
||||
end |
||||
|
||||
@doc false |
||||
# credo:disable-for-next-line Credo.Check.Design.DuplicatedCode |
||||
def child_spec([init_options, gen_server_options]) do |
||||
{state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) |
||||
|
||||
unless state do |
||||
raise ArgumentError, |
||||
":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <> |
||||
"to allow for json_rpc calls when running." |
||||
end |
||||
|
||||
merged_init_options = |
||||
@defaults |
||||
|> Keyword.merge(mergeable_init_options) |
||||
|> Keyword.put(:state, state) |
||||
|
||||
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__) |
||||
end |
||||
|
||||
@impl BufferedTask |
||||
def init(initial, reducer, _) do |
||||
{:ok, final} = |
||||
Chain.stream_unfetched_balances(initial, fn address_fields, acc -> |
||||
address_fields |
||||
|> entry() |
||||
|> reducer.(acc) |
||||
end) |
||||
|
||||
final |
||||
end |
||||
|
||||
@impl BufferedTask |
||||
@decorate trace(name: "fetch", resource: "Indexer.Fetcher.CoinBalanceDaily.run/2", service: :indexer, tracer: Tracer) |
||||
def run(entries, json_rpc_named_arguments) do |
||||
# the same address may be used more than once in the same block, but we only want one `Balance` for a given |
||||
# `{address, block}`, so take unique params only |
||||
unique_entries = Enum.uniq(entries) |
||||
|
||||
unique_filtered_entries = |
||||
Enum.filter(unique_entries, fn {_hash, block_number} -> |
||||
block_number > first_block_to_index() |
||||
end) |
||||
|
||||
unique_entry_count = Enum.count(unique_filtered_entries) |
||||
Logger.metadata(count: unique_entry_count) |
||||
|
||||
Logger.debug(fn -> "fetching" end) |
||||
|
||||
unique_filtered_entries |
||||
|> Enum.map(&entry_to_params/1) |
||||
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) |
||||
|> case do |
||||
{:ok, fetched_balances} -> |
||||
run_fetched_balances(fetched_balances, unique_entries) |
||||
|
||||
{:error, reason} -> |
||||
Logger.error( |
||||
fn -> |
||||
["failed to fetch: ", inspect(reason)] |
||||
end, |
||||
error_count: unique_entry_count |
||||
) |
||||
|
||||
{:retry, unique_entries} |
||||
end |
||||
end |
||||
|
||||
defp first_block_to_index do |
||||
string_value = Application.get_env(:indexer, :first_block) |
||||
|
||||
case Integer.parse(string_value) do |
||||
{integer, ""} -> integer |
||||
_ -> 0 |
||||
end |
||||
end |
||||
|
||||
defp entry_to_params({address_hash_bytes, block_number}) when is_integer(block_number) do |
||||
{:ok, address_hash} = Hash.Address.cast(address_hash_bytes) |
||||
%{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address_hash)} |
||||
end |
||||
|
||||
defp entry(%{address_hash: %Hash{bytes: address_hash_bytes}, block_number: block_number}) do |
||||
{address_hash_bytes, block_number} |
||||
end |
||||
|
||||
def balances_params_to_address_params(balances_params) do |
||||
balances_params |
||||
|> Enum.group_by(fn %{address_hash: address_hash} -> address_hash end) |
||||
|> Map.values() |
||||
|> Stream.map(&Enum.max_by(&1, fn %{block_number: block_number} -> block_number end)) |
||||
|> Enum.map(fn %{address_hash: address_hash, block_number: block_number, value: value} -> |
||||
%{ |
||||
hash: address_hash, |
||||
fetched_coin_balance_block_number: block_number, |
||||
fetched_coin_balance: value |
||||
} |
||||
end) |
||||
end |
||||
|
||||
def import_fetched_balances(%FetchedBalances{params_list: params_list}, broadcast_type \\ false) do |
||||
day = |
||||
if Enum.empty?(params_list) do |
||||
nil |
||||
else |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
block_number = Enum.at(params_list, 0)[:block_number] |
||||
|
||||
{:ok, %Blocks{blocks_params: blocks_params}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
block_timestamp = Enum.at(blocks_params, 0).timestamp |
||||
|
||||
DateTime.to_date(block_timestamp) |
||||
end |
||||
|
||||
importable_balances_daily_params = |
||||
Enum.map(params_list, fn param -> |
||||
param |
||||
|> Map.put(:day, day) |
||||
end) |
||||
|
||||
addresses_params = balances_params_to_address_params(importable_balances_daily_params) |
||||
|
||||
Chain.import(%{ |
||||
addresses: %{params: addresses_params, with: :balance_changeset}, |
||||
address_coin_balances_daily: %{params: importable_balances_daily_params}, |
||||
broadcast: broadcast_type |
||||
}) |
||||
end |
||||
|
||||
defp run_fetched_balances(%FetchedBalances{errors: errors} = fetched_balances, _) do |
||||
{:ok, imported} = import_fetched_balances(fetched_balances) |
||||
|
||||
Accounts.drop(imported[:addresses]) |
||||
|
||||
retry(errors) |
||||
end |
||||
|
||||
defp retry([]), do: :ok |
||||
|
||||
defp retry(errors) when is_list(errors) do |
||||
retried_entries = fetched_balances_errors_to_entries(errors) |
||||
|
||||
Logger.error( |
||||
fn -> |
||||
[ |
||||
"failed to fetch: ", |
||||
fetched_balance_errors_to_iodata(errors) |
||||
] |
||||
end, |
||||
error_count: Enum.count(retried_entries) |
||||
) |
||||
|
||||
{:retry, retried_entries} |
||||
end |
||||
|
||||
defp fetched_balances_errors_to_entries(errors) when is_list(errors) do |
||||
Enum.map(errors, &fetched_balance_error_to_entry/1) |
||||
end |
||||
|
||||
defp fetched_balance_error_to_entry(%{data: %{block_quantity: block_quantity, day: day, hash_data: hash_data}}) |
||||
when is_binary(block_quantity) and is_binary(hash_data) do |
||||
{:ok, %Hash{bytes: address_hash_bytes}} = Hash.Address.cast(hash_data) |
||||
block_number = quantity_to_integer(block_quantity) |
||||
{address_hash_bytes, block_number, day} |
||||
end |
||||
|
||||
defp fetched_balance_errors_to_iodata(errors) when is_list(errors) do |
||||
fetched_balance_errors_to_iodata(errors, []) |
||||
end |
||||
|
||||
defp fetched_balance_errors_to_iodata([], iodata), do: iodata |
||||
|
||||
defp fetched_balance_errors_to_iodata([error | errors], iodata) do |
||||
fetched_balance_errors_to_iodata(errors, [iodata | fetched_balance_error_to_iodata(error)]) |
||||
end |
||||
|
||||
defp fetched_balance_error_to_iodata(%{ |
||||
code: code, |
||||
message: message, |
||||
data: %{day: day, hash_data: hash_data} |
||||
}) |
||||
when is_integer(code) and is_binary(message) and is_binary(day) and is_binary(hash_data) do |
||||
[hash_data, "@", day, ": (", to_string(code), ") ", message, ?\n] |
||||
end |
||||
end |
@ -0,0 +1,136 @@ |
||||
defmodule Indexer.Transform.AddressCoinBalancesDaily do |
||||
@moduledoc """ |
||||
Extracts `Explorer.Chain.Address.CoinBalanceDaily` params from other schema's params. |
||||
""" |
||||
|
||||
alias EthereumJSONRPC.Blocks |
||||
|
||||
def params_set(%{} = import_options) do |
||||
Enum.reduce(import_options, MapSet.new(), &reducer/2) |
||||
end |
||||
|
||||
defp reducer({:beneficiary_params, beneficiary_params}, acc) when is_list(beneficiary_params) do |
||||
Enum.into(beneficiary_params, acc, fn %{ |
||||
address_hash: address_hash, |
||||
block_number: block_number |
||||
} |
||||
when is_binary(address_hash) and is_integer(block_number) -> |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
|
||||
{:ok, %Blocks{blocks_params: blocks_params}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
block_timestamp = Enum.at(blocks_params, 0).timestamp |
||||
day = DateTime.to_date(block_timestamp) |
||||
|
||||
%{address_hash: address_hash, day: day} |
||||
end) |
||||
end |
||||
|
||||
defp reducer({:blocks_params, blocks_params}, acc) when is_list(blocks_params) do |
||||
# a block MUST have a miner_hash and number |
||||
Enum.into(blocks_params, acc, fn %{miner_hash: address_hash, number: block_number, timestamp: block_timestamp} |
||||
when is_binary(address_hash) and is_integer(block_number) -> |
||||
day = DateTime.to_date(block_timestamp) |
||||
%{address_hash: address_hash, day: day} |
||||
end) |
||||
end |
||||
|
||||
defp reducer({:internal_transactions_params, internal_transactions_params}, initial) |
||||
when is_list(internal_transactions_params) do |
||||
Enum.reduce(internal_transactions_params, initial, &internal_transactions_params_reducer/2) |
||||
end |
||||
|
||||
defp reducer({:logs_params, logs_params}, acc) when is_list(logs_params) do |
||||
# a log MUST have address_hash and block_number |
||||
logs_params |
||||
|> Enum.into(acc, fn |
||||
%{address_hash: address_hash, block_number: block_number} |
||||
when is_binary(address_hash) and is_integer(block_number) -> |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
|
||||
{:ok, %Blocks{blocks_params: blocks_params}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
block_timestamp = Enum.at(blocks_params, 0).timestamp |
||||
day = DateTime.to_date(block_timestamp) |
||||
%{address_hash: address_hash, day: day} |
||||
|
||||
%{type: "pending"} -> |
||||
nil |
||||
end) |
||||
|> Enum.reject(fn val -> is_nil(val) end) |
||||
|> MapSet.new() |
||||
end |
||||
|
||||
defp reducer({:transactions_params, transactions_params}, initial) when is_list(transactions_params) do |
||||
Enum.reduce(transactions_params, initial, &transactions_params_reducer/2) |
||||
end |
||||
|
||||
defp reducer({:block_second_degree_relations_params, block_second_degree_relations_params}, initial) |
||||
when is_list(block_second_degree_relations_params), |
||||
do: initial |
||||
|
||||
defp internal_transactions_params_reducer( |
||||
%{block_number: block_number} = internal_transaction_params, |
||||
acc |
||||
) |
||||
when is_integer(block_number) do |
||||
case internal_transaction_params do |
||||
%{type: "call"} -> |
||||
acc |
||||
|
||||
%{type: "create", error: _} -> |
||||
acc |
||||
|
||||
%{type: "create", created_contract_address_hash: address_hash} when is_binary(address_hash) -> |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
|
||||
{:ok, %Blocks{blocks_params: blocks_params}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
block_timestamp = Enum.at(blocks_params, 0).timestamp |
||||
day = DateTime.to_date(block_timestamp) |
||||
MapSet.put(acc, %{address_hash: address_hash, day: day}) |
||||
|
||||
%{type: "selfdestruct", from_address_hash: from_address_hash, to_address_hash: to_address_hash} |
||||
when is_binary(from_address_hash) and is_binary(to_address_hash) -> |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
|
||||
{:ok, %Blocks{blocks_params: blocks_params}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
block_timestamp = Enum.at(blocks_params, 0).timestamp |
||||
day = DateTime.to_date(block_timestamp) |
||||
|
||||
acc |
||||
|> MapSet.put(%{address_hash: from_address_hash, day: day}) |
||||
|> MapSet.put(%{address_hash: to_address_hash, day: day}) |
||||
end |
||||
end |
||||
|
||||
defp transactions_params_reducer( |
||||
%{block_number: block_number, from_address_hash: from_address_hash} = transaction_params, |
||||
initial |
||||
) |
||||
when is_binary(from_address_hash) do |
||||
# a transaction MUST have a `from_address_hash` |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
|
||||
{:ok, %Blocks{blocks_params: blocks_params}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
block_timestamp = Enum.at(blocks_params, 0).timestamp |
||||
day = DateTime.to_date(block_timestamp) |
||||
acc = MapSet.put(initial, %{address_hash: from_address_hash, day: day}) |
||||
|
||||
# `to_address_hash` is optional |
||||
case transaction_params do |
||||
%{to_address_hash: to_address_hash} when is_binary(to_address_hash) -> |
||||
MapSet.put(acc, %{address_hash: to_address_hash, day: day}) |
||||
|
||||
_ -> |
||||
acc |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue