Merge pull request #8165 from blockscout/np-add-current-tb-broadcast

Indexer.Fetcher.TokenBalanceOnDemand improvements
pull/8167/head
Victor Baranov 1 year ago committed by GitHub
commit 3ac122e6f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 32
      apps/block_scout_web/lib/block_scout_web/channels/address_channel.ex
  3. 2
      apps/block_scout_web/lib/block_scout_web/endpoint.ex
  4. 11
      apps/block_scout_web/lib/block_scout_web/notifier.ex
  5. 1
      apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex
  6. 2
      apps/explorer/lib/explorer/chain/events/publisher.ex
  7. 2
      apps/explorer/lib/explorer/chain/events/subscriber.ex
  8. 127
      apps/indexer/lib/indexer/fetcher/token_balance_on_demand.ex

@ -4,6 +4,7 @@
### Features
- [#8165](https://github.com/blockscout/blockscout/pull/8165) - Add broadcast of updated address_current_token_balances
- [#7952](https://github.com/blockscout/blockscout/pull/7952) - Add parsing constructor arguments for sourcify contracts
- [#6190](https://github.com/blockscout/blockscout/pull/6190) - Add EIP-1559 support to gas price oracle
- [#7977](https://github.com/blockscout/blockscout/pull/7977) - GraphQL: extend schema with new field for existing objects

@ -28,11 +28,13 @@ defmodule BlockScoutWeb.AddressChannel do
"transaction",
"verification_result",
"token_transfer",
"pending_transaction"
"pending_transaction",
"address_current_token_balances"
])
{:ok, burn_address_hash} = Chain.string_to_address_hash("0x0000000000000000000000000000000000000000")
@burn_address_hash burn_address_hash
@current_token_balances_limit 50
def join("addresses:" <> address_hash, _params, socket) do
{:ok, %{}, assign(socket, :address_hash, address_hash)}
@ -225,6 +227,34 @@ defmodule BlockScoutWeb.AddressChannel do
def handle_out("pending_transaction", data, socket), do: handle_transaction(data, socket, "transaction")
def handle_out(
"address_current_token_balances",
%{address_current_token_balances: address_current_token_balances},
%Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket
) do
push_current_token_balances(socket, address_current_token_balances, "erc_20", "ERC-20")
push_current_token_balances(socket, address_current_token_balances, "erc_721", "ERC-721")
push_current_token_balances(socket, address_current_token_balances, "erc_1155", "ERC-1155")
{:noreply, socket}
end
def handle_out("address_current_token_balances", _, socket) do
{:noreply, socket}
end
defp push_current_token_balances(socket, address_current_token_balances, event_postfix, token_type) do
filtered_ctbs = address_current_token_balances |> Enum.filter(fn ctb -> ctb.token_type == token_type end)
push(socket, "updated_token_balances_" <> event_postfix, %{
token_balances:
AddressViewAPI.render("token_balances.json", %{
token_balances: Enum.take(filtered_ctbs, @current_token_balances_limit)
}),
overflow: Enum.count(filtered_ctbs) > @current_token_balances_limit
})
end
def push_current_coin_balance(
%Phoenix.Socket{handler: BlockScoutWeb.UserSocketV2} = socket,
block_number,

@ -7,7 +7,7 @@ defmodule BlockScoutWeb.Endpoint do
end
socket("/socket", BlockScoutWeb.UserSocket, websocket: [timeout: 45_000])
socket("/socket/v2", BlockScoutWeb.UserSocketV2, websocket: [timeout: 45_000])
socket("/socket/v2", BlockScoutWeb.UserSocketV2, websocket: [timeout: :infinity])
# Serve at "/" the static files from "priv/static" directory.
#

@ -44,11 +44,6 @@ defmodule BlockScoutWeb.Notifier do
Enum.each(address_token_balances, &broadcast_address_token_balance/1)
end
def handle_event({:chain_event, :address_current_token_balances, type, address_current_token_balances})
when type in [:realtime, :on_demand] do
Enum.each(address_current_token_balances, &broadcast_address_token_balance/1)
end
def handle_event(
{:chain_event, :contract_verification_result, :on_demand, {address_hash, contract_verification_result}}
) do
@ -225,6 +220,12 @@ defmodule BlockScoutWeb.Notifier do
Endpoint.broadcast("addresses:#{to_string(address_hash)}", "smart_contract_was_verified", %{})
end
def handle_event({:chain_event, :address_current_token_balances, :on_demand, address_current_token_balances}) do
Endpoint.broadcast("addresses:#{address_current_token_balances.address_hash}", "address_current_token_balances", %{
address_current_token_balances: address_current_token_balances.address_current_token_balances
})
end
def handle_event(_), do: nil
def fetch_compiler_version(compiler) do

@ -26,6 +26,7 @@ defmodule BlockScoutWeb.RealtimeEventHandler do
Subscriber.to(:transactions, :realtime)
Subscriber.to(:addresses, :on_demand)
Subscriber.to(:address_coin_balances, :on_demand)
Subscriber.to(:address_current_token_balances, :on_demand)
Subscriber.to(:address_token_balances, :on_demand)
Subscriber.to(:contract_verification_result, :on_demand)
Subscriber.to(:token_total_supply, :on_demand)

@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Publisher do
Publishes events related to the Chain context.
"""
@allowed_events ~w(addresses address_coin_balances address_token_balances blocks block_rewards internal_transactions last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified)a
@allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified)a
def broadcast(_data, false), do: :ok

@ -3,7 +3,7 @@ defmodule Explorer.Chain.Events.Subscriber do
Subscribes to events related to the Chain context.
"""
@allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances blocks block_rewards internal_transactions last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified)a
@allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number token_transfers transactions contract_verification_result token_total_supply changed_bytecode smart_contract_was_verified)a
@allowed_broadcast_types ~w(catchup realtime on_demand contract_verification_result)a

@ -7,12 +7,16 @@ defmodule Indexer.Fetcher.TokenBalanceOnDemand do
use Indexer.Fetcher
alias Explorer.Chain
alias Explorer.Chain.Address.CurrentTokenBalance
alias Explorer.Chain.Cache.BlockNumber
alias Explorer.Chain.Events.Publisher
alias Explorer.Chain.Hash
alias Explorer.Counters.AverageBlockTime
alias Explorer.Token.BalanceReader
alias Timex.Duration
require Logger
## Interface
@spec trigger_fetch(Hash.t()) :: :ok
@ -35,7 +39,6 @@ defmodule Indexer.Fetcher.TokenBalanceOnDemand do
Decimal.t() | nil,
non_neg_integer()
) :: {:ok, pid}
def trigger_historic_fetch(address_hash, contract_address_hash, token_type, token_id, block_number) do
Task.start(fn ->
do_trigger_historic_fetch(address_hash, contract_address_hash, token_type, token_id, block_number)
@ -61,51 +64,101 @@ defmodule Indexer.Fetcher.TokenBalanceOnDemand do
end
defp fetch_and_update(block_number, address_hash, stale_current_token_balances) do
current_token_balances_update_params =
%{erc_1155: erc_1155_ctbs, other: other_ctbs, tokens: tokens} =
stale_current_token_balances
|> Enum.map(fn %{token_id: token_id} = stale_current_token_balance ->
stale_current_token_balances_to_fetch = [
%{
token_contract_address_hash:
"0x" <> Base.encode16(stale_current_token_balance.token.contract_address_hash.bytes),
address_hash: "0x" <> Base.encode16(address_hash.bytes),
block_number: block_number,
token_id: token_id && Decimal.to_integer(token_id)
}
]
balance_response =
case stale_current_token_balance.token_type do
"ERC-1155" -> BalanceReader.get_balances_of_erc_1155(stale_current_token_balances_to_fetch)
_ -> BalanceReader.get_balances_of(stale_current_token_balances_to_fetch)
|> Enum.reduce(%{erc_1155: [], other: [], tokens: %{}}, fn %{token_id: token_id} = stale_current_token_balance,
acc ->
prepared_ctb = %{
token_contract_address_hash:
"0x" <> Base.encode16(stale_current_token_balance.token.contract_address_hash.bytes),
address_hash: "0x" <> Base.encode16(address_hash.bytes),
block_number: block_number,
token_id: token_id && Decimal.to_integer(token_id),
token_type: stale_current_token_balance.token_type
}
updated_tokens =
Map.put_new(
acc[:tokens],
stale_current_token_balance.token.contract_address_hash.bytes,
stale_current_token_balance.token
)
result =
if stale_current_token_balance.token_type == "ERC-1155" do
Map.put(acc, :erc_1155, [prepared_ctb | acc[:erc_1155]])
else
Map.put(acc, :other, [prepared_ctb | acc[:other]])
end
updated_balance = balance_response[:ok]
if updated_balance do
%{}
|> Map.put(:address_hash, stale_current_token_balance.address_hash)
|> Map.put(:token_contract_address_hash, stale_current_token_balance.token.contract_address_hash)
|> Map.put(:token_type, stale_current_token_balance.token.type)
|> Map.put(:token_id, token_id)
|> Map.put(:block_number, block_number)
|> Map.put(:value, Decimal.new(updated_balance))
|> Map.put(:value_fetched_at, DateTime.utc_now())
else
nil
end
Map.put(result, :tokens, updated_tokens)
end)
erc_1155_ctbs_reversed = Enum.reverse(erc_1155_ctbs)
other_ctbs_reversed = Enum.reverse(other_ctbs)
updated_erc_1155_ctbs =
if Enum.count(erc_1155_ctbs_reversed) > 0 do
erc_1155_ctbs_reversed
|> BalanceReader.get_balances_of_erc_1155()
|> Enum.zip(erc_1155_ctbs_reversed)
|> Enum.map(&prepare_updated_balance(&1, block_number))
else
[]
end
updated_other_ctbs =
if Enum.count(other_ctbs_reversed) > 0 do
other_ctbs_reversed
|> BalanceReader.get_balances_of()
|> Enum.zip(other_ctbs_reversed)
|> Enum.map(&prepare_updated_balance(&1, block_number))
else
[]
end
filtered_current_token_balances_update_params =
current_token_balances_update_params
(updated_erc_1155_ctbs ++ updated_other_ctbs)
|> Enum.filter(&(!is_nil(&1)))
Chain.import(%{
address_current_token_balances: %{
params: filtered_current_token_balances_update_params
{:ok,
%{
address_current_token_balances: imported_ctbs
}} =
Chain.import(%{
address_current_token_balances: %{
params: filtered_current_token_balances_update_params
},
broadcast: false
})
Publisher.broadcast(
%{
address_current_token_balances: %{
address_hash: to_string(address_hash),
address_current_token_balances:
imported_ctbs
|> Enum.map(fn ctb -> %CurrentTokenBalance{ctb | token: tokens[ctb.token_contract_address_hash.bytes]} end)
}
},
broadcast: :on_demand
})
:on_demand
)
end
defp prepare_updated_balance({{:ok, updated_balance}, stale_current_token_balance}, block_number) do
%{}
|> Map.put(:address_hash, stale_current_token_balance.address_hash)
|> Map.put(:token_contract_address_hash, stale_current_token_balance.token_contract_address_hash)
|> Map.put(:token_type, stale_current_token_balance.token_type)
|> Map.put(:token_id, stale_current_token_balance.token_id)
|> Map.put(:block_number, block_number)
|> Map.put(:value, Decimal.new(updated_balance))
|> Map.put(:value_fetched_at, DateTime.utc_now())
end
defp prepare_updated_balance({{:error, error}, _ctb}, _block_number) do
Logger.warn(fn -> ["Error on updating current token balance: ", inspect(error)] end)
nil
end
defp do_trigger_historic_fetch(address_hash, contract_address_hash, token_type, token_id, block_number) do

Loading…
Cancel
Save