Merge pull request #8512 from blockscout/np-cut-heavy-counter

Add caching and improve `/tabs-counters` performance
pull/8594/head
Victor Baranov 1 year ago committed by GitHub
commit 73ad933435
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 7
      apps/block_scout_web/lib/block_scout_web/controllers/api/v2/address_controller.ex
  3. 233
      apps/block_scout_web/test/block_scout_web/controllers/api/v2/address_controller_test.exs
  4. 2
      apps/explorer/lib/explorer/application.ex
  5. 379
      apps/explorer/lib/explorer/chain/address/counters.ex
  6. 135
      apps/explorer/lib/explorer/chain/cache/addresses_tabs_counters.ex
  7. 4
      apps/explorer/lib/explorer/counters/helper.ex
  8. 3
      config/runtime.exs
  9. 1
      docker-compose/envs/common-blockscout.env

@ -4,6 +4,7 @@
### Features ### Features
- [#8512](https://github.com/blockscout/blockscout/pull/8512) - Add caching and improve `/tabs-counters` performance
- [#8472](https://github.com/blockscout/blockscout/pull/8472) - Integrate `/api/v2/bytecodes/sources:search-all` of `eth_bytecode_db` - [#8472](https://github.com/blockscout/blockscout/pull/8472) - Integrate `/api/v2/bytecodes/sources:search-all` of `eth_bytecode_db`
- [#8589](https://github.com/blockscout/blockscout/pull/8589) - DefiLlama TVL source - [#8589](https://github.com/blockscout/blockscout/pull/8589) - DefiLlama TVL source
- [#8544](https://github.com/blockscout/blockscout/pull/8544) - Fix `nil` `"structLogs"` - [#8544](https://github.com/blockscout/blockscout/pull/8544) - Fix `nil` `"structLogs"`

@ -414,8 +414,8 @@ defmodule BlockScoutWeb.API.V2.AddressController do
with {:format, {:ok, address_hash}} <- {:format, Chain.string_to_address_hash(address_hash_string)}, with {:format, {:ok, address_hash}} <- {:format, Chain.string_to_address_hash(address_hash_string)},
{:ok, false} <- AccessHelper.restricted_access?(address_hash_string, params), {:ok, false} <- AccessHelper.restricted_access?(address_hash_string, params),
{:not_found, {:ok, _address}} <- {:not_found, Chain.hash_to_address(address_hash, @api_true, false)} do {:not_found, {:ok, _address}} <- {:not_found, Chain.hash_to_address(address_hash, @api_true, false)} do
{validations, transactions, token_transfers, token_balances, logs, withdrawals, internal_txs, coin_balances} = {validations, transactions, token_transfers, token_balances, logs, withdrawals, internal_txs} =
Counters.address_limited_counters(address_hash_string, @api_true) Counters.address_limited_counters(address_hash, @api_true)
conn conn
|> put_status(200) |> put_status(200)
@ -426,8 +426,7 @@ defmodule BlockScoutWeb.API.V2.AddressController do
token_balances_count: token_balances, token_balances_count: token_balances,
logs_count: logs, logs_count: logs,
withdrawals_count: withdrawals, withdrawals_count: withdrawals,
internal_txs_count: internal_txs, internal_txs_count: internal_txs
coin_balances_count: coin_balances
}) })
end end
end end

@ -1670,6 +1670,239 @@ defmodule BlockScoutWeb.API.V2.AddressControllerTest do
end end
end end
describe "/addresses/{address_hash}/tabs-counters" do
test "get 404 on non existing address", %{conn: conn} do
address = build(:address)
request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters")
assert %{"message" => "Not found"} = json_response(request, 404)
end
test "get 422 on invalid address", %{conn: conn} do
request = get(conn, "/api/v2/addresses/0x/tabs-counters")
assert %{"message" => "Invalid parameter(s)"} = json_response(request, 422)
end
test "get counters with 0s", %{conn: conn} do
address = insert(:address)
request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters")
assert %{
"validations_count" => 0,
"transactions_count" => 0,
"token_transfers_count" => 0,
"token_balances_count" => 0,
"logs_count" => 0,
"withdrawals_count" => 0,
"internal_txs_count" => 0
} = json_response(request, 200)
end
test "get counters and check that cache works", %{conn: conn} do
address = insert(:address, withdrawals: insert_list(60, :withdrawal))
insert(:transaction, from_address: address) |> with_block()
insert(:transaction, to_address: address) |> with_block()
another_tx = insert(:transaction) |> with_block()
insert(:token_transfer,
from_address: address,
transaction: another_tx,
block: another_tx.block,
block_number: another_tx.block_number
)
insert(:token_transfer,
to_address: address,
transaction: another_tx,
block: another_tx.block,
block_number: another_tx.block_number
)
insert(:block, miner: address)
tx =
:transaction
|> insert()
|> with_block()
for x <- 1..2 do
insert(:internal_transaction,
transaction: tx,
index: x,
block_number: tx.block_number,
transaction_index: tx.index,
block_hash: tx.block_hash,
block_index: x,
from_address: address
)
end
for _ <- 0..60 do
insert(:address_current_token_balance_with_token_id, address: address)
end
for x <- 0..60 do
tx =
:transaction
|> insert()
|> with_block()
insert(:log,
transaction: tx,
index: x,
block: tx.block,
block_number: tx.block_number,
address: address
)
end
request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters")
assert %{
"validations_count" => 1,
"transactions_count" => 2,
"token_transfers_count" => 2,
"token_balances_count" => 51,
"logs_count" => 51,
"withdrawals_count" => 51,
"internal_txs_count" => 2
} = json_response(request, 200)
for x <- 3..4 do
insert(:internal_transaction,
transaction: tx,
index: x,
block_number: tx.block_number,
transaction_index: tx.index,
block_hash: tx.block_hash,
block_index: x,
from_address: address
)
end
request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters")
assert %{
"validations_count" => 1,
"transactions_count" => 2,
"token_transfers_count" => 2,
"token_balances_count" => 51,
"logs_count" => 51,
"withdrawals_count" => 51,
"internal_txs_count" => 2
} = json_response(request, 200)
end
test "check counters cache ttl", %{conn: conn} do
address = insert(:address, withdrawals: insert_list(60, :withdrawal))
insert(:transaction, from_address: address) |> with_block()
insert(:transaction, to_address: address) |> with_block()
another_tx = insert(:transaction) |> with_block()
insert(:token_transfer,
from_address: address,
transaction: another_tx,
block: another_tx.block,
block_number: another_tx.block_number
)
insert(:token_transfer,
to_address: address,
transaction: another_tx,
block: another_tx.block,
block_number: another_tx.block_number
)
insert(:block, miner: address)
tx =
:transaction
|> insert()
|> with_block()
for x <- 1..2 do
insert(:internal_transaction,
transaction: tx,
index: x,
block_number: tx.block_number,
transaction_index: tx.index,
block_hash: tx.block_hash,
block_index: x,
from_address: address
)
end
for _ <- 0..60 do
insert(:address_current_token_balance_with_token_id, address: address)
end
for x <- 0..60 do
tx =
:transaction
|> insert()
|> with_block()
insert(:log,
transaction: tx,
index: x,
block: tx.block,
block_number: tx.block_number,
address: address
)
end
request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters")
assert %{
"validations_count" => 1,
"transactions_count" => 2,
"token_transfers_count" => 2,
"token_balances_count" => 51,
"logs_count" => 51,
"withdrawals_count" => 51,
"internal_txs_count" => 2
} = json_response(request, 200)
old_env = Application.get_env(:explorer, Explorer.Chain.Cache.AddressesTabsCounters)
Application.put_env(:explorer, Explorer.Chain.Cache.AddressesTabsCounters, ttl: 200)
:timer.sleep(200)
for x <- 3..4 do
insert(:internal_transaction,
transaction: tx,
index: x,
block_number: tx.block_number,
transaction_index: tx.index,
block_hash: tx.block_hash,
block_index: x,
from_address: address
)
end
insert(:transaction, from_address: address) |> with_block()
insert(:transaction, to_address: address) |> with_block()
request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters")
assert %{
"validations_count" => 1,
"transactions_count" => 4,
"token_transfers_count" => 2,
"token_balances_count" => 51,
"logs_count" => 51,
"withdrawals_count" => 51,
"internal_txs_count" => 4
} = json_response(request, 200)
Application.put_env(:explorer, Explorer.Chain.Cache.AddressesTabsCounters, old_env)
end
end
defp compare_item(%Address{} = address, json) do defp compare_item(%Address{} = address, json) do
assert Address.checksum(address.hash) == json["hash"] assert Address.checksum(address.hash) == json["hash"]
assert to_string(address.nonce + 1) == json["tx_count"] assert to_string(address.nonce + 1) == json["tx_count"]

@ -9,6 +9,7 @@ defmodule Explorer.Application do
alias Explorer.Chain.Cache.{ alias Explorer.Chain.Cache.{
Accounts, Accounts,
AddressesTabsCounters,
AddressSum, AddressSum,
AddressSumMinusBurnt, AddressSumMinusBurnt,
Block, Block,
@ -77,6 +78,7 @@ defmodule Explorer.Application do
Transactions, Transactions,
TransactionsApiV2, TransactionsApiV2,
Uncles, Uncles,
AddressesTabsCounters,
con_cache_child_spec(MarketHistoryCache.cache_name()), con_cache_child_spec(MarketHistoryCache.cache_name()),
con_cache_child_spec(RSK.cache_name(), ttl_check_interval: :timer.minutes(1), global_ttl: :timer.minutes(30)), con_cache_child_spec(RSK.cache_name(), ttl_check_interval: :timer.minutes(1), global_ttl: :timer.minutes(30)),
{Redix, redix_opts()}, {Redix, redix_opts()},

@ -2,7 +2,7 @@ defmodule Explorer.Chain.Address.Counters do
@moduledoc """ @moduledoc """
Functions related to Explorer.Chain.Address counters Functions related to Explorer.Chain.Address counters
""" """
import Ecto.Query, only: [from: 2, limit: 2, select: 3, subquery: 1, union: 2, where: 3] import Ecto.Query, only: [from: 2, limit: 2, select: 3, union: 2, where: 3]
import Explorer.Chain, import Explorer.Chain,
only: [select_repo: 1, wrapped_union_subquery: 1] only: [select_repo: 1, wrapped_union_subquery: 1]
@ -19,7 +19,6 @@ defmodule Explorer.Chain.Address.Counters do
alias Explorer.Chain.{ alias Explorer.Chain.{
Address, Address,
Address.CoinBalance,
Address.CurrentTokenBalance, Address.CurrentTokenBalance,
Block, Block,
Hash, Hash,
@ -30,10 +29,17 @@ defmodule Explorer.Chain.Address.Counters do
Withdrawal Withdrawal
} }
alias Explorer.Chain.Cache.AddressesTabsCounters
alias Explorer.Chain.Cache.Helper, as: CacheHelper alias Explorer.Chain.Cache.Helper, as: CacheHelper
require Logger require Logger
@typep counter :: non_neg_integer() | nil
@counters_limit 51
@types [:validations, :txs, :token_transfers, :token_balances, :logs, :withdrawals, :internal_txs]
@txs_types [:txs_from, :txs_to, :txs_contract]
defp address_hash_to_logs_query(address_hash) do defp address_hash_to_logs_query(address_hash) do
from(l in Log, where: l.address_hash == ^address_hash) from(l in Log, where: l.address_hash == ^address_hash)
end end
@ -50,22 +56,6 @@ defmodule Explorer.Chain.Address.Counters do
select_repo(options).exists?(address_hash_to_logs_query(address_hash)) select_repo(options).exists?(address_hash_to_logs_query(address_hash))
end end
defp address_hash_to_coin_balances(address_hash) do
query =
from(
cb in CoinBalance,
where: cb.address_hash == ^address_hash,
where: not is_nil(cb.value),
select_merge: %{
delta: fragment("? - coalesce(lead(?, 1) over (order by ? desc), 0)", cb.value, cb.value, cb.block_number)
}
)
from(balance in subquery(query),
where: balance.delta != 0
)
end
def check_if_token_transfers_at_address(address_hash, options \\ []) do def check_if_token_transfers_at_address(address_hash, options \\ []) do
select_repo(options).exists?(from(tt in TokenTransfer, where: tt.from_address_hash == ^address_hash)) || select_repo(options).exists?(from(tt in TokenTransfer, where: tt.from_address_hash == ^address_hash)) ||
select_repo(options).exists?(from(tt in TokenTransfer, where: tt.to_address_hash == ^address_hash)) select_repo(options).exists?(from(tt in TokenTransfer, where: tt.to_address_hash == ^address_hash))
@ -255,6 +245,39 @@ defmodule Explorer.Chain.Address.Counters do
end end
end end
defp address_hash_to_internal_txs_limited_count_query(address_hash) do
query_to_address_hash_wrapped =
InternalTransaction
|> InternalTransaction.where_nonpending_block()
|> InternalTransaction.where_address_fields_match(address_hash, :to_address_hash)
|> InternalTransaction.where_is_different_from_parent_transaction()
|> limit(@counters_limit)
|> wrapped_union_subquery()
query_from_address_hash_wrapped =
InternalTransaction
|> InternalTransaction.where_nonpending_block()
|> InternalTransaction.where_address_fields_match(address_hash, :from_address_hash)
|> InternalTransaction.where_is_different_from_parent_transaction()
|> limit(@counters_limit)
|> wrapped_union_subquery()
query_created_contract_address_hash_wrapped =
InternalTransaction
|> InternalTransaction.where_nonpending_block()
|> InternalTransaction.where_address_fields_match(address_hash, :created_contract_address_hash)
|> InternalTransaction.where_is_different_from_parent_transaction()
|> limit(@counters_limit)
|> wrapped_union_subquery()
query_to_address_hash_wrapped
|> union(^query_from_address_hash_wrapped)
|> union(^query_created_contract_address_hash_wrapped)
|> wrapped_union_subquery()
|> InternalTransaction.where_is_different_from_parent_transaction()
|> limit(@counters_limit)
end
def address_counters(address, options \\ []) do def address_counters(address, options \\ []) do
validation_count_task = validation_count_task =
Task.async(fn -> Task.async(fn ->
@ -304,28 +327,33 @@ defmodule Explorer.Chain.Address.Counters do
AddressTransactionsGasUsageCounter.fetch(address) AddressTransactionsGasUsageCounter.fetch(address)
end end
@counters_limit 51 @spec address_limited_counters(Hash.t(), Keyword.t()) ::
{counter(), counter(), counter(), counter(), counter(), counter(), counter()}
def address_limited_counters(address_hash, options) do def address_limited_counters(address_hash, options) do
start = Time.utc_now() cached_counters =
Enum.reduce(@types, %{}, fn type, acc ->
case AddressesTabsCounters.get_counter(type, address_hash) do
{_datetime, counter, status} ->
Map.put(acc, type, {status, counter})
validations_count_task = _ ->
Task.async(fn -> acc
result = end
address_hash end)
|> address_hash_to_validated_blocks_query()
|> limit(@counters_limit)
|> select_repo(options).aggregate(:count)
Logger.info( start = System.monotonic_time()
"Time consumed for validations_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms"
)
result validations_count_task =
end) configure_task(
:validations,
cached_counters,
address_hash_to_validated_blocks_query(address_hash),
address_hash,
options
)
transactions_from_count_task = transactions_from_count_task =
Task.async(fn -> run_or_ignore(cached_counters[:txs], :txs_from, address_hash, fn ->
result = result =
Transaction Transaction
|> where([t], t.from_address_hash == ^address_hash) |> where([t], t.from_address_hash == ^address_hash)
@ -334,15 +362,19 @@ defmodule Explorer.Chain.Address.Counters do
|> limit(@counters_limit) |> limit(@counters_limit)
|> select_repo(options).all() |> select_repo(options).all()
Logger.info( stop = System.monotonic_time()
"Time consumed for transactions_from_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" diff = System.convert_time_unit(stop - start, :native, :millisecond)
)
result Logger.info("Time consumed for transactions_from_count_task for #{address_hash} is #{diff}ms")
AddressesTabsCounters.save_txs_counter_progress(address_hash, %{txs_types: [:txs_from], txs_from: result})
AddressesTabsCounters.drop_task(:txs_from, address_hash)
{:txs_from, result}
end) end)
transactions_to_count_task = transactions_to_count_task =
Task.async(fn -> run_or_ignore(cached_counters[:txs], :txs_to, address_hash, fn ->
result = result =
Transaction Transaction
|> where([t], t.to_address_hash == ^address_hash) |> where([t], t.to_address_hash == ^address_hash)
@ -351,15 +383,19 @@ defmodule Explorer.Chain.Address.Counters do
|> limit(@counters_limit) |> limit(@counters_limit)
|> select_repo(options).all() |> select_repo(options).all()
Logger.info( stop = System.monotonic_time()
"Time consumed for transactions_to_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" diff = System.convert_time_unit(stop - start, :native, :millisecond)
)
result Logger.info("Time consumed for transactions_to_count_task for #{address_hash} is #{diff}ms")
AddressesTabsCounters.save_txs_counter_progress(address_hash, %{txs_types: [:txs_to], txs_to: result})
AddressesTabsCounters.drop_task(:txs_to, address_hash)
{:txs_to, result}
end) end)
transactions_created_contract_count_task = transactions_created_contract_count_task =
Task.async(fn -> run_or_ignore(cached_counters[:txs], :txs_contract, address_hash, fn ->
result = result =
Transaction Transaction
|> where([t], t.created_contract_address_hash == ^address_hash) |> where([t], t.created_contract_address_hash == ^address_hash)
@ -368,176 +404,181 @@ defmodule Explorer.Chain.Address.Counters do
|> limit(@counters_limit) |> limit(@counters_limit)
|> select_repo(options).all() |> select_repo(options).all()
Logger.info( stop = System.monotonic_time()
"Time consumed for transactions_created_contract_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" diff = System.convert_time_unit(stop - start, :native, :millisecond)
)
result Logger.info("Time consumed for transactions_created_contract_count_task for #{address_hash} is #{diff}ms")
end)
token_transfer_count_task = AddressesTabsCounters.save_txs_counter_progress(address_hash, %{
Task.async(fn -> txs_types: [:txs_contract],
result = txs_contract: result
address_hash })
|> address_to_token_transfer_count_query()
|> limit(@counters_limit)
|> select_repo(options).aggregate(:count)
Logger.info( AddressesTabsCounters.drop_task(:txs_contract, address_hash)
"Time consumed for token_transfer_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms"
)
result {:txs_contract, result}
end) end)
token_balances_count_task = token_transfers_count_task =
Task.async(fn -> configure_task(
result = :token_transfers,
address_hash cached_counters,
|> address_hash_to_token_balances_query() address_to_token_transfer_count_query(address_hash),
|> limit(@counters_limit) address_hash,
|> select_repo(options).aggregate(:count) options
Logger.info(
"Time consumed for token_balances_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms"
) )
result token_balances_count_task =
end) configure_task(
:token_balances,
cached_counters,
address_hash_to_token_balances_query(address_hash),
address_hash,
options
)
logs_count_task = logs_count_task =
Task.async(fn -> configure_task(
result = :logs,
address_hash cached_counters,
|> address_hash_to_logs_query() address_hash_to_logs_query(address_hash),
|> limit(@counters_limit) address_hash,
|> select_repo(options).aggregate(:count) options
Logger.info(
"Time consumed for logs_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms"
) )
result
end)
withdrawals_count_task = withdrawals_count_task =
Task.async(fn -> configure_task(
result = :withdrawals,
address_hash cached_counters,
|> Withdrawal.address_hash_to_withdrawals_unordered_query() Withdrawal.address_hash_to_withdrawals_unordered_query(address_hash),
|> limit(@counters_limit) address_hash,
|> select_repo(options).aggregate(:count) options
Logger.info(
"Time consumed for withdrawals_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms"
) )
result
end)
internal_txs_count_task = internal_txs_count_task =
Task.async(fn -> configure_task(
query_to_address_hash_wrapped = :internal_txs,
InternalTransaction cached_counters,
|> InternalTransaction.where_nonpending_block() address_hash_to_internal_txs_limited_count_query(address_hash),
|> InternalTransaction.where_address_fields_match(address_hash, :to_address_hash) address_hash,
|> InternalTransaction.where_is_different_from_parent_transaction() options
|> limit(@counters_limit)
|> wrapped_union_subquery()
query_from_address_hash_wrapped =
InternalTransaction
|> InternalTransaction.where_nonpending_block()
|> InternalTransaction.where_address_fields_match(address_hash, :from_address_hash)
|> InternalTransaction.where_is_different_from_parent_transaction()
|> limit(@counters_limit)
|> wrapped_union_subquery()
query_created_contract_address_hash_wrapped =
InternalTransaction
|> InternalTransaction.where_nonpending_block()
|> InternalTransaction.where_address_fields_match(address_hash, :created_contract_address_hash)
|> InternalTransaction.where_is_different_from_parent_transaction()
|> limit(@counters_limit)
|> wrapped_union_subquery()
result =
query_to_address_hash_wrapped
|> union(^query_from_address_hash_wrapped)
|> union(^query_created_contract_address_hash_wrapped)
|> wrapped_union_subquery()
|> InternalTransaction.where_is_different_from_parent_transaction()
|> limit(@counters_limit)
|> select_repo(options).aggregate(:count)
Logger.info(
"Time consumed for internal_txs_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms"
)
result
end)
coin_balances_count_task =
Task.async(fn ->
result =
address_hash
|> address_hash_to_coin_balances()
|> limit(@counters_limit)
|> select_repo(options).aggregate(:count)
Logger.info(
"Time consumed for coin_balances_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms"
) )
result map =
end)
{validations, txs_from, txs_to, txs_contract, token_transfers, token_balances, logs, withdrawals, internal_txs,
coin_balances} =
[ [
validations_count_task, validations_count_task,
transactions_from_count_task, transactions_from_count_task,
transactions_to_count_task, transactions_to_count_task,
transactions_created_contract_count_task, transactions_created_contract_count_task,
token_transfer_count_task, token_transfers_count_task,
token_balances_count_task, token_balances_count_task,
logs_count_task, logs_count_task,
withdrawals_count_task, withdrawals_count_task,
internal_txs_count_task, internal_txs_count_task
coin_balances_count_task
] ]
|> Task.yield_many(:timer.seconds(30)) |> Enum.reject(&is_nil/1)
|> Enum.map(fn {_task, res} -> |> Task.yield_many(:timer.seconds(1))
|> Enum.reduce(Map.merge(prepare_cache_values(cached_counters), %{txs_types: [], txs_hashes: []}), fn {task, res},
acc ->
case res do case res do
{:ok, result} -> {:ok, {txs_type, txs_hashes}} when txs_type in @txs_types ->
result acc
|> (&Map.put(&1, :txs_types, [txs_type | &1[:txs_types] || []])).()
|> (&Map.put(&1, :txs_hashes, &1[:txs_hashes] ++ txs_hashes)).()
{:ok, {type, counter}} ->
Map.put(acc, type, counter)
{:exit, reason} -> {:exit, reason} ->
Logger.warn(fn -> Logger.warn(fn ->
[ [
"Query fetching address counters terminated: #{inspect(reason)}" "Query fetching address counters for #{address_hash} terminated: #{inspect(reason)}"
] ]
end) end)
nil acc
nil -> nil ->
Logger.warn(fn -> Logger.warn(fn ->
[ [
"Query fetching address counters timed out." "Query fetching address counters for #{address_hash} timed out."
] ]
end) end)
nil Task.ignore(task)
acc
end end
end) end)
|> List.to_tuple() |> process_txs_counter()
{map[:validations], map[:txs], map[:token_transfers], map[:token_balances], map[:logs], map[:withdrawals],
map[:internal_txs]}
end
defp run_or_ignore({ok, _counter}, _type, _address_hash, _fun) when ok in [:up_to_date, :limit_value], do: nil
defp run_or_ignore(_, type, address_hash, fun) do
if !AddressesTabsCounters.get_task(type, address_hash) do
AddressesTabsCounters.set_task(type, address_hash)
Task.async(fun)
end
end
{validations, defp configure_task(counter_type, cache, query, address_hash, options) do
(sanitize_list(txs_from) ++ sanitize_list(txs_to) ++ sanitize_list(txs_contract)) |> Enum.dedup() |> Enum.count(), address_hash = to_string(address_hash)
token_transfers, token_balances, logs, withdrawals, internal_txs, coin_balances} start = System.monotonic_time()
run_or_ignore(cache[counter_type], counter_type, address_hash, fn ->
result =
query
|> limit(@counters_limit)
|> select_repo(options).aggregate(:count)
stop = System.monotonic_time()
diff = System.convert_time_unit(stop - start, :native, :millisecond)
Logger.info("Time consumed for #{counter_type} counter task for #{address_hash} is #{diff}ms")
AddressesTabsCounters.set_counter(counter_type, address_hash, result)
AddressesTabsCounters.drop_task(counter_type, address_hash)
{counter_type, result}
end)
end
defp process_txs_counter(%{txs_types: [_ | _] = txs_types, txs_hashes: hashes} = map) do
counter = hashes |> Enum.uniq() |> Enum.count() |> min(@counters_limit)
if Enum.count(txs_types) == 3 || counter == @counters_limit do
map |> Map.put(:txs, counter)
else
map
end
end
defp process_txs_counter(map), do: map
defp prepare_cache_values(cached_counters) do
Enum.reduce(cached_counters, %{}, fn
{k, {_, counter}}, acc ->
Map.put(acc, k, counter)
{k, v}, acc ->
Map.put(acc, k, v)
end)
end end
defp sanitize_list(nil), do: [] @doc """
defp sanitize_list(other), do: other Returns all possible transactions type
"""
@spec txs_types :: list(atom)
def txs_types, do: @txs_types
@doc """
Returns max counter value
"""
@spec counters_limit :: integer()
def counters_limit, do: @counters_limit
end end

@ -0,0 +1,135 @@
defmodule Explorer.Chain.Cache.AddressesTabsCounters do
@moduledoc """
Cache for tabs counters on address
"""
use GenServer
import Explorer.Counters.Helper, only: [fetch_from_cache: 3]
alias Explorer.Chain.Address.Counters
@cache_name :addresses_tabs_counters
@typep counter_type :: :validations | :txs | :token_transfers | :token_balances | :logs | :withdrawals | :internal_txs
@typep response_status :: :limit_value | :stale | :up_to_date
@spec get_counter(counter_type, String.t()) :: {DateTime.t(), non_neg_integer(), response_status} | nil
def get_counter(counter_type, address_hash) do
address_hash |> cache_key(counter_type) |> fetch_from_cache(@cache_name, nil) |> check_staleness()
end
@spec set_counter(counter_type, String.t(), non_neg_integer()) :: :ok
def set_counter(counter_type, address_hash, counter, need_to_modify_state? \\ true) do
:ets.insert(@cache_name, {cache_key(address_hash, counter_type), {DateTime.utc_now(), counter}})
if need_to_modify_state?, do: ignore_txs(counter_type, address_hash)
:ok
end
@spec set_task(atom, String.t()) :: true
def set_task(counter_type, address_hash) do
:ets.insert(@cache_name, {task_cache_key(address_hash, counter_type), true})
end
@spec drop_task(atom, String.t()) :: true
def drop_task(counter_type, address_hash) do
:ets.delete(@cache_name, task_cache_key(address_hash, counter_type))
end
@spec get_task(atom, String.t()) :: true | nil
def get_task(counter_type, address_hash) do
address_hash |> task_cache_key(counter_type) |> fetch_from_cache(@cache_name, nil)
end
@spec ignore_txs(atom, String.t()) :: :ignore | :ok
def ignore_txs(:txs, address_hash), do: GenServer.cast(__MODULE__, {:ignore_txs, address_hash})
def ignore_txs(_counter_type, _address_hash), do: :ignore
def save_txs_counter_progress(address_hash, results) do
GenServer.cast(__MODULE__, {:set_txs_state, address_hash, results})
end
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(_opts) do
:ets.new(@cache_name, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
{:ok, %{}}
end
@impl true
def handle_cast({:ignore_txs, address_hash}, state) do
{:noreply, Map.put(state, lowercased_string(address_hash), {:updated, DateTime.utc_now()})}
end
@impl true
def handle_cast({:set_txs_state, address_hash, %{txs_types: txs_types} = results}, state) do
address_hash = lowercased_string(address_hash)
if is_ignored?(state[address_hash]) do
{:noreply, state}
else
address_state =
txs_types
|> Enum.reduce(state[address_hash] || %{}, fn tx_type, acc ->
Map.put(acc, tx_type, results[tx_type])
end)
|> (&Map.put(&1, :txs_types, (txs_types ++ (&1[:txs_types] || [])) |> Enum.uniq())).()
counter =
Counters.txs_types()
|> Enum.reduce([], fn type, acc ->
(address_state[type] || []) ++ acc
end)
|> Enum.uniq()
|> Enum.count()
|> min(Counters.counters_limit())
if counter == Counters.counters_limit() || Enum.count(address_state[:txs_types]) == 3 do
set_counter(:txs, address_hash, counter, false)
{:noreply, Map.put(state, address_hash, {:updated, DateTime.utc_now()})}
else
{:noreply, Map.put(state, address_hash, address_state)}
end
end
end
defp is_ignored?({:updated, datetime}), do: is_up_to_date?(datetime, ttl())
defp is_ignored?(_), do: false
defp check_staleness(nil), do: nil
defp check_staleness({datetime, counter}) when counter > 50, do: {datetime, counter, :limit_value}
defp check_staleness({datetime, counter}) do
status =
if is_up_to_date?(datetime, ttl()) do
:up_to_date
else
:stale
end
{datetime, counter, status}
end
defp is_up_to_date?(datetime, ttl) do
datetime
|> DateTime.add(ttl, :millisecond)
|> DateTime.compare(DateTime.utc_now()) != :lt
end
defp ttl, do: Application.get_env(:explorer, Explorer.Chain.Cache.AddressesTabsCounters)[:ttl]
defp lowercased_string(str), do: str |> to_string() |> String.downcase()
defp cache_key(address_hash, counter_type), do: {lowercased_string(address_hash), counter_type}
defp task_cache_key(address_hash, counter_type), do: {:task, lowercased_string(address_hash), counter_type}
end

@ -16,13 +16,13 @@ defmodule Explorer.Counters.Helper do
DateTime.to_unix(utc_now, :millisecond) DateTime.to_unix(utc_now, :millisecond)
end end
def fetch_from_cache(key, cache_name) do def fetch_from_cache(key, cache_name, default \\ 0) do
case :ets.lookup(cache_name, key) do case :ets.lookup(cache_name, key) do
[{_, value}] -> [{_, value}] ->
value value
[] -> [] ->
0 default
end end
end end

@ -434,6 +434,9 @@ config :explorer, Explorer.Chain.Transaction,
rootstock_remasc_address: System.get_env("ROOTSTOCK_REMASC_ADDRESS"), rootstock_remasc_address: System.get_env("ROOTSTOCK_REMASC_ADDRESS"),
rootstock_bridge_address: System.get_env("ROOTSTOCK_BRIDGE_ADDRESS") rootstock_bridge_address: System.get_env("ROOTSTOCK_BRIDGE_ADDRESS")
config :explorer, Explorer.Chain.Cache.AddressesTabsCounters,
ttl: ConfigHelper.parse_time_env_var("ADDRESSES_TABS_COUNTERS_TTL", "10m")
############### ###############
### Indexer ### ### Indexer ###
############### ###############

@ -243,3 +243,4 @@ EIP_1559_ELASTICITY_MULTIPLIER=2
# TOKEN_INSTANCE_OWNER_MIGRATION_BATCH_SIZE=50 # TOKEN_INSTANCE_OWNER_MIGRATION_BATCH_SIZE=50
# IPFS_GATEWAY_URL= # IPFS_GATEWAY_URL=
API_V2_ENABLED=true API_V2_ENABLED=true
# ADDRESSES_TABS_COUNTERS_TTL=10m
Loading…
Cancel
Save