From 9530e367232b19b82e7d89861a02ee7e6076724a Mon Sep 17 00:00:00 2001 From: Victor Baranov Date: Thu, 17 Dec 2020 16:29:54 +0300 Subject: [PATCH] Address token balance on demand fetcher --- CHANGELOG.md | 1 + .../address_token_balance_controller.ex | 3 + .../lib/block_scout_web/notifier.ex | 5 + apps/explorer/lib/explorer/chain.ex | 8 +- .../fetcher/token_balance_on_demand.ex | 130 ++++++++++++++++++ apps/indexer/lib/indexer/supervisor.ex | 2 + 6 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 apps/indexer/lib/indexer/fetcher/token_balance_on_demand.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f882dc123..1c01df9d0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#3462](https://github.com/poanetwork/blockscout/pull/3462) - Display price for bridged tokens ### Fixes +- [#3525](https://github.com/poanetwork/blockscout/pull/3525) - Address token balance on demand fetcher - [#3514](https://github.com/poanetwork/blockscout/pull/3514) - Read contract: fix internal server error - [#3513](https://github.com/poanetwork/blockscout/pull/3513) - Fix input data processing for method call (array type of data) - [#3509](https://github.com/poanetwork/blockscout/pull/3509) - Fix QR code tooltip appearance in mobile view diff --git a/apps/block_scout_web/lib/block_scout_web/controllers/address_token_balance_controller.ex b/apps/block_scout_web/lib/block_scout_web/controllers/address_token_balance_controller.ex index c59760f682..758a569217 100644 --- a/apps/block_scout_web/lib/block_scout_web/controllers/address_token_balance_controller.ex +++ b/apps/block_scout_web/lib/block_scout_web/controllers/address_token_balance_controller.ex @@ -4,6 +4,7 @@ defmodule BlockScoutWeb.AddressTokenBalanceController do import BlockScoutWeb.AddressView, only: [from_address_hash: 1] alias BlockScoutWeb.{AccessHelpers, CustomContractsHelpers} alias Explorer.{Chain, Market} + alias Indexer.Fetcher.TokenBalanceOnDemand def index(conn, %{"address_id" => address_hash_string} = params) do with true <- ajax?(conn), @@ -13,6 +14,8 @@ defmodule BlockScoutWeb.AddressTokenBalanceController do |> Chain.fetch_last_token_balances() |> Market.add_price() + TokenBalanceOnDemand.trigger_fetch(address_hash, token_balances) + circles_addresses_list = CustomContractsHelpers.get_custom_addresses_list(:circles_addresses) circles_total_balance = diff --git a/apps/block_scout_web/lib/block_scout_web/notifier.ex b/apps/block_scout_web/lib/block_scout_web/notifier.ex index d492612e53..2448e333c3 100644 --- a/apps/block_scout_web/lib/block_scout_web/notifier.ex +++ b/apps/block_scout_web/lib/block_scout_web/notifier.ex @@ -32,6 +32,11 @@ defmodule BlockScoutWeb.Notifier do Enum.each(address_token_balances, &broadcast_address_token_balance/1) end + def handle_event({:chain_event, :address_current_token_balances, type, address_current_token_balances}) + when type in [:realtime, :on_demand] do + Enum.each(address_current_token_balances, &broadcast_address_token_balance/1) + end + def handle_event( {:chain_event, :contract_verification_result, :on_demand, {address_hash, contract_verification_result, conn}} ) do diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index ce03b6a745..fed7209f52 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -1954,7 +1954,13 @@ defmodule Explorer.Chain do end def check_if_tokens_at_address(address_hash) do - Repo.exists?(from(tb in CurrentTokenBalance, where: tb.address_hash == ^address_hash)) + Repo.exists?( + from( + tb in CurrentTokenBalance, + where: tb.address_hash == ^address_hash, + where: tb.value > 0 + ) + ) end @doc """ diff --git a/apps/indexer/lib/indexer/fetcher/token_balance_on_demand.ex b/apps/indexer/lib/indexer/fetcher/token_balance_on_demand.ex new file mode 100644 index 0000000000..e3cc94d9a0 --- /dev/null +++ b/apps/indexer/lib/indexer/fetcher/token_balance_on_demand.ex @@ -0,0 +1,130 @@ +defmodule Indexer.Fetcher.TokenBalanceOnDemand do + @moduledoc """ + Ensures that we have a reasonably up to date address tokens balance. + + """ + + @latest_balance_stale_threshold :timer.hours(24) + + use GenServer + use Indexer.Fetcher + + alias Explorer.Chain + alias Explorer.Chain.Address.CurrentTokenBalance + alias Explorer.Chain.Cache.BlockNumber + alias Explorer.Counters.AverageBlockTime + alias Explorer.Token.BalanceReader + alias Timex.Duration + + ## Interface + + @spec trigger_fetch(Hash.t(), [CurrentTokenBalance.t()]) :: :ok + def trigger_fetch(address_hash, current_token_balances) do + latest_block_number = latest_block_number() + + case stale_balance_window(latest_block_number) do + {:error, _} -> + :current + + stale_balance_window -> + do_trigger_fetch(address_hash, current_token_balances, latest_block_number, stale_balance_window) + end + end + + ## Callbacks + + def child_spec([json_rpc_named_arguments, server_opts]) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [json_rpc_named_arguments, server_opts]}, + type: :worker + } + end + + def start_link(json_rpc_named_arguments, server_opts) do + GenServer.start_link(__MODULE__, json_rpc_named_arguments, server_opts) + end + + def init(json_rpc_named_arguments) do + {:ok, %{json_rpc_named_arguments: json_rpc_named_arguments}} + end + + def handle_cast({:fetch_and_update, block_number, address_hash, current_token_balances}, state) do + fetch_and_update(block_number, address_hash, current_token_balances, state.json_rpc_named_arguments) + + {:noreply, state} + end + + ## Implementation + + defp do_trigger_fetch(address_hash, current_token_balances, latest_block_number, stale_balance_window) + when not is_nil(address_hash) do + stale_current_token_balances = + current_token_balances + |> Enum.filter(fn current_token_balance -> current_token_balance.block_number < stale_balance_window end) + + if Enum.count(stale_current_token_balances) > 0 do + GenServer.cast(__MODULE__, {:fetch_and_update, latest_block_number, address_hash, stale_current_token_balances}) + + {:stale, latest_block_number} + else + :current + end + + :ok + end + + defp fetch_and_update(block_number, address_hash, stale_current_token_balances, _json_rpc_named_arguments) do + current_token_balances_update_params = + stale_current_token_balances + |> Enum.map(fn stale_current_token_balance -> + stale_current_token_balances_to_fetch = [ + %{ + token_contract_address_hash: + "0x" <> Base.encode16(stale_current_token_balance.token_contract_address_hash.bytes), + address_hash: "0x" <> Base.encode16(address_hash.bytes), + block_number: block_number + } + ] + + updated_balance = BalanceReader.get_balances_of(stale_current_token_balances_to_fetch)[:ok] + + %{} + |> Map.put(:address_hash, stale_current_token_balance.address_hash) + |> Map.put(:token_contract_address_hash, stale_current_token_balance.token_contract_address_hash) + |> Map.put(:block_number, block_number) + |> Map.put(:value, Decimal.new(updated_balance)) + |> Map.put(:value_fetched_at, DateTime.utc_now()) + end) + + Chain.import(%{ + address_current_token_balances: %{ + params: current_token_balances_update_params + }, + broadcast: :on_demand + }) + end + + defp latest_block_number do + BlockNumber.get_max() + end + + defp stale_balance_window(block_number) do + case AverageBlockTime.average_block_time() do + {:error, :disabled} -> + {:error, :no_average_block_time} + + duration -> + average_block_time = + duration + |> Duration.to_milliseconds() + |> round() + + if average_block_time == 0 do + {:error, :empty_database} + else + block_number - div(@latest_balance_stale_threshold, average_block_time) + end + end + end +end diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index a55c17e945..cea90d8b47 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -18,6 +18,7 @@ defmodule Indexer.Supervisor do ReplacedTransaction, Token, TokenBalance, + TokenBalanceOnDemand, TokenInstance, TokenTotalSupplyOnDemand, TokenUpdater, @@ -117,6 +118,7 @@ defmodule Indexer.Supervisor do # Out-of-band fetchers {CoinBalanceOnDemand.Supervisor, [json_rpc_named_arguments]}, {TokenTotalSupplyOnDemand.Supervisor, [json_rpc_named_arguments]}, + {TokenBalanceOnDemand.Supervisor, [json_rpc_named_arguments]}, # Temporary workers {UncatalogedTokenTransfers.Supervisor, [[]]},