From 14fbf27606e251cc2ed31f903048d0d55a9ec396 Mon Sep 17 00:00:00 2001 From: Victor Baranov Date: Mon, 25 May 2020 18:47:57 +0300 Subject: [PATCH] Store ddresses coin balances changes daily --- CHANGELOG.md | 1 + apps/explorer/lib/explorer/chain.ex | 16 +- .../chain/address/coin_balance_daily.ex | 66 +++++ .../runner/address/coin_balances_daily.ex | 138 +++++++++++ .../chain/import/stage/address_referencing.ex | 3 +- ...0525115811_address_coin_balances_daily.exs | 17 ++ apps/indexer/lib/indexer/block/fetcher.ex | 19 +- .../lib/indexer/block/realtime/fetcher.ex | 23 +- .../lib/indexer/fetcher/block_reward.ex | 3 +- .../lib/indexer/fetcher/coin_balance.ex | 18 +- .../lib/indexer/fetcher/coin_balance_daily.ex | 226 ++++++++++++++++++ apps/indexer/lib/indexer/supervisor.ex | 3 + .../transform/address_coin_balances_daily.ex | 136 +++++++++++ 13 files changed, 643 insertions(+), 26 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex create mode 100644 apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex create mode 100644 apps/explorer/priv/repo/migrations/20200525115811_address_coin_balances_daily.exs create mode 100644 apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex create mode 100644 apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index 91567e98d2..dd2cf1adb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Features ### Fixes +- [#3125](https://github.com/poanetwork/blockscout/pull/3125) - Fix performance of coin balance history chart - [#3122](https://github.com/poanetwork/blockscout/pull/3122) - Exclude balance percentage calculation for burn address on accounts page - [#3121](https://github.com/poanetwork/blockscout/pull/3121) - Geth: handle response from eth_getblockbyhash JSON RPC method without totalDifficulty (uncle blocks) - [#3119](https://github.com/poanetwork/blockscout/pull/3119), [#3120](https://github.com/poanetwork/blockscout/pull/3120) - Fix performance of Inventory tab loading for ERC-721 tokens diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 79650b48e7..df569c4f88 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -33,6 +33,7 @@ defmodule Explorer.Chain do alias Explorer.Chain.{ Address, Address.CoinBalance, + Address.CoinBalanceDaily, Address.CurrentTokenBalance, Address.TokenBalance, Block, @@ -3488,25 +3489,12 @@ defmodule Explorer.Chain do @spec address_to_balances_by_day(Hash.Address.t()) :: [balance_by_day] def address_to_balances_by_day(address_hash) do - latest_block_timestamp = - address_hash - |> CoinBalance.last_coin_balance_timestamp() - |> Repo.one() - address_hash - |> CoinBalance.balances_by_day(latest_block_timestamp) + |> CoinBalanceDaily.balances_by_day() |> Repo.all() - |> replace_last_value(latest_block_timestamp) |> normalize_balances_by_day() end - # https://github.com/poanetwork/blockscout/issues/2658 - defp replace_last_value(items, %{value: value, timestamp: timestamp}) do - List.replace_at(items, -1, %{date: Date.convert!(timestamp, Calendar.ISO), value: value}) - end - - defp replace_last_value(items, _), do: items - defp normalize_balances_by_day(balances_by_day) do result = balances_by_day diff --git a/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex b/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex new file mode 100644 index 0000000000..063f1d8235 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex @@ -0,0 +1,66 @@ +defmodule Explorer.Chain.Address.CoinBalanceDaily do + @moduledoc """ + Maximum `t:Explorer.Chain.Wei.t/0` `value` of `t:Explorer.Chain.Address.t/0` at the day. + This table is used to display coinn balance history chart. + """ + + use Explorer.Schema + + alias Explorer.Chain.{Address, Hash, Wei} + alias Explorer.Chain.Address.CoinBalanceDaily + + @optional_fields ~w(value)a + @required_fields ~w(address_hash day)a + @allowed_fields @optional_fields ++ @required_fields + + @typedoc """ + * `address` - the `t:Explorer.Chain.Address.t/0`. + * `address_hash` - foreign key for `address`. + * `day` - the `t:Date.t/0`. + * `inserted_at` - When the balance was first inserted into the database. + * `updated_at` - When the balance was last updated. + * `value` - the max balance (`value`) of `address` during the `day`. + """ + @type t :: %__MODULE__{ + address: %Ecto.Association.NotLoaded{} | Address.t(), + address_hash: Hash.Address.t(), + day: Date.t(), + inserted_at: DateTime.t(), + updated_at: DateTime.t(), + value: Wei.t() | nil + } + + @primary_key false + schema "address_coin_balances_daily" do + field(:day, :date) + field(:value, Wei) + + timestamps() + + belongs_to(:address, Address, foreign_key: :address_hash, references: :hash, type: Hash.Address) + end + + @doc """ + Builds an `Ecto.Query` to fetch a series of balances by day for the given account. Each element in the series + corresponds to the maximum balance in that day. Only the last 90 days of data are used. + """ + def balances_by_day(address_hash) do + CoinBalanceDaily + |> where([cbd], cbd.address_hash == ^address_hash) + |> limit_time_interval() + |> order_by([cbd], cbd.day) + |> select([cbd], %{date: cbd.day, value: cbd.value}) + end + + def limit_time_interval(query) do + query |> where([cbd], cbd.day >= fragment("date_trunc('day', now()) - interval '90 days'")) + end + + def changeset(%__MODULE__{} = balance, params) do + balance + |> cast(params, @allowed_fields) + |> validate_required(@required_fields) + |> foreign_key_constraint(:address_hash) + |> unique_constraint(:day, name: :address_coin_balances_daily_address_hash_day_index) + end +end diff --git a/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex b/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex new file mode 100644 index 0000000000..e9e747c069 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex @@ -0,0 +1,138 @@ +defmodule Explorer.Chain.Import.Runner.Address.CoinBalancesDaily do + @moduledoc """ + Bulk imports `t:Explorer.Chain.Address.CoinBalancesDaily.t/0`. + """ + + require Ecto.Query + + import Ecto.Query, only: [from: 2] + + alias Ecto.{Changeset, Multi, Repo} + alias Explorer.Chain.Address.CoinBalanceDaily + alias Explorer.Chain.{Hash, Import, Wei} + + @behaviour Import.Runner + + # milliseconds + @timeout 60_000 + + @type imported :: [ + %{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()} + ] + + @impl Import.Runner + def ecto_schema_module, do: CoinBalanceDaily + + @impl Import.Runner + def option_key, do: :address_coin_balances_daily + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[%{address_hash: Explorer.Chain.Hash.t(), day: Date.t()}]", + value_description: "List of maps of the `t:#{ecto_schema_module()}.t/0` `address_hash` and `day`" + } + end + + @impl Import.Runner + def run(multi, changes_list, %{timestamps: timestamps} = options) do + insert_options = + options + |> Map.get(option_key(), %{}) + |> Map.take(~w(on_conflict timeout)a) + |> Map.put_new(:timeout, @timeout) + |> Map.put(:timestamps, timestamps) + + Multi.run(multi, :address_coin_balances_daily, fn repo, _ -> + insert(repo, changes_list, insert_options) + end) + end + + @impl Import.Runner + def timeout, do: @timeout + + @spec insert( + Repo.t(), + [ + %{ + required(:address_hash) => Hash.Address.t(), + required(:day) => Date.t(), + required(:value) => Wei.t() + } + ], + %{ + optional(:on_conflict) => Import.Runner.on_conflict(), + required(:timeout) => timeout, + required(:timestamps) => Import.timestamps() + } + ) :: + {:ok, [%{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()}]} + | {:error, [Changeset.t()]} + defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + + combined_changes_list = + changes_list + |> Enum.reduce([], fn change, acc -> + if Enum.empty?(acc) do + [change | acc] + else + target_item = + Enum.find(acc, fn item -> + item.day == change.day && item.address_hash == change.address_hash + end) + + if target_item do + if change.value > target_item.value do + acc_updated = List.delete(acc, target_item) + [change | acc_updated] + else + acc + end + else + [change | acc] + end + end + end) + + # Enforce CoinBalanceDaily ShareLocks order (see docs: sharelocks.md) + ordered_changes_list = Enum.sort_by(combined_changes_list, &{&1.address_hash, &1.day}) + + {:ok, _} = + Import.insert_changes_list( + repo, + ordered_changes_list, + conflict_target: [:address_hash, :day], + on_conflict: on_conflict, + for: CoinBalanceDaily, + timeout: timeout, + timestamps: timestamps + ) + + {:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash day)a))} + end + + def default_on_conflict do + from( + balance in CoinBalanceDaily, + update: [ + set: [ + value: + fragment( + """ + CASE WHEN EXCLUDED.value IS NOT NULL THEN + EXCLUDED.value + ELSE + ? + END + """, + balance.value + ), + inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at), + updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at) + ] + ], + where: fragment("EXCLUDED.value IS NOT NULL") + ) + end +end diff --git a/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex b/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex index 2575a0eab8..a428d51b0c 100644 --- a/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex +++ b/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex @@ -14,7 +14,8 @@ defmodule Explorer.Chain.Import.Stage.AddressReferencing do Runner.Address.CoinBalances, Runner.Blocks, Runner.StakingPools, - Runner.StakingPoolsDelegators + Runner.StakingPoolsDelegators, + Runner.Address.CoinBalancesDaily ] @impl Stage diff --git a/apps/explorer/priv/repo/migrations/20200525115811_address_coin_balances_daily.exs b/apps/explorer/priv/repo/migrations/20200525115811_address_coin_balances_daily.exs new file mode 100644 index 0000000000..dc03899679 --- /dev/null +++ b/apps/explorer/priv/repo/migrations/20200525115811_address_coin_balances_daily.exs @@ -0,0 +1,17 @@ +defmodule Explorer.Repo.Migrations.AddressCoinBalancesDaily do + use Ecto.Migration + + def change do + create table(:address_coin_balances_daily, primary_key: false) do + add(:address_hash, references(:addresses, column: :hash, type: :bytea), null: false) + add(:day, :date, null: false) + + # null until fetched + add(:value, :numeric, precision: 100, default: fragment("NULL"), null: true) + + timestamps(null: false, type: :utc_datetime_usec) + end + + create(unique_index(:address_coin_balances_daily, [:address_hash, :day])) + end +end diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 2d4d399eba..c84e9acd48 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -20,6 +20,7 @@ defmodule Indexer.Block.Fetcher do alias Indexer.Fetcher.{ BlockReward, CoinBalance, + CoinBalanceDaily, ContractCode, InternalTransaction, ReplacedTransaction, @@ -55,6 +56,7 @@ defmodule Indexer.Block.Fetcher do address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number, addresses: Import.Runner.options(), address_coin_balances: Import.Runner.options(), + address_coin_balances_daily: Import.Runner.options(), address_token_balances: Import.Runner.options(), blocks: Import.Runner.options(), block_second_degree_relations: Import.Runner.options(), @@ -164,6 +166,7 @@ defmodule Indexer.Block.Fetcher do %{ addresses: %{params: addresses}, address_coin_balances: %{params: coin_balances_params_set}, + address_coin_balances_daily: %{params: coin_balances_params_set}, address_token_balances: %{params: address_token_balances}, blocks: %{params: blocks}, block_second_degree_relations: %{params: block_second_degree_relations_params}, @@ -245,12 +248,18 @@ defmodule Indexer.Block.Fetcher do def async_import_coin_balances(%{addresses: addresses}, %{ address_hash_to_fetched_balance_block_number: address_hash_to_block_number }) do - addresses - |> Enum.map(fn %Address{hash: address_hash} -> - block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash)) - %{address_hash: address_hash, block_number: block_number} - end) + coin_balances_import_params = + addresses + |> Enum.map(fn %Address{hash: address_hash} -> + block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash)) + %{address_hash: address_hash, block_number: block_number} + end) + + coin_balances_import_params |> CoinBalance.async_fetch_balances() + + coin_balances_import_params + |> CoinBalanceDaily.async_fetch_balances() end def async_import_coin_balances(_, _), do: :ok diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 533a79c74c..0533484b7f 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -26,7 +26,7 @@ defmodule Indexer.Block.Realtime.Fetcher do ] alias Ecto.Changeset - alias EthereumJSONRPC.{FetchedBalances, Subscription} + alias EthereumJSONRPC.{Blocks, FetchedBalances, Subscription} alias Explorer.Chain alias Explorer.Chain.Cache.Accounts alias Explorer.Counters.AverageBlockTime @@ -172,6 +172,7 @@ defmodule Indexer.Block.Realtime.Fetcher do block_fetcher, %{ address_coin_balances: %{params: address_coin_balances_params}, + address_coin_balances_daily: %{params: address_coin_balances_daily_params}, address_hash_to_fetched_balance_block_number: address_hash_to_block_number, addresses: %{params: addresses_params}, block_rewards: block_rewards @@ -182,7 +183,8 @@ defmodule Indexer.Block.Realtime.Fetcher do balances(block_fetcher, %{ address_hash_to_block_number: address_hash_to_block_number, addresses_params: addresses_params, - balances_params: address_coin_balances_params + balances_params: address_coin_balances_params, + balances_daily_params: address_coin_balances_daily_params })}, {block_reward_errors, chain_import_block_rewards} = Map.pop(block_rewards, :errors), chain_import_options = @@ -381,7 +383,22 @@ defmodule Indexer.Block.Realtime.Fetcher do importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at)) - {:ok, %{addresses_params: merged_addresses_params, balances_params: importable_balances_params}} + block_number = Enum.at(params_list, 0)[:block_number] + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + + importable_balances_daily_params = Enum.map(params_list, &Map.put(&1, :day, day)) + + {:ok, + %{ + addresses_params: merged_addresses_params, + balances_params: importable_balances_params, + balances_daily_params: importable_balances_daily_params + }} {:error, _} = error -> error diff --git a/apps/indexer/lib/indexer/fetcher/block_reward.ex b/apps/indexer/lib/indexer/fetcher/block_reward.ex index ed593bb655..2075c33d99 100644 --- a/apps/indexer/lib/indexer/fetcher/block_reward.ex +++ b/apps/indexer/lib/indexer/fetcher/block_reward.ex @@ -20,7 +20,7 @@ defmodule Indexer.Fetcher.BlockReward do alias Explorer.Chain.Cache.Accounts alias Indexer.{BufferedTask, Tracer} alias Indexer.Fetcher.BlockReward.Supervisor, as: BlockRewardSupervisor - alias Indexer.Fetcher.CoinBalance + alias Indexer.Fetcher.{CoinBalance, CoinBalanceDaily} alias Indexer.Transform.{AddressCoinBalances, Addresses} @behaviour BufferedTask @@ -135,6 +135,7 @@ defmodule Indexer.Fetcher.BlockReward do Accounts.drop(addresses) CoinBalance.async_fetch_balances(address_coin_balances) + CoinBalanceDaily.async_fetch_balances(address_coin_balances) retry_errors(errors) diff --git a/apps/indexer/lib/indexer/fetcher/coin_balance.ex b/apps/indexer/lib/indexer/fetcher/coin_balance.ex index f8fadfcb53..32f37459a0 100644 --- a/apps/indexer/lib/indexer/fetcher/coin_balance.ex +++ b/apps/indexer/lib/indexer/fetcher/coin_balance.ex @@ -77,12 +77,17 @@ defmodule Indexer.Fetcher.CoinBalance do # `{address, block}`, so take unique params only unique_entries = Enum.uniq(entries) - unique_entry_count = Enum.count(unique_entries) + unique_filtered_entries = + Enum.filter(unique_entries, fn {_hash, block_number} -> + block_number > first_block_to_index() + end) + + unique_entry_count = Enum.count(unique_filtered_entries) Logger.metadata(count: unique_entry_count) Logger.debug(fn -> "fetching" end) - unique_entries + unique_filtered_entries |> Enum.map(&entry_to_params/1) |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) |> case do @@ -101,6 +106,15 @@ defmodule Indexer.Fetcher.CoinBalance do end end + defp first_block_to_index do + string_value = Application.get_env(:indexer, :first_block) + + case Integer.parse(string_value) do + {integer, ""} -> integer + _ -> 0 + end + end + defp entry_to_params({address_hash_bytes, block_number}) when is_integer(block_number) do {:ok, address_hash} = Hash.Address.cast(address_hash_bytes) %{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address_hash)} diff --git a/apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex b/apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex new file mode 100644 index 0000000000..f15f599a9b --- /dev/null +++ b/apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex @@ -0,0 +1,226 @@ +defmodule Indexer.Fetcher.CoinBalanceDaily do + @moduledoc """ + Fetches `t:Explorer.Chain.Address.CoinBalanceDaily.t/0`. + """ + + use Indexer.Fetcher + use Spandex.Decorators + + require Logger + + import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1] + + alias EthereumJSONRPC.{Blocks, FetchedBalances} + alias Explorer.Chain + alias Explorer.Chain.Cache.Accounts + alias Explorer.Chain.Hash + alias Indexer.{BufferedTask, Tracer} + + @behaviour BufferedTask + + @defaults [ + flush_interval: :timer.seconds(3), + max_batch_size: 500, + max_concurrency: 4, + task_supervisor: Indexer.Fetcher.CoinBalanceDaily.TaskSupervisor, + metadata: [fetcher: :coin_balance_daily] + ] + + @doc """ + Asynchronously fetches balances for each address `hash` at the `day`. + """ + @spec async_fetch_balances([ + %{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()} + ]) :: :ok + def async_fetch_balances(balance_fields) when is_list(balance_fields) do + entries = Enum.map(balance_fields, &entry/1) + + BufferedTask.buffer(__MODULE__, entries) + end + + @doc false + # credo:disable-for-next-line Credo.Check.Design.DuplicatedCode + def child_spec([init_options, gen_server_options]) do + {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) + + unless state do + raise ArgumentError, + ":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <> + "to allow for json_rpc calls when running." + end + + merged_init_options = + @defaults + |> Keyword.merge(mergeable_init_options) + |> Keyword.put(:state, state) + + Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__) + end + + @impl BufferedTask + def init(initial, reducer, _) do + {:ok, final} = + Chain.stream_unfetched_balances(initial, fn address_fields, acc -> + address_fields + |> entry() + |> reducer.(acc) + end) + + final + end + + @impl BufferedTask + @decorate trace(name: "fetch", resource: "Indexer.Fetcher.CoinBalanceDaily.run/2", service: :indexer, tracer: Tracer) + def run(entries, json_rpc_named_arguments) do + # the same address may be used more than once in the same block, but we only want one `Balance` for a given + # `{address, block}`, so take unique params only + unique_entries = Enum.uniq(entries) + + unique_filtered_entries = + Enum.filter(unique_entries, fn {_hash, block_number} -> + block_number > first_block_to_index() + end) + + unique_entry_count = Enum.count(unique_filtered_entries) + Logger.metadata(count: unique_entry_count) + + Logger.debug(fn -> "fetching" end) + + unique_filtered_entries + |> Enum.map(&entry_to_params/1) + |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) + |> case do + {:ok, fetched_balances} -> + run_fetched_balances(fetched_balances, unique_entries) + + {:error, reason} -> + Logger.error( + fn -> + ["failed to fetch: ", inspect(reason)] + end, + error_count: unique_entry_count + ) + + {:retry, unique_entries} + end + end + + defp first_block_to_index do + string_value = Application.get_env(:indexer, :first_block) + + case Integer.parse(string_value) do + {integer, ""} -> integer + _ -> 0 + end + end + + defp entry_to_params({address_hash_bytes, block_number}) when is_integer(block_number) do + {:ok, address_hash} = Hash.Address.cast(address_hash_bytes) + %{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address_hash)} + end + + defp entry(%{address_hash: %Hash{bytes: address_hash_bytes}, block_number: block_number}) do + {address_hash_bytes, block_number} + end + + def balances_params_to_address_params(balances_params) do + balances_params + |> Enum.group_by(fn %{address_hash: address_hash} -> address_hash end) + |> Map.values() + |> Stream.map(&Enum.max_by(&1, fn %{block_number: block_number} -> block_number end)) + |> Enum.map(fn %{address_hash: address_hash, block_number: block_number, value: value} -> + %{ + hash: address_hash, + fetched_coin_balance_block_number: block_number, + fetched_coin_balance: value + } + end) + end + + def import_fetched_balances(%FetchedBalances{params_list: params_list}, broadcast_type \\ false) do + day = + if Enum.empty?(params_list) do + nil + else + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + block_number = Enum.at(params_list, 0)[:block_number] + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + + DateTime.to_date(block_timestamp) + end + + importable_balances_daily_params = + Enum.map(params_list, fn param -> + param + |> Map.put(:day, day) + end) + + addresses_params = balances_params_to_address_params(importable_balances_daily_params) + + Chain.import(%{ + addresses: %{params: addresses_params, with: :balance_changeset}, + address_coin_balances_daily: %{params: importable_balances_daily_params}, + broadcast: broadcast_type + }) + end + + defp run_fetched_balances(%FetchedBalances{errors: errors} = fetched_balances, _) do + {:ok, imported} = import_fetched_balances(fetched_balances) + + Accounts.drop(imported[:addresses]) + + retry(errors) + end + + defp retry([]), do: :ok + + defp retry(errors) when is_list(errors) do + retried_entries = fetched_balances_errors_to_entries(errors) + + Logger.error( + fn -> + [ + "failed to fetch: ", + fetched_balance_errors_to_iodata(errors) + ] + end, + error_count: Enum.count(retried_entries) + ) + + {:retry, retried_entries} + end + + defp fetched_balances_errors_to_entries(errors) when is_list(errors) do + Enum.map(errors, &fetched_balance_error_to_entry/1) + end + + defp fetched_balance_error_to_entry(%{data: %{block_quantity: block_quantity, day: day, hash_data: hash_data}}) + when is_binary(block_quantity) and is_binary(hash_data) do + {:ok, %Hash{bytes: address_hash_bytes}} = Hash.Address.cast(hash_data) + block_number = quantity_to_integer(block_quantity) + {address_hash_bytes, block_number, day} + end + + defp fetched_balance_errors_to_iodata(errors) when is_list(errors) do + fetched_balance_errors_to_iodata(errors, []) + end + + defp fetched_balance_errors_to_iodata([], iodata), do: iodata + + defp fetched_balance_errors_to_iodata([error | errors], iodata) do + fetched_balance_errors_to_iodata(errors, [iodata | fetched_balance_error_to_iodata(error)]) + end + + defp fetched_balance_error_to_iodata(%{ + code: code, + message: message, + data: %{day: day, hash_data: hash_data} + }) + when is_integer(code) and is_binary(message) and is_binary(day) and is_binary(hash_data) do + [hash_data, "@", day, ": (", to_string(code), ") ", message, ?\n] + end +end diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index f22b6de487..b5f6b96f2e 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -11,6 +11,7 @@ defmodule Indexer.Supervisor do alias Indexer.Fetcher.{ BlockReward, CoinBalance, + CoinBalanceDaily, CoinBalanceOnDemand, ContractCode, InternalTransaction, @@ -109,6 +110,8 @@ defmodule Indexer.Supervisor do [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {CoinBalance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, + {CoinBalanceDaily.Supervisor, + [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {Token.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {TokenInstance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, diff --git a/apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex b/apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex new file mode 100644 index 0000000000..de8c9832a5 --- /dev/null +++ b/apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex @@ -0,0 +1,136 @@ +defmodule Indexer.Transform.AddressCoinBalancesDaily do + @moduledoc """ + Extracts `Explorer.Chain.Address.CoinBalanceDaily` params from other schema's params. + """ + + alias EthereumJSONRPC.Blocks + + def params_set(%{} = import_options) do + Enum.reduce(import_options, MapSet.new(), &reducer/2) + end + + defp reducer({:beneficiary_params, beneficiary_params}, acc) when is_list(beneficiary_params) do + Enum.into(beneficiary_params, acc, fn %{ + address_hash: address_hash, + block_number: block_number + } + when is_binary(address_hash) and is_integer(block_number) -> + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + + %{address_hash: address_hash, day: day} + end) + end + + defp reducer({:blocks_params, blocks_params}, acc) when is_list(blocks_params) do + # a block MUST have a miner_hash and number + Enum.into(blocks_params, acc, fn %{miner_hash: address_hash, number: block_number, timestamp: block_timestamp} + when is_binary(address_hash) and is_integer(block_number) -> + day = DateTime.to_date(block_timestamp) + %{address_hash: address_hash, day: day} + end) + end + + defp reducer({:internal_transactions_params, internal_transactions_params}, initial) + when is_list(internal_transactions_params) do + Enum.reduce(internal_transactions_params, initial, &internal_transactions_params_reducer/2) + end + + defp reducer({:logs_params, logs_params}, acc) when is_list(logs_params) do + # a log MUST have address_hash and block_number + logs_params + |> Enum.into(acc, fn + %{address_hash: address_hash, block_number: block_number} + when is_binary(address_hash) and is_integer(block_number) -> + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + %{address_hash: address_hash, day: day} + + %{type: "pending"} -> + nil + end) + |> Enum.reject(fn val -> is_nil(val) end) + |> MapSet.new() + end + + defp reducer({:transactions_params, transactions_params}, initial) when is_list(transactions_params) do + Enum.reduce(transactions_params, initial, &transactions_params_reducer/2) + end + + defp reducer({:block_second_degree_relations_params, block_second_degree_relations_params}, initial) + when is_list(block_second_degree_relations_params), + do: initial + + defp internal_transactions_params_reducer( + %{block_number: block_number} = internal_transaction_params, + acc + ) + when is_integer(block_number) do + case internal_transaction_params do + %{type: "call"} -> + acc + + %{type: "create", error: _} -> + acc + + %{type: "create", created_contract_address_hash: address_hash} when is_binary(address_hash) -> + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + MapSet.put(acc, %{address_hash: address_hash, day: day}) + + %{type: "selfdestruct", from_address_hash: from_address_hash, to_address_hash: to_address_hash} + when is_binary(from_address_hash) and is_binary(to_address_hash) -> + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + + acc + |> MapSet.put(%{address_hash: from_address_hash, day: day}) + |> MapSet.put(%{address_hash: to_address_hash, day: day}) + end + end + + defp transactions_params_reducer( + %{block_number: block_number, from_address_hash: from_address_hash} = transaction_params, + initial + ) + when is_binary(from_address_hash) do + # a transaction MUST have a `from_address_hash` + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + acc = MapSet.put(initial, %{address_hash: from_address_hash, day: day}) + + # `to_address_hash` is optional + case transaction_params do + %{to_address_hash: to_address_hash} when is_binary(to_address_hash) -> + MapSet.put(acc, %{address_hash: to_address_hash, day: day}) + + _ -> + acc + end + end +end