Add historic balances to on demand fetchers

pull/7422/head
Maxim Filonov 2 years ago
parent be7f14cbb5
commit db9b564f37
  1. 20
      apps/block_scout_web/lib/block_scout_web/models/transaction_state_helper.ex
  2. 22
      apps/block_scout_web/test/block_scout_web/controllers/transaction_state_controller_test.exs
  3. 3
      apps/explorer/lib/explorer/application.ex
  4. 3
      apps/indexer/lib/indexer/fetcher.ex
  5. 13
      apps/indexer/lib/indexer/fetcher/coin_balance_on_demand.ex
  6. 51
      apps/indexer/lib/indexer/fetcher/token_balance_on_demand.ex
  7. 52
      apps/indexer/test/indexer/fetcher/coin_balance_on_demand_test.exs

@ -8,7 +8,7 @@ defmodule BlockScoutWeb.Models.TransactionStateHelper do
alias Explorer.{Chain, PagingOptions} alias Explorer.{Chain, PagingOptions}
alias Explorer.Chain.{Block, Transaction, Wei} alias Explorer.Chain.{Block, Transaction, Wei}
alias Explorer.Chain.Cache.StateChanges alias Explorer.Chain.Cache.StateChanges
alias Indexer.Fetcher.{CoinBalance, TokenBalance} alias Indexer.Fetcher.{CoinBalanceOnDemand, TokenBalanceOnDemand}
{:ok, burn_address_hash} = Chain.string_to_address_hash("0x0000000000000000000000000000000000000000") {:ok, burn_address_hash} = Chain.string_to_address_hash("0x0000000000000000000000000000000000000000")
@burn_address_hash burn_address_hash @burn_address_hash burn_address_hash
@ -100,7 +100,7 @@ defmodule BlockScoutWeb.Models.TransactionStateHelper do
val val
_ -> _ ->
CoinBalance.async_fetch_balances([%{address_hash: address_hash, block_number: block_number}]) CoinBalanceOnDemand.trigger_historic_fetch(address_hash, block_number)
%Wei{value: Decimal.new(0)} %Wei{value: Decimal.new(0)}
end end
end end
@ -128,15 +128,13 @@ defmodule BlockScoutWeb.Models.TransactionStateHelper do
val val
_ -> _ ->
TokenBalance.async_fetch([ TokenBalanceOnDemand.trigger_historic_fetch(
%{ address_hash,
token_contract_address_hash: token.contract_address_hash, token.contract_address_hash,
address_hash: address_hash, token.type,
block_number: block_number, token_id,
token_type: token.type, block_number
token_id: token_id )
}
])
Decimal.new(0) Decimal.new(0)
end end

@ -8,9 +8,31 @@ defmodule BlockScoutWeb.TransactionStateControllerTest do
import EthereumJSONRPC, only: [integer_to_quantity: 1] import EthereumJSONRPC, only: [integer_to_quantity: 1]
alias Explorer.Chain.Wei alias Explorer.Chain.Wei
alias Indexer.Fetcher.CoinBalance alias Indexer.Fetcher.CoinBalance
alias Explorer.Counters.{AddressesCounter, AverageBlockTime}
alias Indexer.Fetcher.CoinBalanceOnDemand
setup :set_mox_global setup :set_mox_global
setup :verify_on_exit!
setup do
mocked_json_rpc_named_arguments = [
transport: EthereumJSONRPC.Mox,
transport_options: []
]
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
start_supervised!(AverageBlockTime)
start_supervised!(AddressesCounter)
start_supervised!({CoinBalanceOnDemand, [mocked_json_rpc_named_arguments, [name: CoinBalanceOnDemand]]})
Application.put_env(:explorer, AverageBlockTime, enabled: true, cache_period: 1_800_000)
on_exit(fn ->
Application.put_env(:explorer, AverageBlockTime, enabled: false, cache_period: 1_800_000)
end)
end
describe "GET index/3" do describe "GET index/3" do
test "loads existing transaction", %{conn: conn} do test "loads existing transaction", %{conn: conn} do
transaction = insert(:transaction) transaction = insert(:transaction)

@ -68,10 +68,7 @@ defmodule Explorer.Application do
NetVersion, NetVersion,
PendingBlockOperation, PendingBlockOperation,
Transaction, Transaction,
BlockNumber,
StateChanges, StateChanges,
con_cache_child_spec(MarketHistoryCache.cache_name()),
con_cache_child_spec(RSK.cache_name(), ttl_check_interval: :timer.minutes(1), global_ttl: :timer.minutes(30)),
Transactions, Transactions,
TransactionsApiV2, TransactionsApiV2,
Uncles, Uncles,

@ -52,8 +52,7 @@ defmodule Indexer.Fetcher do
end end
def disabled? do def disabled? do
Application.get_env(:indexer, Indexer.Supervisor, [])[:enabled] == false or Application.get_env(:indexer, __MODULE__, [])[:disabled?] == true
Application.get_env(:indexer, __MODULE__, [])[:disabled?] == true
end end
@impl Supervisor @impl Supervisor

@ -15,7 +15,7 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemand do
alias EthereumJSONRPC.FetchedBalances alias EthereumJSONRPC.FetchedBalances
alias Explorer.{Chain, Repo} alias Explorer.{Chain, Repo}
alias Explorer.Chain.Address alias Explorer.Chain.{Address, Hash}
alias Explorer.Chain.Address.{CoinBalance, CoinBalanceDaily} alias Explorer.Chain.Address.{CoinBalance, CoinBalanceDaily}
alias Explorer.Chain.Cache.{Accounts, BlockNumber} alias Explorer.Chain.Cache.{Accounts, BlockNumber}
alias Explorer.Counters.AverageBlockTime alias Explorer.Counters.AverageBlockTime
@ -50,6 +50,11 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemand do
end end
end end
@spec trigger_historic_fetch(Hash.Address.t(), non_neg_integer()) :: balance_status
def trigger_historic_fetch(address_hash, block_number) do
do_trigger_historic_fetch(address_hash, block_number)
end
## Callbacks ## Callbacks
def child_spec([json_rpc_named_arguments, server_opts]) do def child_spec([json_rpc_named_arguments, server_opts]) do
@ -134,6 +139,12 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemand do
do_trigger_balance_fetch_query(address, latest_block_number, stale_balance_window, latest, latest_by_day) do_trigger_balance_fetch_query(address, latest_block_number, stale_balance_window, latest, latest_by_day)
end end
defp do_trigger_historic_fetch(address_hash, block_number) do
GenServer.cast(__MODULE__, {:fetch_and_import, block_number, %{hash: address_hash}})
{:stale, 0}
end
defp do_trigger_balance_fetch_query( defp do_trigger_balance_fetch_query(
address, address,
latest_block_number, latest_block_number,

@ -29,6 +29,20 @@ defmodule Indexer.Fetcher.TokenBalanceOnDemand do
end end
end end
@spec trigger_historic_fetch(
Hash.t(),
Hash.t(),
String.t(),
Decimal.t() | nil,
non_neg_integer()
) :: {:ok, pid}
def trigger_historic_fetch(address_hash, contract_address_hash, token_type, token_id, block_number) do
Task.start(fn ->
do_trigger_historic_fetch(address_hash, contract_address_hash, token_type, token_id, block_number)
end)
end
## Implementation ## Implementation
defp do_trigger_fetch(address_hash, current_token_balances, latest_block_number, stale_balance_window) defp do_trigger_fetch(address_hash, current_token_balances, latest_block_number, stale_balance_window)
@ -93,6 +107,43 @@ defmodule Indexer.Fetcher.TokenBalanceOnDemand do
}) })
end end
defp do_trigger_historic_fetch(address_hash, contract_address_hash, token_type, token_id, block_number) do
request = %{
token_contract_address_hash: to_string(contract_address_hash),
address_hash: to_string(address_hash),
block_number: block_number,
token_id: token_id && Decimal.to_integer(token_id)
}
balance_response =
case token_type do
"ERC-1155" -> BalanceReader.get_balances_of_erc_1155([request])
_ -> BalanceReader.get_balances_of([request])
end
balance = balance_response[:ok]
if balance do
%{
address_token_balances: %{
params: [
%{
address_hash: address_hash,
token_contract_address_hash: contract_address_hash,
token_type: token_type,
token_id: token_id,
block_number: block_number,
value: Decimal.new(balance),
value_fetched_at: DateTime.utc_now()
}
]
},
broadcast: :on_demand
}
|> Chain.import()
end
end
defp latest_block_number do defp latest_block_number do
BlockNumber.get_max() BlockNumber.get_max()
end end

@ -5,7 +5,9 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemandTest do
use Explorer.DataCase use Explorer.DataCase
import Mox import Mox
import EthereumJSONRPC, only: [integer_to_quantity: 1]
alias Explorer.Chain
alias Explorer.Chain.Events.Subscriber alias Explorer.Chain.Events.Subscriber
alias Explorer.Chain.Wei alias Explorer.Chain.Wei
alias Explorer.Counters.AverageBlockTime alias Explorer.Counters.AverageBlockTime
@ -82,6 +84,52 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemandTest do
end end
end end
describe "trigger_historic_fetch/2" do
test "fetches and imports balance for any block" do
address = insert(:address)
block = insert(:block)
insert(:block)
string_address_hash = to_string(address.hash)
block_number = block.number
string_block_number = integer_to_quantity(block_number)
balance = 42
assert nil == Chain.get_coin_balance(address.hash, block_number)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [
%{
id: id,
method: "eth_getBalance",
params: [^string_address_hash, ^string_block_number]
}
],
_options ->
{:ok, [%{id: id, jsonrpc: "2.0", result: integer_to_quantity(balance)}]}
end)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [
%{
id: id,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^string_block_number, true]
}
],
_ ->
{:ok, [eth_block_number_fake_response(string_block_number, id)]}
end)
{:ok, expected_wei} = Wei.cast(balance)
CoinBalanceOnDemand.trigger_historic_fetch(address.hash, block_number)
:timer.sleep(1000)
assert %{value: expected_wei} = Chain.get_coin_balance(address.hash, block_number)
end
end
describe "update behaviour" do describe "update behaviour" do
setup do setup do
Subscriber.to(:addresses, :on_demand) Subscriber.to(:addresses, :on_demand)
@ -184,9 +232,9 @@ defmodule Indexer.Fetcher.CoinBalanceOnDemandTest do
end end
end end
defp eth_block_number_fake_response(block_quantity) do defp eth_block_number_fake_response(block_quantity, id \\ 0) do
%{ %{
id: 0, id: id,
jsonrpc: "2.0", jsonrpc: "2.0",
result: %{ result: %{
"author" => "0x0000000000000000000000000000000000000000", "author" => "0x0000000000000000000000000000000000000000",

Loading…
Cancel
Save