From 14fbf27606e251cc2ed31f903048d0d55a9ec396 Mon Sep 17 00:00:00 2001 From: Victor Baranov Date: Mon, 25 May 2020 18:47:57 +0300 Subject: [PATCH 1/6] Store ddresses coin balances changes daily --- CHANGELOG.md | 1 + apps/explorer/lib/explorer/chain.ex | 16 +- .../chain/address/coin_balance_daily.ex | 66 +++++ .../runner/address/coin_balances_daily.ex | 138 +++++++++++ .../chain/import/stage/address_referencing.ex | 3 +- ...0525115811_address_coin_balances_daily.exs | 17 ++ apps/indexer/lib/indexer/block/fetcher.ex | 19 +- .../lib/indexer/block/realtime/fetcher.ex | 23 +- .../lib/indexer/fetcher/block_reward.ex | 3 +- .../lib/indexer/fetcher/coin_balance.ex | 18 +- .../lib/indexer/fetcher/coin_balance_daily.ex | 226 ++++++++++++++++++ apps/indexer/lib/indexer/supervisor.ex | 3 + .../transform/address_coin_balances_daily.ex | 136 +++++++++++ 13 files changed, 643 insertions(+), 26 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex create mode 100644 apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex create mode 100644 apps/explorer/priv/repo/migrations/20200525115811_address_coin_balances_daily.exs create mode 100644 apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex create mode 100644 apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index 91567e98d2..dd2cf1adb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Features ### Fixes +- [#3125](https://github.com/poanetwork/blockscout/pull/3125) - Fix performance of coin balance history chart - [#3122](https://github.com/poanetwork/blockscout/pull/3122) - Exclude balance percentage calculation for burn address on accounts page - [#3121](https://github.com/poanetwork/blockscout/pull/3121) - Geth: handle response from eth_getblockbyhash JSON RPC method without totalDifficulty (uncle blocks) - [#3119](https://github.com/poanetwork/blockscout/pull/3119), [#3120](https://github.com/poanetwork/blockscout/pull/3120) - Fix performance of Inventory tab loading for ERC-721 tokens diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 79650b48e7..df569c4f88 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -33,6 +33,7 @@ defmodule Explorer.Chain do alias Explorer.Chain.{ Address, Address.CoinBalance, + Address.CoinBalanceDaily, Address.CurrentTokenBalance, Address.TokenBalance, Block, @@ -3488,25 +3489,12 @@ defmodule Explorer.Chain do @spec address_to_balances_by_day(Hash.Address.t()) :: [balance_by_day] def address_to_balances_by_day(address_hash) do - latest_block_timestamp = - address_hash - |> CoinBalance.last_coin_balance_timestamp() - |> Repo.one() - address_hash - |> CoinBalance.balances_by_day(latest_block_timestamp) + |> CoinBalanceDaily.balances_by_day() |> Repo.all() - |> replace_last_value(latest_block_timestamp) |> normalize_balances_by_day() end - # https://github.com/poanetwork/blockscout/issues/2658 - defp replace_last_value(items, %{value: value, timestamp: timestamp}) do - List.replace_at(items, -1, %{date: Date.convert!(timestamp, Calendar.ISO), value: value}) - end - - defp replace_last_value(items, _), do: items - defp normalize_balances_by_day(balances_by_day) do result = balances_by_day diff --git a/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex b/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex new file mode 100644 index 0000000000..063f1d8235 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex @@ -0,0 +1,66 @@ +defmodule Explorer.Chain.Address.CoinBalanceDaily do + @moduledoc """ + Maximum `t:Explorer.Chain.Wei.t/0` `value` of `t:Explorer.Chain.Address.t/0` at the day. + This table is used to display coinn balance history chart. + """ + + use Explorer.Schema + + alias Explorer.Chain.{Address, Hash, Wei} + alias Explorer.Chain.Address.CoinBalanceDaily + + @optional_fields ~w(value)a + @required_fields ~w(address_hash day)a + @allowed_fields @optional_fields ++ @required_fields + + @typedoc """ + * `address` - the `t:Explorer.Chain.Address.t/0`. + * `address_hash` - foreign key for `address`. + * `day` - the `t:Date.t/0`. + * `inserted_at` - When the balance was first inserted into the database. + * `updated_at` - When the balance was last updated. + * `value` - the max balance (`value`) of `address` during the `day`. + """ + @type t :: %__MODULE__{ + address: %Ecto.Association.NotLoaded{} | Address.t(), + address_hash: Hash.Address.t(), + day: Date.t(), + inserted_at: DateTime.t(), + updated_at: DateTime.t(), + value: Wei.t() | nil + } + + @primary_key false + schema "address_coin_balances_daily" do + field(:day, :date) + field(:value, Wei) + + timestamps() + + belongs_to(:address, Address, foreign_key: :address_hash, references: :hash, type: Hash.Address) + end + + @doc """ + Builds an `Ecto.Query` to fetch a series of balances by day for the given account. Each element in the series + corresponds to the maximum balance in that day. Only the last 90 days of data are used. + """ + def balances_by_day(address_hash) do + CoinBalanceDaily + |> where([cbd], cbd.address_hash == ^address_hash) + |> limit_time_interval() + |> order_by([cbd], cbd.day) + |> select([cbd], %{date: cbd.day, value: cbd.value}) + end + + def limit_time_interval(query) do + query |> where([cbd], cbd.day >= fragment("date_trunc('day', now()) - interval '90 days'")) + end + + def changeset(%__MODULE__{} = balance, params) do + balance + |> cast(params, @allowed_fields) + |> validate_required(@required_fields) + |> foreign_key_constraint(:address_hash) + |> unique_constraint(:day, name: :address_coin_balances_daily_address_hash_day_index) + end +end diff --git a/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex b/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex new file mode 100644 index 0000000000..e9e747c069 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex @@ -0,0 +1,138 @@ +defmodule Explorer.Chain.Import.Runner.Address.CoinBalancesDaily do + @moduledoc """ + Bulk imports `t:Explorer.Chain.Address.CoinBalancesDaily.t/0`. + """ + + require Ecto.Query + + import Ecto.Query, only: [from: 2] + + alias Ecto.{Changeset, Multi, Repo} + alias Explorer.Chain.Address.CoinBalanceDaily + alias Explorer.Chain.{Hash, Import, Wei} + + @behaviour Import.Runner + + # milliseconds + @timeout 60_000 + + @type imported :: [ + %{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()} + ] + + @impl Import.Runner + def ecto_schema_module, do: CoinBalanceDaily + + @impl Import.Runner + def option_key, do: :address_coin_balances_daily + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[%{address_hash: Explorer.Chain.Hash.t(), day: Date.t()}]", + value_description: "List of maps of the `t:#{ecto_schema_module()}.t/0` `address_hash` and `day`" + } + end + + @impl Import.Runner + def run(multi, changes_list, %{timestamps: timestamps} = options) do + insert_options = + options + |> Map.get(option_key(), %{}) + |> Map.take(~w(on_conflict timeout)a) + |> Map.put_new(:timeout, @timeout) + |> Map.put(:timestamps, timestamps) + + Multi.run(multi, :address_coin_balances_daily, fn repo, _ -> + insert(repo, changes_list, insert_options) + end) + end + + @impl Import.Runner + def timeout, do: @timeout + + @spec insert( + Repo.t(), + [ + %{ + required(:address_hash) => Hash.Address.t(), + required(:day) => Date.t(), + required(:value) => Wei.t() + } + ], + %{ + optional(:on_conflict) => Import.Runner.on_conflict(), + required(:timeout) => timeout, + required(:timestamps) => Import.timestamps() + } + ) :: + {:ok, [%{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()}]} + | {:error, [Changeset.t()]} + defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + + combined_changes_list = + changes_list + |> Enum.reduce([], fn change, acc -> + if Enum.empty?(acc) do + [change | acc] + else + target_item = + Enum.find(acc, fn item -> + item.day == change.day && item.address_hash == change.address_hash + end) + + if target_item do + if change.value > target_item.value do + acc_updated = List.delete(acc, target_item) + [change | acc_updated] + else + acc + end + else + [change | acc] + end + end + end) + + # Enforce CoinBalanceDaily ShareLocks order (see docs: sharelocks.md) + ordered_changes_list = Enum.sort_by(combined_changes_list, &{&1.address_hash, &1.day}) + + {:ok, _} = + Import.insert_changes_list( + repo, + ordered_changes_list, + conflict_target: [:address_hash, :day], + on_conflict: on_conflict, + for: CoinBalanceDaily, + timeout: timeout, + timestamps: timestamps + ) + + {:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash day)a))} + end + + def default_on_conflict do + from( + balance in CoinBalanceDaily, + update: [ + set: [ + value: + fragment( + """ + CASE WHEN EXCLUDED.value IS NOT NULL THEN + EXCLUDED.value + ELSE + ? + END + """, + balance.value + ), + inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at), + updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at) + ] + ], + where: fragment("EXCLUDED.value IS NOT NULL") + ) + end +end diff --git a/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex b/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex index 2575a0eab8..a428d51b0c 100644 --- a/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex +++ b/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex @@ -14,7 +14,8 @@ defmodule Explorer.Chain.Import.Stage.AddressReferencing do Runner.Address.CoinBalances, Runner.Blocks, Runner.StakingPools, - Runner.StakingPoolsDelegators + Runner.StakingPoolsDelegators, + Runner.Address.CoinBalancesDaily ] @impl Stage diff --git a/apps/explorer/priv/repo/migrations/20200525115811_address_coin_balances_daily.exs b/apps/explorer/priv/repo/migrations/20200525115811_address_coin_balances_daily.exs new file mode 100644 index 0000000000..dc03899679 --- /dev/null +++ b/apps/explorer/priv/repo/migrations/20200525115811_address_coin_balances_daily.exs @@ -0,0 +1,17 @@ +defmodule Explorer.Repo.Migrations.AddressCoinBalancesDaily do + use Ecto.Migration + + def change do + create table(:address_coin_balances_daily, primary_key: false) do + add(:address_hash, references(:addresses, column: :hash, type: :bytea), null: false) + add(:day, :date, null: false) + + # null until fetched + add(:value, :numeric, precision: 100, default: fragment("NULL"), null: true) + + timestamps(null: false, type: :utc_datetime_usec) + end + + create(unique_index(:address_coin_balances_daily, [:address_hash, :day])) + end +end diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 2d4d399eba..c84e9acd48 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -20,6 +20,7 @@ defmodule Indexer.Block.Fetcher do alias Indexer.Fetcher.{ BlockReward, CoinBalance, + CoinBalanceDaily, ContractCode, InternalTransaction, ReplacedTransaction, @@ -55,6 +56,7 @@ defmodule Indexer.Block.Fetcher do address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number, addresses: Import.Runner.options(), address_coin_balances: Import.Runner.options(), + address_coin_balances_daily: Import.Runner.options(), address_token_balances: Import.Runner.options(), blocks: Import.Runner.options(), block_second_degree_relations: Import.Runner.options(), @@ -164,6 +166,7 @@ defmodule Indexer.Block.Fetcher do %{ addresses: %{params: addresses}, address_coin_balances: %{params: coin_balances_params_set}, + address_coin_balances_daily: %{params: coin_balances_params_set}, address_token_balances: %{params: address_token_balances}, blocks: %{params: blocks}, block_second_degree_relations: %{params: block_second_degree_relations_params}, @@ -245,12 +248,18 @@ defmodule Indexer.Block.Fetcher do def async_import_coin_balances(%{addresses: addresses}, %{ address_hash_to_fetched_balance_block_number: address_hash_to_block_number }) do - addresses - |> Enum.map(fn %Address{hash: address_hash} -> - block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash)) - %{address_hash: address_hash, block_number: block_number} - end) + coin_balances_import_params = + addresses + |> Enum.map(fn %Address{hash: address_hash} -> + block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash)) + %{address_hash: address_hash, block_number: block_number} + end) + + coin_balances_import_params |> CoinBalance.async_fetch_balances() + + coin_balances_import_params + |> CoinBalanceDaily.async_fetch_balances() end def async_import_coin_balances(_, _), do: :ok diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 533a79c74c..0533484b7f 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -26,7 +26,7 @@ defmodule Indexer.Block.Realtime.Fetcher do ] alias Ecto.Changeset - alias EthereumJSONRPC.{FetchedBalances, Subscription} + alias EthereumJSONRPC.{Blocks, FetchedBalances, Subscription} alias Explorer.Chain alias Explorer.Chain.Cache.Accounts alias Explorer.Counters.AverageBlockTime @@ -172,6 +172,7 @@ defmodule Indexer.Block.Realtime.Fetcher do block_fetcher, %{ address_coin_balances: %{params: address_coin_balances_params}, + address_coin_balances_daily: %{params: address_coin_balances_daily_params}, address_hash_to_fetched_balance_block_number: address_hash_to_block_number, addresses: %{params: addresses_params}, block_rewards: block_rewards @@ -182,7 +183,8 @@ defmodule Indexer.Block.Realtime.Fetcher do balances(block_fetcher, %{ address_hash_to_block_number: address_hash_to_block_number, addresses_params: addresses_params, - balances_params: address_coin_balances_params + balances_params: address_coin_balances_params, + balances_daily_params: address_coin_balances_daily_params })}, {block_reward_errors, chain_import_block_rewards} = Map.pop(block_rewards, :errors), chain_import_options = @@ -381,7 +383,22 @@ defmodule Indexer.Block.Realtime.Fetcher do importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at)) - {:ok, %{addresses_params: merged_addresses_params, balances_params: importable_balances_params}} + block_number = Enum.at(params_list, 0)[:block_number] + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + + importable_balances_daily_params = Enum.map(params_list, &Map.put(&1, :day, day)) + + {:ok, + %{ + addresses_params: merged_addresses_params, + balances_params: importable_balances_params, + balances_daily_params: importable_balances_daily_params + }} {:error, _} = error -> error diff --git a/apps/indexer/lib/indexer/fetcher/block_reward.ex b/apps/indexer/lib/indexer/fetcher/block_reward.ex index ed593bb655..2075c33d99 100644 --- a/apps/indexer/lib/indexer/fetcher/block_reward.ex +++ b/apps/indexer/lib/indexer/fetcher/block_reward.ex @@ -20,7 +20,7 @@ defmodule Indexer.Fetcher.BlockReward do alias Explorer.Chain.Cache.Accounts alias Indexer.{BufferedTask, Tracer} alias Indexer.Fetcher.BlockReward.Supervisor, as: BlockRewardSupervisor - alias Indexer.Fetcher.CoinBalance + alias Indexer.Fetcher.{CoinBalance, CoinBalanceDaily} alias Indexer.Transform.{AddressCoinBalances, Addresses} @behaviour BufferedTask @@ -135,6 +135,7 @@ defmodule Indexer.Fetcher.BlockReward do Accounts.drop(addresses) CoinBalance.async_fetch_balances(address_coin_balances) + CoinBalanceDaily.async_fetch_balances(address_coin_balances) retry_errors(errors) diff --git a/apps/indexer/lib/indexer/fetcher/coin_balance.ex b/apps/indexer/lib/indexer/fetcher/coin_balance.ex index f8fadfcb53..32f37459a0 100644 --- a/apps/indexer/lib/indexer/fetcher/coin_balance.ex +++ b/apps/indexer/lib/indexer/fetcher/coin_balance.ex @@ -77,12 +77,17 @@ defmodule Indexer.Fetcher.CoinBalance do # `{address, block}`, so take unique params only unique_entries = Enum.uniq(entries) - unique_entry_count = Enum.count(unique_entries) + unique_filtered_entries = + Enum.filter(unique_entries, fn {_hash, block_number} -> + block_number > first_block_to_index() + end) + + unique_entry_count = Enum.count(unique_filtered_entries) Logger.metadata(count: unique_entry_count) Logger.debug(fn -> "fetching" end) - unique_entries + unique_filtered_entries |> Enum.map(&entry_to_params/1) |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) |> case do @@ -101,6 +106,15 @@ defmodule Indexer.Fetcher.CoinBalance do end end + defp first_block_to_index do + string_value = Application.get_env(:indexer, :first_block) + + case Integer.parse(string_value) do + {integer, ""} -> integer + _ -> 0 + end + end + defp entry_to_params({address_hash_bytes, block_number}) when is_integer(block_number) do {:ok, address_hash} = Hash.Address.cast(address_hash_bytes) %{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address_hash)} diff --git a/apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex b/apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex new file mode 100644 index 0000000000..f15f599a9b --- /dev/null +++ b/apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex @@ -0,0 +1,226 @@ +defmodule Indexer.Fetcher.CoinBalanceDaily do + @moduledoc """ + Fetches `t:Explorer.Chain.Address.CoinBalanceDaily.t/0`. + """ + + use Indexer.Fetcher + use Spandex.Decorators + + require Logger + + import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1] + + alias EthereumJSONRPC.{Blocks, FetchedBalances} + alias Explorer.Chain + alias Explorer.Chain.Cache.Accounts + alias Explorer.Chain.Hash + alias Indexer.{BufferedTask, Tracer} + + @behaviour BufferedTask + + @defaults [ + flush_interval: :timer.seconds(3), + max_batch_size: 500, + max_concurrency: 4, + task_supervisor: Indexer.Fetcher.CoinBalanceDaily.TaskSupervisor, + metadata: [fetcher: :coin_balance_daily] + ] + + @doc """ + Asynchronously fetches balances for each address `hash` at the `day`. + """ + @spec async_fetch_balances([ + %{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()} + ]) :: :ok + def async_fetch_balances(balance_fields) when is_list(balance_fields) do + entries = Enum.map(balance_fields, &entry/1) + + BufferedTask.buffer(__MODULE__, entries) + end + + @doc false + # credo:disable-for-next-line Credo.Check.Design.DuplicatedCode + def child_spec([init_options, gen_server_options]) do + {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) + + unless state do + raise ArgumentError, + ":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <> + "to allow for json_rpc calls when running." + end + + merged_init_options = + @defaults + |> Keyword.merge(mergeable_init_options) + |> Keyword.put(:state, state) + + Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__) + end + + @impl BufferedTask + def init(initial, reducer, _) do + {:ok, final} = + Chain.stream_unfetched_balances(initial, fn address_fields, acc -> + address_fields + |> entry() + |> reducer.(acc) + end) + + final + end + + @impl BufferedTask + @decorate trace(name: "fetch", resource: "Indexer.Fetcher.CoinBalanceDaily.run/2", service: :indexer, tracer: Tracer) + def run(entries, json_rpc_named_arguments) do + # the same address may be used more than once in the same block, but we only want one `Balance` for a given + # `{address, block}`, so take unique params only + unique_entries = Enum.uniq(entries) + + unique_filtered_entries = + Enum.filter(unique_entries, fn {_hash, block_number} -> + block_number > first_block_to_index() + end) + + unique_entry_count = Enum.count(unique_filtered_entries) + Logger.metadata(count: unique_entry_count) + + Logger.debug(fn -> "fetching" end) + + unique_filtered_entries + |> Enum.map(&entry_to_params/1) + |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) + |> case do + {:ok, fetched_balances} -> + run_fetched_balances(fetched_balances, unique_entries) + + {:error, reason} -> + Logger.error( + fn -> + ["failed to fetch: ", inspect(reason)] + end, + error_count: unique_entry_count + ) + + {:retry, unique_entries} + end + end + + defp first_block_to_index do + string_value = Application.get_env(:indexer, :first_block) + + case Integer.parse(string_value) do + {integer, ""} -> integer + _ -> 0 + end + end + + defp entry_to_params({address_hash_bytes, block_number}) when is_integer(block_number) do + {:ok, address_hash} = Hash.Address.cast(address_hash_bytes) + %{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address_hash)} + end + + defp entry(%{address_hash: %Hash{bytes: address_hash_bytes}, block_number: block_number}) do + {address_hash_bytes, block_number} + end + + def balances_params_to_address_params(balances_params) do + balances_params + |> Enum.group_by(fn %{address_hash: address_hash} -> address_hash end) + |> Map.values() + |> Stream.map(&Enum.max_by(&1, fn %{block_number: block_number} -> block_number end)) + |> Enum.map(fn %{address_hash: address_hash, block_number: block_number, value: value} -> + %{ + hash: address_hash, + fetched_coin_balance_block_number: block_number, + fetched_coin_balance: value + } + end) + end + + def import_fetched_balances(%FetchedBalances{params_list: params_list}, broadcast_type \\ false) do + day = + if Enum.empty?(params_list) do + nil + else + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + block_number = Enum.at(params_list, 0)[:block_number] + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + + DateTime.to_date(block_timestamp) + end + + importable_balances_daily_params = + Enum.map(params_list, fn param -> + param + |> Map.put(:day, day) + end) + + addresses_params = balances_params_to_address_params(importable_balances_daily_params) + + Chain.import(%{ + addresses: %{params: addresses_params, with: :balance_changeset}, + address_coin_balances_daily: %{params: importable_balances_daily_params}, + broadcast: broadcast_type + }) + end + + defp run_fetched_balances(%FetchedBalances{errors: errors} = fetched_balances, _) do + {:ok, imported} = import_fetched_balances(fetched_balances) + + Accounts.drop(imported[:addresses]) + + retry(errors) + end + + defp retry([]), do: :ok + + defp retry(errors) when is_list(errors) do + retried_entries = fetched_balances_errors_to_entries(errors) + + Logger.error( + fn -> + [ + "failed to fetch: ", + fetched_balance_errors_to_iodata(errors) + ] + end, + error_count: Enum.count(retried_entries) + ) + + {:retry, retried_entries} + end + + defp fetched_balances_errors_to_entries(errors) when is_list(errors) do + Enum.map(errors, &fetched_balance_error_to_entry/1) + end + + defp fetched_balance_error_to_entry(%{data: %{block_quantity: block_quantity, day: day, hash_data: hash_data}}) + when is_binary(block_quantity) and is_binary(hash_data) do + {:ok, %Hash{bytes: address_hash_bytes}} = Hash.Address.cast(hash_data) + block_number = quantity_to_integer(block_quantity) + {address_hash_bytes, block_number, day} + end + + defp fetched_balance_errors_to_iodata(errors) when is_list(errors) do + fetched_balance_errors_to_iodata(errors, []) + end + + defp fetched_balance_errors_to_iodata([], iodata), do: iodata + + defp fetched_balance_errors_to_iodata([error | errors], iodata) do + fetched_balance_errors_to_iodata(errors, [iodata | fetched_balance_error_to_iodata(error)]) + end + + defp fetched_balance_error_to_iodata(%{ + code: code, + message: message, + data: %{day: day, hash_data: hash_data} + }) + when is_integer(code) and is_binary(message) and is_binary(day) and is_binary(hash_data) do + [hash_data, "@", day, ": (", to_string(code), ") ", message, ?\n] + end +end diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index f22b6de487..b5f6b96f2e 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -11,6 +11,7 @@ defmodule Indexer.Supervisor do alias Indexer.Fetcher.{ BlockReward, CoinBalance, + CoinBalanceDaily, CoinBalanceOnDemand, ContractCode, InternalTransaction, @@ -109,6 +110,8 @@ defmodule Indexer.Supervisor do [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {CoinBalance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, + {CoinBalanceDaily.Supervisor, + [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {Token.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {TokenInstance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, diff --git a/apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex b/apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex new file mode 100644 index 0000000000..de8c9832a5 --- /dev/null +++ b/apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex @@ -0,0 +1,136 @@ +defmodule Indexer.Transform.AddressCoinBalancesDaily do + @moduledoc """ + Extracts `Explorer.Chain.Address.CoinBalanceDaily` params from other schema's params. + """ + + alias EthereumJSONRPC.Blocks + + def params_set(%{} = import_options) do + Enum.reduce(import_options, MapSet.new(), &reducer/2) + end + + defp reducer({:beneficiary_params, beneficiary_params}, acc) when is_list(beneficiary_params) do + Enum.into(beneficiary_params, acc, fn %{ + address_hash: address_hash, + block_number: block_number + } + when is_binary(address_hash) and is_integer(block_number) -> + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + + %{address_hash: address_hash, day: day} + end) + end + + defp reducer({:blocks_params, blocks_params}, acc) when is_list(blocks_params) do + # a block MUST have a miner_hash and number + Enum.into(blocks_params, acc, fn %{miner_hash: address_hash, number: block_number, timestamp: block_timestamp} + when is_binary(address_hash) and is_integer(block_number) -> + day = DateTime.to_date(block_timestamp) + %{address_hash: address_hash, day: day} + end) + end + + defp reducer({:internal_transactions_params, internal_transactions_params}, initial) + when is_list(internal_transactions_params) do + Enum.reduce(internal_transactions_params, initial, &internal_transactions_params_reducer/2) + end + + defp reducer({:logs_params, logs_params}, acc) when is_list(logs_params) do + # a log MUST have address_hash and block_number + logs_params + |> Enum.into(acc, fn + %{address_hash: address_hash, block_number: block_number} + when is_binary(address_hash) and is_integer(block_number) -> + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + %{address_hash: address_hash, day: day} + + %{type: "pending"} -> + nil + end) + |> Enum.reject(fn val -> is_nil(val) end) + |> MapSet.new() + end + + defp reducer({:transactions_params, transactions_params}, initial) when is_list(transactions_params) do + Enum.reduce(transactions_params, initial, &transactions_params_reducer/2) + end + + defp reducer({:block_second_degree_relations_params, block_second_degree_relations_params}, initial) + when is_list(block_second_degree_relations_params), + do: initial + + defp internal_transactions_params_reducer( + %{block_number: block_number} = internal_transaction_params, + acc + ) + when is_integer(block_number) do + case internal_transaction_params do + %{type: "call"} -> + acc + + %{type: "create", error: _} -> + acc + + %{type: "create", created_contract_address_hash: address_hash} when is_binary(address_hash) -> + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + MapSet.put(acc, %{address_hash: address_hash, day: day}) + + %{type: "selfdestruct", from_address_hash: from_address_hash, to_address_hash: to_address_hash} + when is_binary(from_address_hash) and is_binary(to_address_hash) -> + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + + acc + |> MapSet.put(%{address_hash: from_address_hash, day: day}) + |> MapSet.put(%{address_hash: to_address_hash, day: day}) + end + end + + defp transactions_params_reducer( + %{block_number: block_number, from_address_hash: from_address_hash} = transaction_params, + initial + ) + when is_binary(from_address_hash) do + # a transaction MUST have a `from_address_hash` + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: blocks_params}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + block_timestamp = Enum.at(blocks_params, 0).timestamp + day = DateTime.to_date(block_timestamp) + acc = MapSet.put(initial, %{address_hash: from_address_hash, day: day}) + + # `to_address_hash` is optional + case transaction_params do + %{to_address_hash: to_address_hash} when is_binary(to_address_hash) -> + MapSet.put(acc, %{address_hash: to_address_hash, day: day}) + + _ -> + acc + end + end +end From 14079ef12341884ecbd0bb8c5a94996a833db713 Mon Sep 17 00:00:00 2001 From: Victor Baranov Date: Tue, 26 May 2020 16:40:33 +0300 Subject: [PATCH 2/6] Fix merging conflict --- apps/indexer/lib/indexer/fetcher/coin_balance.ex | 9 --------- 1 file changed, 9 deletions(-) diff --git a/apps/indexer/lib/indexer/fetcher/coin_balance.ex b/apps/indexer/lib/indexer/fetcher/coin_balance.ex index da06fc2732..06c4bf6fa3 100644 --- a/apps/indexer/lib/indexer/fetcher/coin_balance.ex +++ b/apps/indexer/lib/indexer/fetcher/coin_balance.ex @@ -115,15 +115,6 @@ defmodule Indexer.Fetcher.CoinBalance do end end - defp first_block_to_index do - string_value = Application.get_env(:indexer, :first_block) - - case Integer.parse(string_value) do - {integer, ""} -> integer - _ -> 0 - end - end - defp entry_to_params({address_hash_bytes, block_number}) when is_integer(block_number) do {:ok, address_hash} = Hash.Address.cast(address_hash_bytes) %{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address_hash)} From c6693596d5eeaa496a32f829b4cccc888f1831b5 Mon Sep 17 00:00:00 2001 From: Victor Baranov Date: Thu, 28 May 2020 13:29:54 +0300 Subject: [PATCH 3/6] Finalize coin balance history chart update --- apps/block_scout_web/config/config.exs | 4 + ...ss_coin_balance_by_day_controller_test.exs | 2 + .../api/rpc/address_controller_test.exs | 71 ++++++ apps/explorer/lib/explorer/chain.ex | 13 + .../chain/address/coin_balance_daily.ex | 17 +- .../runner/address/coin_balances_daily.ex | 3 +- .../explorer/chain_spec/parity/importer.ex | 22 +- .../chain_spec/parity/importer_test.exs | 78 +++++- apps/explorer/test/explorer/chain_test.exs | 4 + apps/explorer/test/support/factory.ex | 13 + apps/indexer/lib/indexer/block/fetcher.ex | 28 ++- .../lib/indexer/block/realtime/fetcher.ex | 40 +++- .../lib/indexer/fetcher/block_reward.ex | 9 +- .../lib/indexer/fetcher/coin_balance.ex | 79 +++++- .../lib/indexer/fetcher/coin_balance_daily.ex | 226 ------------------ .../indexer/fetcher/coin_balance_on_demand.ex | 55 ++++- apps/indexer/lib/indexer/supervisor.ex | 3 - .../transform/address_coin_balances_daily.ex | 59 +++-- .../test/indexer/block/fetcher_test.exs | 135 ++++++++++- .../indexer/block/realtime/fetcher_test.exs | 120 +++++++++- .../indexer/fetcher/block_reward_test.exs | 126 ++++++++++ .../fetcher/coin_balance_on_demand_test.exs | 65 +++++ .../indexer/fetcher/coin_balance_test.exs | 137 ++++++++++- 23 files changed, 1018 insertions(+), 291 deletions(-) delete mode 100644 apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex diff --git a/apps/block_scout_web/config/config.exs b/apps/block_scout_web/config/config.exs index 9fd1da1f08..b5bd4087f6 100644 --- a/apps/block_scout_web/config/config.exs +++ b/apps/block_scout_web/config/config.exs @@ -84,6 +84,10 @@ config :block_scout_web, BlockScoutWeb.Chain.TransactionHistoryChartController, # days history_size: 30 +config :block_scout_web, BlockScoutWeb.Chain.Address.CoinBalance, + # days + coin_balance_history_days: System.get_env("COIN_BALANCE_HISTORY_DAYS", "10") + config :ex_cldr, default_locale: "en", default_backend: BlockScoutWeb.Cldr diff --git a/apps/block_scout_web/test/block_scout_web/controllers/address_coin_balance_by_day_controller_test.exs b/apps/block_scout_web/test/block_scout_web/controllers/address_coin_balance_by_day_controller_test.exs index 3778793300..8d94957b9c 100644 --- a/apps/block_scout_web/test/block_scout_web/controllers/address_coin_balance_by_day_controller_test.exs +++ b/apps/block_scout_web/test/block_scout_web/controllers/address_coin_balance_by_day_controller_test.exs @@ -11,6 +11,8 @@ defmodule BlockScoutWeb.AddressCoinBalanceByDayControllerTest do block_one_day_ago = insert(:block, timestamp: Timex.shift(noon, days: -1), number: 1) insert(:fetched_balance, address_hash: address.hash, value: 1000, block_number: block.number) insert(:fetched_balance, address_hash: address.hash, value: 2000, block_number: block_one_day_ago.number) + insert(:fetched_balance_daily, address_hash: address.hash, value: 1000, day: noon) + insert(:fetched_balance_daily, address_hash: address.hash, value: 2000, day: Timex.shift(noon, days: -1)) conn = get(conn, address_coin_balance_by_day_path(conn, :index, Address.checksum(address)), %{"type" => "JSON"}) diff --git a/apps/block_scout_web/test/block_scout_web/controllers/api/rpc/address_controller_test.exs b/apps/block_scout_web/test/block_scout_web/controllers/api/rpc/address_controller_test.exs index 1f18b5422d..5378ca6cfe 100644 --- a/apps/block_scout_web/test/block_scout_web/controllers/api/rpc/address_controller_test.exs +++ b/apps/block_scout_web/test/block_scout_web/controllers/api/rpc/address_controller_test.exs @@ -111,6 +111,30 @@ defmodule BlockScoutWeb.API.RPC.AddressControllerTest do address_hash = to_string(address.hash) + expect(EthereumJSONRPC.Mox, :json_rpc, 1, fn [ + %{ + id: id, + method: "eth_getBalance", + params: [^mining_address_hash, "0x65"] + } + ], + _options -> + {:ok, [%{id: id, jsonrpc: "2.0", result: "0x02"}]} + end) + + res = eth_block_number_fake_response("0x65") + + expect(EthereumJSONRPC.Mox, :json_rpc, 1, fn [ + %{ + id: 0, + method: "eth_getBlockByNumber", + params: ["0x65", true] + } + ], + _ -> + {:ok, [res]} + end) + expect(EthereumJSONRPC.Mox, :json_rpc, 1, fn [ %{ id: id, @@ -122,6 +146,17 @@ defmodule BlockScoutWeb.API.RPC.AddressControllerTest do {:ok, [%{id: id, jsonrpc: "2.0", result: "0x02"}]} end) + expect(EthereumJSONRPC.Mox, :json_rpc, 1, fn [ + %{ + id: 0, + method: "eth_getBlockByNumber", + params: ["0x65", true] + } + ], + _ -> + {:ok, [res]} + end) + response = conn |> get("/api", params) @@ -2719,4 +2754,40 @@ defmodule BlockScoutWeb.API.RPC.AddressControllerTest do |> put_in(["properties", "result"], result) |> ExJsonSchema.Schema.resolve() end + + defp eth_block_number_fake_response(block_quantity) do + %{ + id: 0, + jsonrpc: "2.0", + result: %{ + "author" => "0x0000000000000000000000000000000000000000", + "difficulty" => "0x20000", + "extraData" => "0x", + "gasLimit" => "0x663be0", + "gasUsed" => "0x0", + "hash" => "0x5b28c1bfd3a15230c9a46b399cd0f9a6920d432e85381cc6a140b06e8410112f", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0x0000000000000000000000000000000000000000", + "number" => block_quantity, + "parentHash" => "0x0000000000000000000000000000000000000000000000000000000000000000", + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x80", + "0xb8410000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "size" => "0x215", + "stateRoot" => "0xfad4af258fd11939fae0c6c6eec9d340b1caac0b0196fd9a1bc3f489c5bf00b3", + "step" => "0", + "timestamp" => "0x0", + "totalDifficulty" => "0x20000", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + } + } + end end diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index bdc2062c8f..ed300bebc7 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -3514,12 +3514,25 @@ defmodule Explorer.Chain do @spec address_to_balances_by_day(Hash.Address.t()) :: [balance_by_day] def address_to_balances_by_day(address_hash) do + latest_block_timestamp = + address_hash + |> CoinBalance.last_coin_balance_timestamp() + |> Repo.one() + address_hash |> CoinBalanceDaily.balances_by_day() |> Repo.all() + |> replace_last_value(latest_block_timestamp) |> normalize_balances_by_day() end + # https://github.com/poanetwork/blockscout/issues/2658 + defp replace_last_value(items, %{value: value, timestamp: timestamp}) do + List.replace_at(items, -1, %{date: Date.convert!(timestamp, Calendar.ISO), value: value}) + end + + defp replace_last_value(items, _), do: items + defp normalize_balances_by_day(balances_by_day) do result = balances_by_day diff --git a/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex b/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex index 063f1d8235..4cc68b8970 100644 --- a/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex +++ b/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex @@ -42,18 +42,27 @@ defmodule Explorer.Chain.Address.CoinBalanceDaily do @doc """ Builds an `Ecto.Query` to fetch a series of balances by day for the given account. Each element in the series - corresponds to the maximum balance in that day. Only the last 90 days of data are used. + corresponds to the maximum balance in that day. Only the last `n` days of data are used. + `n` is configurable via COIN_BALANCE_HISTORY_DAYS ENV var. """ def balances_by_day(address_hash) do + {days_to_consider, _} = + Application.get_env(:block_scout_web, BlockScoutWeb.Chain.Address.CoinBalance)[:coin_balance_history_days] + |> Integer.parse() + CoinBalanceDaily |> where([cbd], cbd.address_hash == ^address_hash) - |> limit_time_interval() + |> limit_time_interval(days_to_consider) |> order_by([cbd], cbd.day) |> select([cbd], %{date: cbd.day, value: cbd.value}) end - def limit_time_interval(query) do - query |> where([cbd], cbd.day >= fragment("date_trunc('day', now()) - interval '90 days'")) + def limit_time_interval(query, days_to_consider) do + query + |> where( + [cbd], + cbd.day >= fragment("date_trunc('day', now() - CAST(? AS INTERVAL))", ^%Postgrex.Interval{days: days_to_consider}) + ) end def changeset(%__MODULE__{} = balance, params) do diff --git a/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex b/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex index e9e747c069..d8f279d9ad 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances_daily.ex @@ -120,12 +120,13 @@ defmodule Explorer.Chain.Import.Runner.Address.CoinBalancesDaily do value: fragment( """ - CASE WHEN EXCLUDED.value IS NOT NULL THEN + CASE WHEN EXCLUDED.value IS NOT NULL AND EXCLUDED.value > ? THEN EXCLUDED.value ELSE ? END """, + balance.value, balance.value ), inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at), diff --git a/apps/explorer/lib/explorer/chain_spec/parity/importer.ex b/apps/explorer/lib/explorer/chain_spec/parity/importer.ex index ba4c7a3e37..cdd2da1eed 100644 --- a/apps/explorer/lib/explorer/chain_spec/parity/importer.ex +++ b/apps/explorer/lib/explorer/chain_spec/parity/importer.ex @@ -5,6 +5,7 @@ defmodule Explorer.ChainSpec.Parity.Importer do require Logger + alias EthereumJSONRPC.Blocks alias Explorer.{Chain, Repo} alias Explorer.Chain.Block.{EmissionReward, Range} alias Explorer.Chain.Hash.Address, as: AddressHash @@ -33,6 +34,21 @@ defmodule Explorer.ChainSpec.Parity.Importer do end) |> Enum.to_list() + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + {:ok, %Blocks{blocks_params: [%{timestamp: timestamp}]}} = + EthereumJSONRPC.fetch_blocks_by_range(1..1, json_rpc_named_arguments) + + day = DateTime.to_date(timestamp) + + balance_daily_params = + chain_spec + |> genesis_accounts() + |> Stream.map(fn balance_map -> + Map.put(balance_map, :day, day) + end) + |> Enum.to_list() + address_params = balance_params |> Stream.map(fn %{address_hash: hash} = map -> @@ -40,7 +56,11 @@ defmodule Explorer.ChainSpec.Parity.Importer do end) |> Enum.to_list() - params = %{address_coin_balances: %{params: balance_params}, addresses: %{params: address_params}} + params = %{ + address_coin_balances: %{params: balance_params}, + address_coin_balances_daily: %{params: balance_daily_params}, + addresses: %{params: address_params} + } Chain.import(params) end diff --git a/apps/explorer/test/explorer/chain_spec/parity/importer_test.exs b/apps/explorer/test/explorer/chain_spec/parity/importer_test.exs index ba97a94731..2f19675c43 100644 --- a/apps/explorer/test/explorer/chain_spec/parity/importer_test.exs +++ b/apps/explorer/test/explorer/chain_spec/parity/importer_test.exs @@ -1,12 +1,17 @@ defmodule Explorer.ChainSpec.Parity.ImporterTest do use Explorer.DataCase - alias Explorer.Chain.Address.CoinBalance + import Mox + import EthereumJSONRPC, only: [integer_to_quantity: 1] + + alias Explorer.Chain.Address.{CoinBalance, CoinBalanceDaily} alias Explorer.Chain.Block.{EmissionReward, Range} alias Explorer.Chain.{Address, Hash, Wei} alias Explorer.ChainSpec.Parity.Importer alias Explorer.Repo + setup :set_mox_global + @chain_spec "#{File.cwd!()}/test/support/fixture/chain_spec/foundation.json" |> File.read!() |> Jason.decode!() @@ -141,14 +146,37 @@ defmodule Explorer.ChainSpec.Parity.ImporterTest do describe "import_genesis_accounts/1" do test "imports accounts" do + block_quantity = integer_to_quantity(1) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{id: 0, jsonrpc: "2.0", method: "eth_getBlockByNumber", params: ["0x1", true]} + ], + _ -> + {:ok, [res]} + end) + {:ok, %{address_coin_balances: address_coin_balances}} = Importer.import_genesis_accounts(@chain_spec) assert Enum.count(address_coin_balances) == 403 assert CoinBalance |> Repo.all() |> Enum.count() == 403 + assert CoinBalanceDaily |> Repo.all() |> Enum.count() == 403 assert Address |> Repo.all() |> Enum.count() == 403 end test "imports contract code" do + block_quantity = integer_to_quantity(1) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{id: 0, jsonrpc: "2.0", method: "eth_getBlockByNumber", params: ["0x1", true]} + ], + [] -> + {:ok, [res]} + end) + code = "0x608060405234801561001057600080fd5b50600436106100cf5760003560e01c806391ad27b41161008c57806398d5fdca1161006657806398d5fdca14610262578063a97e5c9314610280578063df5dd1a5146102dc578063eebd48b014610320576100cf565b806391ad27b4146101e457806391b7f5ed14610202578063955d14cd14610244576100cf565b80630aa6f2fe146100d457806320ba81ee1461011657806322a90082146101345780634c2c987c14610176578063764cbcd1146101985780637837efdc146101da575b600080fd5b610100600480360360208110156100ea57600080fd5b8101908080359060200190929190505050610353565b6040518082815260200191505060405180910390f35b61011e6103c4565b6040518082815260200191505060405180910390f35b6101606004803603602081101561014a57600080fd5b81019080803590602001909291905050506103ce565b6040518082815260200191505060405180910390f35b61017e61043f565b604051808215151515815260200191505060405180910390f35b6101c4600480360360208110156101ae57600080fd5b8101908080359060200190929190505050610456565b6040518082815260200191505060405180910390f35b6101e26104c7565b005b6101ec6104d2565b6040518082815260200191505060405180910390f35b61022e6004803603602081101561021857600080fd5b81019080803590602001909291905050506104dc565b6040518082815260200191505060405180910390f35b61024c6106a2565b6040518082815260200191505060405180910390f35b61026a6106ac565b6040518082815260200191505060405180910390f35b6102c26004803603602081101561029657600080fd5b81019080803573ffffffffffffffffffffffffffffffffffffffff1690602001909291905050506106b6565b604051808215151515815260200191505060405180910390f35b61031e600480360360208110156102f257600080fd5b81019080803573ffffffffffffffffffffffffffffffffffffffff1690602001909291905050506106d3565b005b61032861073d565b6040518085815260200184815260200183815260200182815260200194505050505060405180910390f35b600061035e336106b6565b6103b3576040517f08c379a0000000000000000000000000000000000000000000000000000000008152600401808060200182810382526030815260200180610b4f6030913960400191505060405180910390fd5b816004819055506004549050919050565b6000600454905090565b60006103d9336106b6565b61042e576040517f08c379a0000000000000000000000000000000000000000000000000000000008152600401808060200182810382526030815260200180610b4f6030913960400191505060405180910390fd5b816003819055506003549050919050565b6000600560009054906101000a900460ff16905090565b6000610461336106b6565b6104b6576040517f08c379a0000000000000000000000000000000000000000000000000000000008152600401808060200182810382526030815260200180610b4f6030913960400191505060405180910390fd5b816002819055506002549050919050565b6104d033610771565b565b6000600354905090565b60006104e7336106b6565b61053c576040517f08c379a0000000000000000000000000000000000000000000000000000000008152600401808060200182810382526030815260200180610b4f6030913960400191505060405180910390fd5b600082116105b2576040517f08c379a00000000000000000000000000000000000000000000000000000000081526004018080602001828103825260098152602001807f7072696365203c3d30000000000000000000000000000000000000000000000081525060200191505060405180910390fd5b6105ba6104d2565b6105c26106a2565b01421015610638576040517f08c379a00000000000000000000000000000000000000000000000000000000081526004018080602001828103825260148152602001807f54494d455f4c4f434b5f494e434f4d504c45544500000000000000000000000081525060200191505060405180910390fd5b610641826107cb565b5061064b42610456565b503373ffffffffffffffffffffffffffffffffffffffff167f95dce27040c59c8b1c445b284f81a3aaae6eecd7d08d5c7684faee64cdb514a1836040518082815260200191505060405180910390a2819050919050565b6000600254905090565b6000600154905090565b60006106cc82600061083c90919063ffffffff16565b9050919050565b6106dc336106b6565b610731576040517f08c379a0000000000000000000000000000000000000000000000000000000008152600401808060200182810382526030815260200180610b4f6030913960400191505060405180910390fd5b61073a8161091a565b50565b60008060008061074b6106ac565b6107536104d2565b61075b6103c4565b6107636106a2565b935093509350935090919293565b61078581600061097390919063ffffffff16565b8073ffffffffffffffffffffffffffffffffffffffff167f9c8e7d83025bef8a04c664b2f753f64b8814bdb7e27291d7e50935f18cc3c71260405160405180910390a250565b60006107d6336106b6565b61082b576040517f08c379a0000000000000000000000000000000000000000000000000000000008152600401808060200182810382526030815260200180610b4f6030913960400191505060405180910390fd5b816001819055506001549050919050565b60008073ffffffffffffffffffffffffffffffffffffffff168273ffffffffffffffffffffffffffffffffffffffff1614156108c3576040517f08c379a0000000000000000000000000000000000000000000000000000000008152600401808060200182810382526022815260200180610b2d6022913960400191505060405180910390fd5b8260000160008373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200190815260200160002060009054906101000a900460ff16905092915050565b61092e816000610a3090919063ffffffff16565b8073ffffffffffffffffffffffffffffffffffffffff167e47706786c922d17b39285dc59d696bafea72c0b003d3841ae1202076f4c2e460405160405180910390a250565b61097d828261083c565b6109d2576040517f08c379a0000000000000000000000000000000000000000000000000000000008152600401808060200182810382526021815260200180610b0c6021913960400191505060405180910390fd5b60008260000160008373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200190815260200160002060006101000a81548160ff0219169083151502179055505050565b610a3a828261083c565b15610aad576040517f08c379a000000000000000000000000000000000000000000000000000000000815260040180806020018281038252601f8152602001807f526f6c65733a206163636f756e7420616c72656164792068617320726f6c650081525060200191505060405180910390fd5b60018260000160008373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200190815260200160002060006101000a81548160ff021916908315150217905550505056fe526f6c65733a206163636f756e7420646f6573206e6f74206861766520726f6c65526f6c65733a206163636f756e7420697320746865207a65726f20616464726573734f7261636c65526f6c653a2063616c6c657220646f6573206e6f74206861766520746865204f7261636c6520726f6c65a265627a7a72315820df30730da57a5061c487e0b37e84e80308fa443e2e80ee9117a13fa8149caf4164736f6c634300050b0032" @@ -169,11 +197,59 @@ defmodule Explorer.ChainSpec.Parity.ImporterTest do end test "imports coin balances without 0x" do + block_quantity = integer_to_quantity(1) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{id: 0, jsonrpc: "2.0", method: "eth_getBlockByNumber", params: ["0x1", true]} + ], + [] -> + {:ok, [res]} + end) + {:ok, %{address_coin_balances: address_coin_balances}} = Importer.import_genesis_accounts(@chain_classic_spec) assert Enum.count(address_coin_balances) == 8894 assert CoinBalance |> Repo.all() |> Enum.count() == 8894 + assert CoinBalanceDaily |> Repo.all() |> Enum.count() == 8894 assert Address |> Repo.all() |> Enum.count() == 8894 end end + + defp eth_block_number_fake_response(block_quantity) do + %{ + id: 0, + jsonrpc: "2.0", + result: %{ + "author" => "0x0000000000000000000000000000000000000000", + "difficulty" => "0x20000", + "extraData" => "0x", + "gasLimit" => "0x663be0", + "gasUsed" => "0x0", + "hash" => "0x5b28c1bfd3a15230c9a46b399cd0f9a6920d432e85381cc6a140b06e8410112f", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0x0000000000000000000000000000000000000000", + "number" => block_quantity, + "parentHash" => "0x0000000000000000000000000000000000000000000000000000000000000000", + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x80", + "0xb8410000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "size" => "0x215", + "stateRoot" => "0xfad4af258fd11939fae0c6c6eec9d340b1caac0b0196fd9a1bc3f489c5bf00b3", + "step" => "0", + "timestamp" => "0x0", + "totalDifficulty" => "0x20000", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + } + } + end end diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index ade78e9f64..158e11fe58 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -4719,6 +4719,8 @@ defmodule Explorer.ChainTest do block_one_day_ago = insert(:block, timestamp: yesterday, number: 49) insert(:fetched_balance, address_hash: address.hash, value: 1000, block_number: block.number) insert(:fetched_balance, address_hash: address.hash, value: 2000, block_number: block_one_day_ago.number) + insert(:fetched_balance_daily, address_hash: address.hash, value: 1000, day: noon) + insert(:fetched_balance_daily, address_hash: address.hash, value: 2000, day: yesterday) balances = Chain.address_to_balances_by_day(address.hash) @@ -4735,6 +4737,7 @@ defmodule Explorer.ChainTest do yesterday = Timex.shift(noon, days: -1) block_one_day_ago = insert(:block, timestamp: yesterday) insert(:fetched_balance, address_hash: address.hash, value: 1000, block_number: block_one_day_ago.number) + insert(:fetched_balance_daily, address_hash: address.hash, value: 1000, day: yesterday) balances = Chain.address_to_balances_by_day(address.hash) @@ -4754,6 +4757,7 @@ defmodule Explorer.ChainTest do block_past = insert(:block, timestamp: past, number: 2) insert(:fetched_balance, address_hash: address.hash, value: 0, block_number: block_past.number) + insert(:fetched_balance_daily, address_hash: address.hash, value: 0, day: today) [balance] = Chain.address_to_balances_by_day(address.hash) diff --git a/apps/explorer/test/support/factory.ex b/apps/explorer/test/support/factory.ex index 09420fc1c4..fa932c1e1d 100644 --- a/apps/explorer/test/support/factory.ex +++ b/apps/explorer/test/support/factory.ex @@ -16,6 +16,7 @@ defmodule Explorer.Factory do Address.CurrentTokenBalance, Address.TokenBalance, Address.CoinBalance, + Address.CoinBalanceDaily, Block, ContractMethod, Data, @@ -56,6 +57,13 @@ defmodule Explorer.Factory do } end + def unfetched_balance_daily_factory do + %CoinBalanceDaily{ + address_hash: address_hash(), + day: Timex.shift(Timex.now(), days: Enum.random(0..100) * -1) + } + end + def update_balance_value(%CoinBalance{address_hash: address_hash, block_number: block_number}, value) do Repo.update_all( from( @@ -71,6 +79,11 @@ defmodule Explorer.Factory do |> struct!(value: Enum.random(1..100_000)) end + def fetched_balance_daily_factory do + unfetched_balance_daily_factory() + |> struct!(value: Enum.random(1..100_000)) + end + def contract_address_factory do %Address{ hash: address_hash(), diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index c84e9acd48..1792b941ea 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -20,7 +20,6 @@ defmodule Indexer.Block.Fetcher do alias Indexer.Fetcher.{ BlockReward, CoinBalance, - CoinBalanceDaily, ContractCode, InternalTransaction, ReplacedTransaction, @@ -35,6 +34,7 @@ defmodule Indexer.Block.Fetcher do alias Indexer.Transform.{ AddressCoinBalances, + AddressCoinBalancesDaily, Addresses, AddressTokenBalances, MintTransfers, @@ -155,6 +155,14 @@ defmodule Indexer.Block.Fetcher do transactions_params: transactions_with_receipts } |> AddressCoinBalances.params_set(), + coin_balances_params_daily_set = + %{ + beneficiary_params: MapSet.to_list(beneficiary_params_set), + blocks_params: blocks, + logs_params: logs, + transactions_params: transactions_with_receipts + } + |> AddressCoinBalancesDaily.params_set(), beneficiaries_with_gas_payment <- beneficiary_params_set |> add_gas_payments(transactions_with_receipts, blocks) @@ -166,7 +174,7 @@ defmodule Indexer.Block.Fetcher do %{ addresses: %{params: addresses}, address_coin_balances: %{params: coin_balances_params_set}, - address_coin_balances_daily: %{params: coin_balances_params_set}, + address_coin_balances_daily: %{params: coin_balances_params_daily_set}, address_token_balances: %{params: address_token_balances}, blocks: %{params: blocks}, block_second_degree_relations: %{params: block_second_degree_relations_params}, @@ -248,18 +256,12 @@ defmodule Indexer.Block.Fetcher do def async_import_coin_balances(%{addresses: addresses}, %{ address_hash_to_fetched_balance_block_number: address_hash_to_block_number }) do - coin_balances_import_params = - addresses - |> Enum.map(fn %Address{hash: address_hash} -> - block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash)) - %{address_hash: address_hash, block_number: block_number} - end) - - coin_balances_import_params + addresses + |> Enum.map(fn %Address{hash: address_hash} -> + block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash)) + %{address_hash: address_hash, block_number: block_number} + end) |> CoinBalance.async_fetch_balances() - - coin_balances_import_params - |> CoinBalanceDaily.async_fetch_balances() end def async_import_coin_balances(_, _), do: :ok diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 0533484b7f..df4daa5837 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -178,7 +178,13 @@ defmodule Indexer.Block.Realtime.Fetcher do block_rewards: block_rewards } = options ) do - with {:balances, {:ok, %{addresses_params: balances_addresses_params, balances_params: balances_params}}} <- + with {:balances, + {:ok, + %{ + addresses_params: balances_addresses_params, + balances_params: balances_params, + balances_daily_params: balances_daily_params + }}} <- {:balances, balances(block_fetcher, %{ address_hash_to_block_number: address_hash_to_block_number, @@ -193,7 +199,8 @@ defmodule Indexer.Block.Realtime.Fetcher do |> put_in([:addresses, :params], balances_addresses_params) |> put_in([:blocks, :params, Access.all(), :consensus], true) |> put_in([:block_rewards], chain_import_block_rewards) - |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params), + |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params) + |> put_in([Access.key(:address_coin_balances_daily, %{}), :params], balances_daily_params), {:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do async_import_remaining_block_data( imported, @@ -383,15 +390,26 @@ defmodule Indexer.Block.Realtime.Fetcher do importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at)) - block_number = Enum.at(params_list, 0)[:block_number] - - {:ok, %Blocks{blocks_params: blocks_params}} = - EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) - - block_timestamp = Enum.at(blocks_params, 0).timestamp - day = DateTime.to_date(block_timestamp) - - importable_balances_daily_params = Enum.map(params_list, &Map.put(&1, :day, day)) + block_numbers = + params_list + |> Enum.map(&Map.get(&1, :block_number)) + |> Enum.sort() + |> Enum.dedup() + + block_timestamp_map = + Enum.reduce(block_numbers, %{}, fn block_number, map -> + {:ok, %Blocks{blocks_params: [%{timestamp: timestamp}]}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + day = DateTime.to_date(timestamp) + Map.put(map, "#{block_number}", day) + end) + + importable_balances_daily_params = + Enum.map(params_list, fn param -> + day = Map.get(block_timestamp_map, "#{param.block_number}") + Map.put(param, :day, day) + end) {:ok, %{ diff --git a/apps/indexer/lib/indexer/fetcher/block_reward.ex b/apps/indexer/lib/indexer/fetcher/block_reward.ex index 2075c33d99..7bd933eb5f 100644 --- a/apps/indexer/lib/indexer/fetcher/block_reward.ex +++ b/apps/indexer/lib/indexer/fetcher/block_reward.ex @@ -20,8 +20,8 @@ defmodule Indexer.Fetcher.BlockReward do alias Explorer.Chain.Cache.Accounts alias Indexer.{BufferedTask, Tracer} alias Indexer.Fetcher.BlockReward.Supervisor, as: BlockRewardSupervisor - alias Indexer.Fetcher.{CoinBalance, CoinBalanceDaily} - alias Indexer.Transform.{AddressCoinBalances, Addresses} + alias Indexer.Fetcher.CoinBalance + alias Indexer.Transform.{AddressCoinBalances, AddressCoinBalancesDaily, Addresses} @behaviour BufferedTask @@ -135,7 +135,6 @@ defmodule Indexer.Fetcher.BlockReward do Accounts.drop(addresses) CoinBalance.async_fetch_balances(address_coin_balances) - CoinBalanceDaily.async_fetch_balances(address_coin_balances) retry_errors(errors) @@ -246,9 +245,13 @@ defmodule Indexer.Fetcher.BlockReward do addresses_params = Addresses.extract_addresses(%{block_reward_contract_beneficiaries: block_rewards_params}) address_coin_balances_params_set = AddressCoinBalances.params_set(%{beneficiary_params: block_rewards_params}) + address_coin_balances_daily_params_set = + AddressCoinBalancesDaily.params_set(%{beneficiary_params: block_rewards_params}) + Chain.import(%{ addresses: %{params: addresses_params}, address_coin_balances: %{params: address_coin_balances_params_set}, + address_coin_balances_daily: %{params: address_coin_balances_daily_params_set}, block_rewards: %{params: block_rewards_params} }) end diff --git a/apps/indexer/lib/indexer/fetcher/coin_balance.ex b/apps/indexer/lib/indexer/fetcher/coin_balance.ex index 06c4bf6fa3..5212729c2e 100644 --- a/apps/indexer/lib/indexer/fetcher/coin_balance.ex +++ b/apps/indexer/lib/indexer/fetcher/coin_balance.ex @@ -11,7 +11,7 @@ defmodule Indexer.Fetcher.CoinBalance do import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1] - alias EthereumJSONRPC.FetchedBalances + alias EthereumJSONRPC.{Blocks, FetchedBalances} alias Explorer.Chain alias Explorer.Chain.{Block, Hash} alias Explorer.Chain.Cache.Accounts @@ -141,11 +141,88 @@ defmodule Indexer.Fetcher.CoinBalance do importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at)) + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + block_numbers = + params_list + |> Enum.map(&Map.get(&1, :block_number)) + |> Enum.sort() + |> Enum.dedup() + + block_timestamp_map = + Enum.reduce(block_numbers, %{}, fn block_number, map -> + {:ok, %Blocks{blocks_params: [%{timestamp: timestamp}]}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + day = DateTime.to_date(timestamp) + Map.put(map, "#{block_number}", day) + end) + + importable_balances_daily_params = + params_list + |> Enum.map(fn balance_param -> + day = Map.get(block_timestamp_map, "#{balance_param.block_number}") + + incoming_balance_daily_param = %{ + address_hash: balance_param.address_hash, + day: day, + value: balance_param.value + } + + incoming_balance_daily_param + end) + addresses_params = balances_params_to_address_params(importable_balances_params) Chain.import(%{ addresses: %{params: addresses_params, with: :balance_changeset}, address_coin_balances: %{params: importable_balances_params}, + address_coin_balances_daily: %{params: importable_balances_daily_params}, + broadcast: broadcast_type + }) + end + + def import_fetched_daily_balances(%FetchedBalances{params_list: params_list}, broadcast_type \\ false) do + value_fetched_at = DateTime.utc_now() + + importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at)) + + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + block_numbers = + params_list + |> Enum.map(&Map.get(&1, :block_number)) + |> Enum.sort() + |> Enum.dedup() + + block_timestamp_map = + Enum.reduce(block_numbers, %{}, fn block_number, map -> + {:ok, %Blocks{blocks_params: [%{timestamp: timestamp}]}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + day = DateTime.to_date(timestamp) + Map.put(map, "#{block_number}", day) + end) + + importable_balances_daily_params = + params_list + |> Enum.map(fn balance_param -> + day = Map.get(block_timestamp_map, "#{balance_param.block_number}") + + incoming_balance_daily_param = %{ + address_hash: balance_param.address_hash, + day: day, + value: balance_param.value + } + + incoming_balance_daily_param + end) + + addresses_params = balances_params_to_address_params(importable_balances_params) + + Chain.import(%{ + addresses: %{params: addresses_params, with: :balance_changeset}, + address_coin_balances_daily: %{params: importable_balances_daily_params}, broadcast: broadcast_type }) end diff --git a/apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex b/apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex deleted file mode 100644 index f15f599a9b..0000000000 --- a/apps/indexer/lib/indexer/fetcher/coin_balance_daily.ex +++ /dev/null @@ -1,226 +0,0 @@ -defmodule Indexer.Fetcher.CoinBalanceDaily do - @moduledoc """ - Fetches `t:Explorer.Chain.Address.CoinBalanceDaily.t/0`. - """ - - use Indexer.Fetcher - use Spandex.Decorators - - require Logger - - import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1] - - alias EthereumJSONRPC.{Blocks, FetchedBalances} - alias Explorer.Chain - alias Explorer.Chain.Cache.Accounts - alias Explorer.Chain.Hash - alias Indexer.{BufferedTask, Tracer} - - @behaviour BufferedTask - - @defaults [ - flush_interval: :timer.seconds(3), - max_batch_size: 500, - max_concurrency: 4, - task_supervisor: Indexer.Fetcher.CoinBalanceDaily.TaskSupervisor, - metadata: [fetcher: :coin_balance_daily] - ] - - @doc """ - Asynchronously fetches balances for each address `hash` at the `day`. - """ - @spec async_fetch_balances([ - %{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()} - ]) :: :ok - def async_fetch_balances(balance_fields) when is_list(balance_fields) do - entries = Enum.map(balance_fields, &entry/1) - - BufferedTask.buffer(__MODULE__, entries) - end - - @doc false - # credo:disable-for-next-line Credo.Check.Design.DuplicatedCode - def child_spec([init_options, gen_server_options]) do - {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) - - unless state do - raise ArgumentError, - ":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <> - "to allow for json_rpc calls when running." - end - - merged_init_options = - @defaults - |> Keyword.merge(mergeable_init_options) - |> Keyword.put(:state, state) - - Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__) - end - - @impl BufferedTask - def init(initial, reducer, _) do - {:ok, final} = - Chain.stream_unfetched_balances(initial, fn address_fields, acc -> - address_fields - |> entry() - |> reducer.(acc) - end) - - final - end - - @impl BufferedTask - @decorate trace(name: "fetch", resource: "Indexer.Fetcher.CoinBalanceDaily.run/2", service: :indexer, tracer: Tracer) - def run(entries, json_rpc_named_arguments) do - # the same address may be used more than once in the same block, but we only want one `Balance` for a given - # `{address, block}`, so take unique params only - unique_entries = Enum.uniq(entries) - - unique_filtered_entries = - Enum.filter(unique_entries, fn {_hash, block_number} -> - block_number > first_block_to_index() - end) - - unique_entry_count = Enum.count(unique_filtered_entries) - Logger.metadata(count: unique_entry_count) - - Logger.debug(fn -> "fetching" end) - - unique_filtered_entries - |> Enum.map(&entry_to_params/1) - |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) - |> case do - {:ok, fetched_balances} -> - run_fetched_balances(fetched_balances, unique_entries) - - {:error, reason} -> - Logger.error( - fn -> - ["failed to fetch: ", inspect(reason)] - end, - error_count: unique_entry_count - ) - - {:retry, unique_entries} - end - end - - defp first_block_to_index do - string_value = Application.get_env(:indexer, :first_block) - - case Integer.parse(string_value) do - {integer, ""} -> integer - _ -> 0 - end - end - - defp entry_to_params({address_hash_bytes, block_number}) when is_integer(block_number) do - {:ok, address_hash} = Hash.Address.cast(address_hash_bytes) - %{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address_hash)} - end - - defp entry(%{address_hash: %Hash{bytes: address_hash_bytes}, block_number: block_number}) do - {address_hash_bytes, block_number} - end - - def balances_params_to_address_params(balances_params) do - balances_params - |> Enum.group_by(fn %{address_hash: address_hash} -> address_hash end) - |> Map.values() - |> Stream.map(&Enum.max_by(&1, fn %{block_number: block_number} -> block_number end)) - |> Enum.map(fn %{address_hash: address_hash, block_number: block_number, value: value} -> - %{ - hash: address_hash, - fetched_coin_balance_block_number: block_number, - fetched_coin_balance: value - } - end) - end - - def import_fetched_balances(%FetchedBalances{params_list: params_list}, broadcast_type \\ false) do - day = - if Enum.empty?(params_list) do - nil - else - json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) - block_number = Enum.at(params_list, 0)[:block_number] - - {:ok, %Blocks{blocks_params: blocks_params}} = - EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) - - block_timestamp = Enum.at(blocks_params, 0).timestamp - - DateTime.to_date(block_timestamp) - end - - importable_balances_daily_params = - Enum.map(params_list, fn param -> - param - |> Map.put(:day, day) - end) - - addresses_params = balances_params_to_address_params(importable_balances_daily_params) - - Chain.import(%{ - addresses: %{params: addresses_params, with: :balance_changeset}, - address_coin_balances_daily: %{params: importable_balances_daily_params}, - broadcast: broadcast_type - }) - end - - defp run_fetched_balances(%FetchedBalances{errors: errors} = fetched_balances, _) do - {:ok, imported} = import_fetched_balances(fetched_balances) - - Accounts.drop(imported[:addresses]) - - retry(errors) - end - - defp retry([]), do: :ok - - defp retry(errors) when is_list(errors) do - retried_entries = fetched_balances_errors_to_entries(errors) - - Logger.error( - fn -> - [ - "failed to fetch: ", - fetched_balance_errors_to_iodata(errors) - ] - end, - error_count: Enum.count(retried_entries) - ) - - {:retry, retried_entries} - end - - defp fetched_balances_errors_to_entries(errors) when is_list(errors) do - Enum.map(errors, &fetched_balance_error_to_entry/1) - end - - defp fetched_balance_error_to_entry(%{data: %{block_quantity: block_quantity, day: day, hash_data: hash_data}}) - when is_binary(block_quantity) and is_binary(hash_data) do - {:ok, %Hash{bytes: address_hash_bytes}} = Hash.Address.cast(hash_data) - block_number = quantity_to_integer(block_quantity) - {address_hash_bytes, block_number, day} - end - - defp fetched_balance_errors_to_iodata(errors) when is_list(errors) do - fetched_balance_errors_to_iodata(errors, []) - end - - defp fetched_balance_errors_to_iodata([], iodata), do: iodata - - defp fetched_balance_errors_to_iodata([error | errors], iodata) do - fetched_balance_errors_to_iodata(errors, [iodata | fetched_balance_error_to_iodata(error)]) - end - - defp fetched_balance_error_to_iodata(%{ - code: code, - message: message, - data: %{day: day, hash_data: hash_data} - }) - when is_integer(code) and is_binary(message) and is_binary(day) and is_binary(hash_data) do - [hash_data, "@", day, ": (", to_string(code), ") ", message, ?\n] - end -end diff --git a/apps/indexer/lib/indexer/fetcher/coin_balance_on_demand.ex b/apps/indexer/lib/indexer/fetcher/coin_balance_on_demand.ex index 17014d2ede..3f68f11cb6 100644 --- a/apps/indexer/lib/indexer/fetcher/coin_balance_on_demand.ex +++ b/apps/indexer/lib/indexer/fetcher/coin_balance_on_demand.ex @@ -15,10 +15,10 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemand do import Ecto.Query, only: [from: 2] import EthereumJSONRPC, only: [integer_to_quantity: 1] - alias EthereumJSONRPC.FetchedBalances + alias EthereumJSONRPC.{FetchedBalances} alias Explorer.{Chain, Repo} alias Explorer.Chain.Address - alias Explorer.Chain.Address.CoinBalance + alias Explorer.Chain.Address.{CoinBalance, CoinBalanceDaily} alias Explorer.Chain.Cache.{Accounts, BlockNumber} alias Explorer.Counters.AverageBlockTime alias Indexer.Fetcher.CoinBalance, as: CoinBalanceFetcher @@ -86,6 +86,12 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemand do {:noreply, state} end + def handle_cast({:fetch_and_import_daily_balances, block_number, address}, state) do + fetch_and_import_daily_balances(block_number, address, state.json_rpc_named_arguments) + + {:noreply, state} + end + ## Implementation defp do_trigger_fetch(%Address{fetched_coin_balance_block_number: nil} = address, latest_block_number, _) do @@ -95,6 +101,14 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemand do end defp do_trigger_fetch(address, latest_block_number, stale_balance_window) do + latest_by_day = + from( + cbd in CoinBalanceDaily, + where: cbd.address_hash == ^address.hash, + order_by: [desc: :day], + limit: 1 + ) + latest = from( cb in CoinBalance, @@ -105,15 +119,28 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemand do limit: 1 ) + do_trigger_balance_fetch_query(address, latest_block_number, stale_balance_window, latest, latest_by_day) + end + + defp do_trigger_balance_fetch_query( + address, + latest_block_number, + stale_balance_window, + query_balances, + query_balances_daily + ) do if address.fetched_coin_balance_block_number < stale_balance_window do + do_trigger_balance_daily_fetch_query(address, latest_block_number, query_balances_daily) GenServer.cast(__MODULE__, {:fetch_and_update, latest_block_number, address}) {:stale, latest_block_number} else - case Repo.one(latest) do + case Repo.one(query_balances) do nil -> # There is no recent coin balance to fetch, so we check to see how old the # balance is on the address. If it is too old, we check again, just to be safe. + do_trigger_balance_daily_fetch_query(address, latest_block_number, query_balances_daily) + :current %CoinBalance{value_fetched_at: nil, block_number: block_number} -> @@ -122,11 +149,19 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemand do {:pending, block_number} %CoinBalance{} -> + do_trigger_balance_daily_fetch_query(address, latest_block_number, query_balances_daily) + :current end end end + defp do_trigger_balance_daily_fetch_query(address, latest_block_number, query) do + if Repo.one(query) == nil do + GenServer.cast(__MODULE__, {:fetch_and_import_daily_balances, latest_block_number, address}) + end + end + defp fetch_and_import(block_number, address, json_rpc_named_arguments) do case fetch_balances(block_number, address, json_rpc_named_arguments) do {:ok, fetched_balances} -> do_import(fetched_balances) @@ -134,6 +169,13 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemand do end end + defp fetch_and_import_daily_balances(block_number, address, json_rpc_named_arguments) do + case fetch_balances(block_number, address, json_rpc_named_arguments) do + {:ok, fetched_balances} -> do_import_daily_balances(fetched_balances) + _ -> :ok + end + end + defp fetch_and_update(block_number, address, json_rpc_named_arguments) do case fetch_balances(block_number, address, json_rpc_named_arguments) do {:ok, %{params_list: []}} -> @@ -165,6 +207,13 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemand do end end + defp do_import_daily_balances(%FetchedBalances{} = fetched_balances) do + case CoinBalanceFetcher.import_fetched_daily_balances(fetched_balances, :on_demand) do + {:ok, %{addresses: [address]}} -> {:ok, address} + _ -> :error + end + end + defp latest_block_number do BlockNumber.get_max() end diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index b5f6b96f2e..f22b6de487 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -11,7 +11,6 @@ defmodule Indexer.Supervisor do alias Indexer.Fetcher.{ BlockReward, CoinBalance, - CoinBalanceDaily, CoinBalanceOnDemand, ContractCode, InternalTransaction, @@ -110,8 +109,6 @@ defmodule Indexer.Supervisor do [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {CoinBalance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, - {CoinBalanceDaily.Supervisor, - [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {Token.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {TokenInstance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, diff --git a/apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex b/apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex index de8c9832a5..8382570ebc 100644 --- a/apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex +++ b/apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex @@ -10,18 +10,29 @@ defmodule Indexer.Transform.AddressCoinBalancesDaily do end defp reducer({:beneficiary_params, beneficiary_params}, acc) when is_list(beneficiary_params) do + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + block_numbers = + beneficiary_params + |> Enum.map(&Map.get(&1, :block_number)) + |> Enum.sort() + |> Enum.dedup() + + block_timestamp_map = + Enum.reduce(block_numbers, %{}, fn block_number, map -> + {:ok, %Blocks{blocks_params: [%{timestamp: timestamp}]}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + day = DateTime.to_date(timestamp) + Map.put(map, "#{block_number}", day) + end) + Enum.into(beneficiary_params, acc, fn %{ address_hash: address_hash, block_number: block_number } when is_binary(address_hash) and is_integer(block_number) -> - json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) - - {:ok, %Blocks{blocks_params: blocks_params}} = - EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) - - block_timestamp = Enum.at(blocks_params, 0).timestamp - day = DateTime.to_date(block_timestamp) + day = Map.get(block_timestamp_map, "#{block_number}") %{address_hash: address_hash, day: day} end) @@ -43,17 +54,28 @@ defmodule Indexer.Transform.AddressCoinBalancesDaily do defp reducer({:logs_params, logs_params}, acc) when is_list(logs_params) do # a log MUST have address_hash and block_number + json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) + + block_numbers = + logs_params + |> Enum.map(&Map.get(&1, :block_number)) + |> Enum.sort() + |> Enum.dedup() + + block_timestamp_map = + Enum.reduce(block_numbers, %{}, fn block_number, map -> + {:ok, %Blocks{blocks_params: [%{timestamp: timestamp}]}} = + EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) + + day = DateTime.to_date(timestamp) + Map.put(map, "#{block_number}", day) + end) + logs_params |> Enum.into(acc, fn %{address_hash: address_hash, block_number: block_number} when is_binary(address_hash) and is_integer(block_number) -> - json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) - - {:ok, %Blocks{blocks_params: blocks_params}} = - EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) - - block_timestamp = Enum.at(blocks_params, 0).timestamp - day = DateTime.to_date(block_timestamp) + day = Map.get(block_timestamp_map, "#{block_number}") %{address_hash: address_hash, day: day} %{type: "pending"} -> @@ -86,10 +108,9 @@ defmodule Indexer.Transform.AddressCoinBalancesDaily do %{type: "create", created_contract_address_hash: address_hash} when is_binary(address_hash) -> json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) - {:ok, %Blocks{blocks_params: blocks_params}} = + {:ok, %Blocks{blocks_params: [%{timestamp: block_timestamp}]}} = EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) - block_timestamp = Enum.at(blocks_params, 0).timestamp day = DateTime.to_date(block_timestamp) MapSet.put(acc, %{address_hash: address_hash, day: day}) @@ -97,10 +118,9 @@ defmodule Indexer.Transform.AddressCoinBalancesDaily do when is_binary(from_address_hash) and is_binary(to_address_hash) -> json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) - {:ok, %Blocks{blocks_params: blocks_params}} = + {:ok, %Blocks{blocks_params: [%{timestamp: block_timestamp}]}} = EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) - block_timestamp = Enum.at(blocks_params, 0).timestamp day = DateTime.to_date(block_timestamp) acc @@ -117,10 +137,9 @@ defmodule Indexer.Transform.AddressCoinBalancesDaily do # a transaction MUST have a `from_address_hash` json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) - {:ok, %Blocks{blocks_params: blocks_params}} = + {:ok, %Blocks{blocks_params: [%{timestamp: block_timestamp}]}} = EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) - block_timestamp = Enum.at(blocks_params, 0).timestamp day = DateTime.to_date(block_timestamp) acc = MapSet.put(initial, %{address_hash: from_address_hash, day: day}) diff --git a/apps/indexer/test/indexer/block/fetcher_test.exs b/apps/indexer/test/indexer/block/fetcher_test.exs index 4bd04dc6dc..f3b1bdd131 100644 --- a/apps/indexer/test/indexer/block/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/fetcher_test.exs @@ -79,6 +79,8 @@ defmodule Indexer.Block.FetcherTest do block_quantity = integer_to_quantity(block_number) miner_hash = "0x0000000000000000000000000000000000000000" + res = eth_block_number_fake_response(block_quantity) + case Keyword.fetch!(json_rpc_named_arguments, :variant) do EthereumJSONRPC.Parity -> EthereumJSONRPC.Mox @@ -137,6 +139,17 @@ defmodule Indexer.Block.FetcherTest do {:ok, [%{id: id, result: []}]} end end) + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) EthereumJSONRPC.Geth -> EthereumJSONRPC.Mox @@ -381,10 +394,78 @@ defmodule Indexer.Block.FetcherTest do end) # async requests need to be grouped in one expect because the order is non-deterministic while multiple expect # calls on the same name/arity are used in order - |> expect(:json_rpc, 5, fn json, _options -> + |> expect(:json_rpc, 11, fn json, _options -> [request] = json case request do + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } -> + {:ok, + [ + %{ + id: 0, + jsonrpc: "2.0", + result: %{ + "author" => "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca", + "difficulty" => "0xfffffffffffffffffffffffffffffffe", + "extraData" => "0xd5830108048650617269747986312e32322e31826c69", + "gasLimit" => "0x69fe20", + "gasUsed" => "0xc512", + "hash" => "0xf6b4b8c88df3ebd252ec476328334dc026cf66606a84fb769b3d3cbccc8471bd", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000200000000000000000000020000000000000000200000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca", + "number" => "0x25", + "parentHash" => "0xc37bbad7057945d1bf128c1ff009fb1ad632110bf6a000aac025a80f7766b66e", + "receiptsRoot" => "0xd300311aab7dcc98c05ac3f1893629b2c9082c189a0a0c76f4f63e292ac419d5", + "sealFields" => [ + "0x84120a71de", + "0xb841fcdb570511ec61edda93849bb7c6b3232af60feb2ea74e4035f0143ab66dfdd00f67eb3eda1adddbb6b572db1e0abd39ce00f9b3ccacb9f47973279ff306fe5401" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "fcdb570511ec61edda93849bb7c6b3232af60feb2ea74e4035f0143ab66dfdd00f67eb3eda1adddbb6b572db1e0abd39ce00f9b3ccacb9f47973279ff306fe5401", + "size" => "0x2cf", + "stateRoot" => "0x2cd84079b0d0c267ed387e3895fd1c1dc21ff82717beb1132adac64276886e19", + "step" => "302674398", + "timestamp" => "0x5a343956", + "totalDifficulty" => "0x24ffffffffffffffffffffffffedf78dfd", + "transactions" => [ + %{ + "blockHash" => "0xf6b4b8c88df3ebd252ec476328334dc026cf66606a84fb769b3d3cbccc8471bd", + "blockNumber" => "0x25", + "chainId" => "0x4d", + "condition" => nil, + "creates" => nil, + "from" => from_address_hash, + "gas" => "0x47b760", + "gasPrice" => "0x174876e800", + "hash" => transaction_hash, + "input" => "0x10855269000000000000000000000000862d67cb0773ee3f8ce7ea89b328ffea861ab3ef", + "nonce" => "0x4", + "publicKey" => + "0xe5d196ad4ceada719d9e592f7166d0c75700f6eab2e3c3de34ba751ea786527cb3f6eb96ad9fdfdb9989ff572df50f1c42ef800af9c5207a38b929aff969b5c9", + "r" => "0xa7f8f45cce375bb7af8750416e1b03e0473f93c256da2285d1134fc97a700e01", + "raw" => + "0xf88a0485174876e8008347b760948bf38d4764929064f2d4d3a56520a76ab3df415b80a410855269000000000000000000000000862d67cb0773ee3f8ce7ea89b328ffea861ab3ef81bea0a7f8f45cce375bb7af8750416e1b03e0473f93c256da2285d1134fc97a700e01a01f87a076f13824f4be8963e3dffd7300dae64d5f23c9a062af0c6ead347c135f", + "s" => "0x1f87a076f13824f4be8963e3dffd7300dae64d5f23c9a062af0c6ead347c135f", + "standardV" => "0x1", + "to" => to_address_hash, + "transactionIndex" => "0x0", + "v" => "0xbe", + "value" => "0x0" + } + ], + "transactionsRoot" => "0x68e314a05495f390f9cd0c36267159522e5450d2adf254a74567b452e767bf34", + "uncles" => [] + } + } + ]} + %{id: id, method: "eth_getBalance", params: [^to_address_hash, ^block_quantity]} -> {:ok, [%{id: id, jsonrpc: "2.0", result: "0x1"}]} @@ -662,6 +743,22 @@ defmodule Indexer.Block.FetcherTest do } %{id: id, method: "trace_block"} -> + block_quantity = integer_to_quantity(block_number) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) + %{ id: id, result: [ @@ -742,4 +839,40 @@ defmodule Indexer.Block.FetcherTest do counts.buffer == 0 and counts.tasks == 0 end) end + + defp eth_block_number_fake_response(block_quantity) do + %{ + id: 0, + jsonrpc: "2.0", + result: %{ + "author" => "0x0000000000000000000000000000000000000000", + "difficulty" => "0x20000", + "extraData" => "0x", + "gasLimit" => "0x663be0", + "gasUsed" => "0x0", + "hash" => "0x5b28c1bfd3a15230c9a46b399cd0f9a6920d432e85381cc6a140b06e8410112f", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0x0000000000000000000000000000000000000000", + "number" => block_quantity, + "parentHash" => "0x0000000000000000000000000000000000000000000000000000000000000000", + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x80", + "0xb8410000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "size" => "0x215", + "stateRoot" => "0xfad4af258fd11939fae0c6c6eec9d340b1caac0b0196fd9a1bc3f489c5bf00b3", + "step" => "0", + "timestamp" => "0x0", + "totalDifficulty" => "0x20000", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + } + } + end end diff --git a/apps/indexer/test/indexer/block/realtime/fetcher_test.exs b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs index f222805d26..c4d1e5dc96 100644 --- a/apps/indexer/test/indexer/block/realtime/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs @@ -205,7 +205,7 @@ defmodule Indexer.Block.Realtime.FetcherTest do } ]} end) - |> expect(:json_rpc, 2, fn + |> expect(:json_rpc, 5, fn [ %{id: 0, jsonrpc: "2.0", method: "trace_block", params: ["0x3C365F"]}, %{id: 1, jsonrpc: "2.0", method: "trace_block", params: ["0x3C3660"]} @@ -217,6 +217,124 @@ defmodule Indexer.Block.Realtime.FetcherTest do %{id: 1, jsonrpc: "2.0", result: []} ]} + [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: ["0x3C365F", true] + } + ], + _ -> + {:ok, + [ + %{ + id: 0, + jsonrpc: "2.0", + result: %{ + "author" => "0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", + "difficulty" => "0xfffffffffffffffffffffffffffffffe", + "extraData" => "0xd583010b088650617269747986312e32372e32826c69", + "gasLimit" => "0x7a1200", + "gasUsed" => "0x2886e", + "hash" => "0xa4ec735cabe1510b5ae081b30f17222580b4588dbec52830529753a688b046cc", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", + "number" => "0x3c365f", + "parentHash" => "0x57f6d66e07488defccd5216c4d2968dd6afd3bd32415e284de3b02af6535e8dc", + "receiptsRoot" => "0x111be72e682cea9c93e02f1ef503fb64aa821b2ef510fd9177c49b37d0af98b5", + "sealFields" => [ + "0x841246c63f", + "0xb841ba3d11db672fd7893d1b7906275fa7c4c7f4fbcc8fa29eab0331480332361516545ef10a36d800ad2be2b449dde8d5703125156a9cf8a035f5a8623463e051b700" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "ba3d11db672fd7893d1b7906275fa7c4c7f4fbcc8fa29eab0331480332361516545ef10a36d800ad2be2b449dde8d5703125156a9cf8a035f5a8623463e051b700", + "size" => "0x33e", + "stateRoot" => "0x7f73f5fb9f891213b671356126c31e9795d038844392c7aa8800ed4f52307209", + "step" => "306628159", + "timestamp" => "0x5b61df3b", + "totalDifficulty" => "0x3c365effffffffffffffffffffffffed7f0362", + "transactions" => [ + %{ + "blockHash" => "0xa4ec735cabe1510b5ae081b30f17222580b4588dbec52830529753a688b046cc", + "blockNumber" => "0x3c365f", + "chainId" => "0x63", + "condition" => nil, + "creates" => nil, + "from" => "0x40b18103537c0f15d5e137dd8ddd019b84949d16", + "gas" => "0x3d9c5", + "gasPrice" => "0x3b9aca00", + "hash" => "0xd3937e70fab3fb2bfe8feefac36815408bf07de3b9e09fe81114b9a6b17f55c8", + "input" => + "0x8841ac11000000000000000000000000000000000000000000000000000000000000006c000000000000000000000000000000000000000000000000000000000000000500000000000000000000000000000000000000000000000000000000000000050000000000000000000000000000000000000000000000000000000000000005", + "nonce" => "0x65b", + "publicKey" => + "0x89c2123ed4b5d141cf1f4b6f5f3d754418f03aea2e870a1c50888d94bf5531f74237e2fea72d0bc198ef213272b62c6869615720757255e6cba087f9db6e759f", + "r" => "0x55a1a93541d7f782f97f6699437bb60fa4606d63760b30c1ee317e648f93995", + "raw" => + "0xf8f582065b843b9aca008303d9c594698bf6943bab687b2756394624aa183f434f65da8901158e4f216242a000b8848841ac11000000000000000000000000000000000000000000000000000000000000006c00000000000000000000000000000000000000000000000000000000000000050000000000000000000000000000000000000000000000000000000000000005000000000000000000000000000000000000000000000000000000000000000581eaa0055a1a93541d7f782f97f6699437bb60fa4606d63760b30c1ee317e648f93995a06affd4da5eca84fbca2b016c980f861e0af1f8d6535e2fe29d8f96dc0ce358f7", + "s" => "0x6affd4da5eca84fbca2b016c980f861e0af1f8d6535e2fe29d8f96dc0ce358f7", + "standardV" => "0x1", + "to" => "0x698bf6943bab687b2756394624aa183f434f65da", + "transactionIndex" => "0x0", + "v" => "0xea", + "value" => "0x1158e4f216242a000" + } + ], + "transactionsRoot" => "0xd7c39a93eafe0bdcbd1324c13dcd674bed8c9fa8adbf8f95bf6a59788985da6f", + "uncles" => ["0xa4ec735cabe1510b5ae081b30f17222580b4588dbec52830529753a688b046cd"] + } + } + ]} + + [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: ["0x3C3660", true] + } + ], + _ -> + {:ok, + [ + %{ + id: 0, + jsonrpc: "2.0", + result: %{ + "author" => "0x66c9343c7e8ca673a1fedf9dbf2cd7936dbbf7e3", + "difficulty" => "0xfffffffffffffffffffffffffffffffe", + "extraData" => "0xd583010a068650617269747986312e32362e32826c69", + "gasLimit" => "0x7a1200", + "gasUsed" => "0x0", + "hash" => "0xfb483e511d316fa4072694da3f7abc94b06286406af45061e5e681395bdc6815", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0x66c9343c7e8ca673a1fedf9dbf2cd7936dbbf7e3", + "number" => "0x3c3660", + "parentHash" => "0xa4ec735cabe1510b5ae081b30f17222580b4588dbec52830529753a688b046cc", + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x841246c640", + "0xb84114db3fd7526b7ea3635f5c85c30dd8a645453aa2f8afe5fd33fe0ec663c9c7b653b0fb5d8dc7d0b809674fa9dca9887d1636a586bf62191da22255eb068bf20800" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "14db3fd7526b7ea3635f5c85c30dd8a645453aa2f8afe5fd33fe0ec663c9c7b653b0fb5d8dc7d0b809674fa9dca9887d1636a586bf62191da22255eb068bf20800", + "size" => "0x243", + "stateRoot" => "0x3174c461989e9f99e08fa9b4ffb8bce8d9a281c8fc9f80694bb9d3acd4f15559", + "step" => "306628160", + "timestamp" => "0x5b61df40", + "totalDifficulty" => "0x3c365fffffffffffffffffffffffffed7f0360", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + } + } + ]} + [ %{ id: 0, diff --git a/apps/indexer/test/indexer/fetcher/block_reward_test.exs b/apps/indexer/test/indexer/fetcher/block_reward_test.exs index 85d31b6d05..3de7117357 100644 --- a/apps/indexer/test/indexer/fetcher/block_reward_test.exs +++ b/apps/indexer/test/indexer/fetcher/block_reward_test.exs @@ -117,6 +117,21 @@ defmodule Indexer.Fetcher.BlockRewardTest do } end) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) + assert count(Chain.Block.Reward) == 0 parent = self() @@ -190,6 +205,21 @@ defmodule Indexer.Fetcher.BlockRewardTest do } end) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) + parent = self() pid = @@ -320,6 +350,21 @@ defmodule Indexer.Fetcher.BlockRewardTest do } end) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) + assert count(Chain.Block.Reward) == 0 assert count(Chain.Address.CoinBalance) == 0 @@ -408,6 +453,21 @@ defmodule Indexer.Fetcher.BlockRewardTest do } end) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) + assert count(Chain.Block.Reward) == 0 assert count(Chain.Address.CoinBalance) == 0 @@ -486,6 +546,21 @@ defmodule Indexer.Fetcher.BlockRewardTest do } end) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) + assert count(Chain.Block.Reward) == 1 assert count(Chain.Address.CoinBalance) == 1 @@ -625,6 +700,21 @@ defmodule Indexer.Fetcher.BlockRewardTest do } end) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) + assert count(Chain.Block.Reward) == 0 assert count(Chain.Address.CoinBalance) == 0 @@ -684,4 +774,40 @@ defmodule Indexer.Fetcher.BlockRewardTest do do_wait_until(parent, ref, producer) end end + + defp eth_block_number_fake_response(block_quantity) do + %{ + id: 0, + jsonrpc: "2.0", + result: %{ + "author" => "0x0000000000000000000000000000000000000000", + "difficulty" => "0x20000", + "extraData" => "0x", + "gasLimit" => "0x663be0", + "gasUsed" => "0x0", + "hash" => "0x5b28c1bfd3a15230c9a46b399cd0f9a6920d432e85381cc6a140b06e8410112f", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0x0000000000000000000000000000000000000000", + "number" => block_quantity, + "parentHash" => "0x0000000000000000000000000000000000000000000000000000000000000000", + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x80", + "0xb8410000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "size" => "0x215", + "stateRoot" => "0xfad4af258fd11939fae0c6c6eec9d340b1caac0b0196fd9a1bc3f489c5bf00b3", + "step" => "0", + "timestamp" => "0x0", + "totalDifficulty" => "0x20000", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + } + } + end end diff --git a/apps/indexer/test/indexer/fetcher/coin_balance_on_demand_test.exs b/apps/indexer/test/indexer/fetcher/coin_balance_on_demand_test.exs index 8f0ae262e4..bab4f54da2 100644 --- a/apps/indexer/test/indexer/fetcher/coin_balance_on_demand_test.exs +++ b/apps/indexer/test/indexer/fetcher/coin_balance_on_demand_test.exs @@ -117,6 +117,21 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemandTest do {:ok, [%{id: id, jsonrpc: "2.0", result: "0x02"}]} end) + res = eth_block_number_fake_response("0x65") + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: ["0x65", true] + } + ], + _ -> + {:ok, [res]} + end) + assert CoinBalanceOnDemand.trigger_fetch(address) == {:stale, 101} {:ok, expected_wei} = Wei.cast(2) @@ -144,6 +159,20 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemandTest do {:ok, [%{id: id, jsonrpc: "2.0", result: "0x02"}]} end) + EthereumJSONRPC.Mox + |> expect(:json_rpc, 1, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: ["0x66", true] + } + ], + _ -> + res = eth_block_number_fake_response("0x66") + {:ok, [res]} + end) + assert CoinBalanceOnDemand.trigger_fetch(address) == {:pending, 102} {:ok, expected_wei} = Wei.cast(2) @@ -154,4 +183,40 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemandTest do ) end end + + defp eth_block_number_fake_response(block_quantity) do + %{ + id: 0, + jsonrpc: "2.0", + result: %{ + "author" => "0x0000000000000000000000000000000000000000", + "difficulty" => "0x20000", + "extraData" => "0x", + "gasLimit" => "0x663be0", + "gasUsed" => "0x0", + "hash" => "0x5b28c1bfd3a15230c9a46b399cd0f9a6920d432e85381cc6a140b06e8410112f", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0x0000000000000000000000000000000000000000", + "number" => block_quantity, + "parentHash" => "0x0000000000000000000000000000000000000000000000000000000000000000", + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x80", + "0xb8410000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "size" => "0x215", + "stateRoot" => "0xfad4af258fd11939fae0c6c6eec9d340b1caac0b0196fd9a1bc3f489c5bf00b3", + "step" => "0", + "timestamp" => "0x0", + "totalDifficulty" => "0x20000", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + } + } + end end diff --git a/apps/indexer/test/indexer/fetcher/coin_balance_test.exs b/apps/indexer/test/indexer/fetcher/coin_balance_test.exs index 94fbf9f774..e61d54d6f8 100644 --- a/apps/indexer/test/indexer/fetcher/coin_balance_test.exs +++ b/apps/indexer/test/indexer/fetcher/coin_balance_test.exs @@ -58,6 +58,21 @@ defmodule Indexer.Fetcher.CoinBalanceTest do _options -> {:ok, [%{id: id, result: integer_to_quantity(fetched_balance)}]} end) + + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) end {:ok, miner_hash} = Hash.Address.cast(miner_hash_data) @@ -114,6 +129,21 @@ defmodule Indexer.Fetcher.CoinBalanceTest do _options -> {:ok, [%{id: id, result: integer_to_quantity(fetched_balance)}]} end) + + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) end {:ok, miner_hash} = Hash.Address.cast(miner_hash_data) @@ -178,6 +208,21 @@ defmodule Indexer.Fetcher.CoinBalanceTest do _options -> {:ok, [%{id: id, result: integer_to_quantity(fetched_balance)}]} end) + + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) end CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) @@ -254,6 +299,25 @@ defmodule Indexer.Fetcher.CoinBalanceTest do {:ok, %Hash{bytes: address_hash_bytes}} = Hash.Address.cast(hash_data) entries = Enum.map(block_quantities, &{address_hash_bytes, quantity_to_integer(&1)}) + res1 = eth_block_number_fake_response("0x1") + res2 = eth_block_number_fake_response("0x2") + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{id: 0, jsonrpc: "2.0", method: "eth_getBlockByNumber", params: ["0x1", true]} + ], + _ -> + {:ok, [res1]} + end) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{id: 0, jsonrpc: "2.0", method: "eth_getBlockByNumber", params: ["0x2", true]} + ], + _ -> + {:ok, [res2]} + end) + case CoinBalance.run(entries, json_rpc_named_arguments) do :ok -> balances = Repo.all(from(balance in Address.CoinBalance, where: balance.address_hash == ^hash_data)) @@ -314,16 +378,33 @@ defmodule Indexer.Fetcher.CoinBalanceTest do test "retries none if all imported and no fetch errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do %Hash{bytes: address_hash_bytes} = address_hash() - entries = [{address_hash_bytes, block_number()}] + block_number = block_number() + entries = [{address_hash_bytes, block_number}] expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "eth_getBalance", params: [_, _]}], _ -> {:ok, [%{id: id, result: "0x1"}]} end) + block_quantity = integer_to_quantity(block_number) + res = eth_block_number_fake_response(block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [^block_quantity, true] + } + ], + _ -> + {:ok, [res]} + end) + assert :ok = CoinBalance.run(entries, json_rpc_named_arguments) end - test "retries retries fetch errors if all imported", %{json_rpc_named_arguments: json_rpc_named_arguments} do + test "retries fetch errors if all imported", %{json_rpc_named_arguments: json_rpc_named_arguments} do %Hash{bytes: address_hash_bytes} = address_hash() bad_block_number = block_number() good_block_number = block_number() @@ -359,6 +440,22 @@ defmodule Indexer.Fetcher.CoinBalanceTest do {:ok, responses} end) + bad_block_quantity = integer_to_quantity(bad_block_number) + res_bad = eth_block_number_fake_response(bad_block_quantity) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_getBlockByNumber", + params: [bad_block_quantity, true] + } + ], + [] -> + {:ok, [res_bad]} + end) + assert {:retry, [{^address_hash_bytes, ^bad_block_number}]} = CoinBalance.run( [{address_hash_bytes, good_block_number}, {address_hash_bytes, bad_block_number}], @@ -374,4 +471,40 @@ defmodule Indexer.Fetcher.CoinBalanceTest do Process.sleep(100) wait(producer) end + + defp eth_block_number_fake_response(block_quantity) do + %{ + id: 0, + jsonrpc: "2.0", + result: %{ + "author" => "0x0000000000000000000000000000000000000000", + "difficulty" => "0x20000", + "extraData" => "0x", + "gasLimit" => "0x663be0", + "gasUsed" => "0x0", + "hash" => "0x5b28c1bfd3a15230c9a46b399cd0f9a6920d432e85381cc6a140b06e8410112f", + "logsBloom" => + "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner" => "0x0000000000000000000000000000000000000000", + "number" => block_quantity, + "parentHash" => "0x0000000000000000000000000000000000000000000000000000000000000000", + "receiptsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sealFields" => [ + "0x80", + "0xb8410000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + ], + "sha3Uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "signature" => + "0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "size" => "0x215", + "stateRoot" => "0xfad4af258fd11939fae0c6c6eec9d340b1caac0b0196fd9a1bc3f489c5bf00b3", + "step" => "0", + "timestamp" => "0x0", + "totalDifficulty" => "0x20000", + "transactions" => [], + "transactionsRoot" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles" => [] + } + } + end end From 536f78cacc03b3814624442ed576731fbe2fd4ac Mon Sep 17 00:00:00 2001 From: Victor Baranov Date: Mon, 1 Jun 2020 21:30:55 +0300 Subject: [PATCH 4/6] Remove orderinng by day in the query --- apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex b/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex index 4cc68b8970..ca90840771 100644 --- a/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex +++ b/apps/explorer/lib/explorer/chain/address/coin_balance_daily.ex @@ -53,7 +53,6 @@ defmodule Explorer.Chain.Address.CoinBalanceDaily do CoinBalanceDaily |> where([cbd], cbd.address_hash == ^address_hash) |> limit_time_interval(days_to_consider) - |> order_by([cbd], cbd.day) |> select([cbd], %{date: cbd.day, value: cbd.value}) end From 83156769f6fbc1b109ce6b8aa867607ec7259d0c Mon Sep 17 00:00:00 2001 From: Victor Baranov Date: Mon, 1 Jun 2020 22:28:18 +0300 Subject: [PATCH 5/6] Balances by day processing: orrering by day --- apps/explorer/lib/explorer/chain.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index ed300bebc7..ea3ace7278 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -3522,6 +3522,7 @@ defmodule Explorer.Chain do address_hash |> CoinBalanceDaily.balances_by_day() |> Repo.all() + |> Enum.sort(&(&1.date <= &2.date)) |> replace_last_value(latest_block_timestamp) |> normalize_balances_by_day() end From 75145508b18e0d1c99cf778e5999fda423701866 Mon Sep 17 00:00:00 2001 From: Victor Baranov Date: Mon, 1 Jun 2020 23:16:17 +0300 Subject: [PATCH 6/6] Refine processing of balance by day --- apps/explorer/lib/explorer/chain.ex | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index ea3ace7278..481ae4a730 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -3522,7 +3522,7 @@ defmodule Explorer.Chain do address_hash |> CoinBalanceDaily.balances_by_day() |> Repo.all() - |> Enum.sort(&(&1.date <= &2.date)) + |> Enum.sort_by(fn %{date: d} -> {d.year, d.month, d.day} end) |> replace_last_value(latest_block_timestamp) |> normalize_balances_by_day() end @@ -3537,7 +3537,6 @@ defmodule Explorer.Chain do defp normalize_balances_by_day(balances_by_day) do result = balances_by_day - |> Enum.map(fn day -> Map.take(day, [:date, :value]) end) |> Enum.filter(fn day -> day.value end) |> Enum.map(fn day -> Map.update!(day, :date, &to_string(&1)) end) |> Enum.map(fn day -> Map.update!(day, :value, &Wei.to(&1, :ether)) end)