From 2f4db2e9aa1d72132fea23a3bbed9dbcf5c86402 Mon Sep 17 00:00:00 2001 From: Nikita Pozdniakov Date: Wed, 4 Oct 2023 14:00:53 +0300 Subject: [PATCH 1/3] Improve `/tabs-counters` and add cache --- CHANGELOG.md | 1 + .../controllers/api/v2/address_controller.ex | 7 +- .../api/v2/address_controller_test.exs | 233 ++++++++++++ apps/explorer/lib/explorer/application.ex | 2 + .../lib/explorer/chain/address/counters.ex | 358 ++++++++++-------- .../chain/cache/addresses_tabs_counters.ex | 131 +++++++ apps/explorer/lib/explorer/counters/helper.ex | 4 +- config/runtime.exs | 3 + 8 files changed, 568 insertions(+), 171 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/cache/addresses_tabs_counters.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index 58f26fe682..fe51760c57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features +- [#8512](https://github.com/blockscout/blockscout/pull/8512) - Add caching and improve `/tabs-counters` performance - [#8472](https://github.com/blockscout/blockscout/pull/8472) - Integrate `/api/v2/bytecodes/sources:search-all` of `eth_bytecode_db` - [#8589](https://github.com/blockscout/blockscout/pull/8589) - DefiLlama TVL source - [#8544](https://github.com/blockscout/blockscout/pull/8544) - Fix `nil` `"structLogs"` diff --git a/apps/block_scout_web/lib/block_scout_web/controllers/api/v2/address_controller.ex b/apps/block_scout_web/lib/block_scout_web/controllers/api/v2/address_controller.ex index 160fd28cf6..5512f086f1 100644 --- a/apps/block_scout_web/lib/block_scout_web/controllers/api/v2/address_controller.ex +++ b/apps/block_scout_web/lib/block_scout_web/controllers/api/v2/address_controller.ex @@ -414,8 +414,8 @@ defmodule BlockScoutWeb.API.V2.AddressController do with {:format, {:ok, address_hash}} <- {:format, Chain.string_to_address_hash(address_hash_string)}, {:ok, false} <- AccessHelper.restricted_access?(address_hash_string, params), {:not_found, {:ok, _address}} <- {:not_found, Chain.hash_to_address(address_hash, @api_true, false)} do - {validations, transactions, token_transfers, token_balances, logs, withdrawals, internal_txs, coin_balances} = - Counters.address_limited_counters(address_hash_string, @api_true) + {validations, transactions, token_transfers, token_balances, logs, withdrawals, internal_txs} = + Counters.address_limited_counters(address_hash, @api_true) conn |> put_status(200) @@ -426,8 +426,7 @@ defmodule BlockScoutWeb.API.V2.AddressController do token_balances_count: token_balances, logs_count: logs, withdrawals_count: withdrawals, - internal_txs_count: internal_txs, - coin_balances_count: coin_balances + internal_txs_count: internal_txs }) end end diff --git a/apps/block_scout_web/test/block_scout_web/controllers/api/v2/address_controller_test.exs b/apps/block_scout_web/test/block_scout_web/controllers/api/v2/address_controller_test.exs index ee6f156fa5..06013076b2 100644 --- a/apps/block_scout_web/test/block_scout_web/controllers/api/v2/address_controller_test.exs +++ b/apps/block_scout_web/test/block_scout_web/controllers/api/v2/address_controller_test.exs @@ -1670,6 +1670,239 @@ defmodule BlockScoutWeb.API.V2.AddressControllerTest do end end + describe "/addresses/{address_hash}/tabs-counters" do + test "get 404 on non existing address", %{conn: conn} do + address = build(:address) + + request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters") + + assert %{"message" => "Not found"} = json_response(request, 404) + end + + test "get 422 on invalid address", %{conn: conn} do + request = get(conn, "/api/v2/addresses/0x/tabs-counters") + + assert %{"message" => "Invalid parameter(s)"} = json_response(request, 422) + end + + test "get counters with 0s", %{conn: conn} do + address = insert(:address) + + request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters") + + assert %{ + "validations_count" => 0, + "transactions_count" => 0, + "token_transfers_count" => 0, + "token_balances_count" => 0, + "logs_count" => 0, + "withdrawals_count" => 0, + "internal_txs_count" => 0 + } = json_response(request, 200) + end + + test "get counters and check that cache works", %{conn: conn} do + address = insert(:address, withdrawals: insert_list(60, :withdrawal)) + + insert(:transaction, from_address: address) |> with_block() + insert(:transaction, to_address: address) |> with_block() + another_tx = insert(:transaction) |> with_block() + + insert(:token_transfer, + from_address: address, + transaction: another_tx, + block: another_tx.block, + block_number: another_tx.block_number + ) + + insert(:token_transfer, + to_address: address, + transaction: another_tx, + block: another_tx.block, + block_number: another_tx.block_number + ) + + insert(:block, miner: address) + + tx = + :transaction + |> insert() + |> with_block() + + for x <- 1..2 do + insert(:internal_transaction, + transaction: tx, + index: x, + block_number: tx.block_number, + transaction_index: tx.index, + block_hash: tx.block_hash, + block_index: x, + from_address: address + ) + end + + for _ <- 0..60 do + insert(:address_current_token_balance_with_token_id, address: address) + end + + for x <- 0..60 do + tx = + :transaction + |> insert() + |> with_block() + + insert(:log, + transaction: tx, + index: x, + block: tx.block, + block_number: tx.block_number, + address: address + ) + end + + request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters") + + assert %{ + "validations_count" => 1, + "transactions_count" => 2, + "token_transfers_count" => 2, + "token_balances_count" => 51, + "logs_count" => 51, + "withdrawals_count" => 51, + "internal_txs_count" => 2 + } = json_response(request, 200) + + for x <- 3..4 do + insert(:internal_transaction, + transaction: tx, + index: x, + block_number: tx.block_number, + transaction_index: tx.index, + block_hash: tx.block_hash, + block_index: x, + from_address: address + ) + end + + request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters") + + assert %{ + "validations_count" => 1, + "transactions_count" => 2, + "token_transfers_count" => 2, + "token_balances_count" => 51, + "logs_count" => 51, + "withdrawals_count" => 51, + "internal_txs_count" => 2 + } = json_response(request, 200) + end + + test "check counters cache ttl", %{conn: conn} do + address = insert(:address, withdrawals: insert_list(60, :withdrawal)) + + insert(:transaction, from_address: address) |> with_block() + insert(:transaction, to_address: address) |> with_block() + another_tx = insert(:transaction) |> with_block() + + insert(:token_transfer, + from_address: address, + transaction: another_tx, + block: another_tx.block, + block_number: another_tx.block_number + ) + + insert(:token_transfer, + to_address: address, + transaction: another_tx, + block: another_tx.block, + block_number: another_tx.block_number + ) + + insert(:block, miner: address) + + tx = + :transaction + |> insert() + |> with_block() + + for x <- 1..2 do + insert(:internal_transaction, + transaction: tx, + index: x, + block_number: tx.block_number, + transaction_index: tx.index, + block_hash: tx.block_hash, + block_index: x, + from_address: address + ) + end + + for _ <- 0..60 do + insert(:address_current_token_balance_with_token_id, address: address) + end + + for x <- 0..60 do + tx = + :transaction + |> insert() + |> with_block() + + insert(:log, + transaction: tx, + index: x, + block: tx.block, + block_number: tx.block_number, + address: address + ) + end + + request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters") + + assert %{ + "validations_count" => 1, + "transactions_count" => 2, + "token_transfers_count" => 2, + "token_balances_count" => 51, + "logs_count" => 51, + "withdrawals_count" => 51, + "internal_txs_count" => 2 + } = json_response(request, 200) + + old_env = Application.get_env(:explorer, Explorer.Chain.Cache.AddressesTabsCounters) + Application.put_env(:explorer, Explorer.Chain.Cache.AddressesTabsCounters, ttl: 200) + :timer.sleep(200) + + for x <- 3..4 do + insert(:internal_transaction, + transaction: tx, + index: x, + block_number: tx.block_number, + transaction_index: tx.index, + block_hash: tx.block_hash, + block_index: x, + from_address: address + ) + end + + insert(:transaction, from_address: address) |> with_block() + insert(:transaction, to_address: address) |> with_block() + + request = get(conn, "/api/v2/addresses/#{address.hash}/tabs-counters") + + assert %{ + "validations_count" => 1, + "transactions_count" => 4, + "token_transfers_count" => 2, + "token_balances_count" => 51, + "logs_count" => 51, + "withdrawals_count" => 51, + "internal_txs_count" => 4 + } = json_response(request, 200) + + Application.put_env(:explorer, Explorer.Chain.Cache.AddressesTabsCounters, old_env) + end + end + defp compare_item(%Address{} = address, json) do assert Address.checksum(address.hash) == json["hash"] assert to_string(address.nonce + 1) == json["tx_count"] diff --git a/apps/explorer/lib/explorer/application.ex b/apps/explorer/lib/explorer/application.ex index 1799fc2882..b46894d80c 100644 --- a/apps/explorer/lib/explorer/application.ex +++ b/apps/explorer/lib/explorer/application.ex @@ -9,6 +9,7 @@ defmodule Explorer.Application do alias Explorer.Chain.Cache.{ Accounts, + AddressesTabsCounters, AddressSum, AddressSumMinusBurnt, Block, @@ -77,6 +78,7 @@ defmodule Explorer.Application do Transactions, TransactionsApiV2, Uncles, + AddressesTabsCounters, con_cache_child_spec(MarketHistoryCache.cache_name()), con_cache_child_spec(RSK.cache_name(), ttl_check_interval: :timer.minutes(1), global_ttl: :timer.minutes(30)), {Redix, redix_opts()}, diff --git a/apps/explorer/lib/explorer/chain/address/counters.ex b/apps/explorer/lib/explorer/chain/address/counters.ex index a8d2b78b6d..3538f062dc 100644 --- a/apps/explorer/lib/explorer/chain/address/counters.ex +++ b/apps/explorer/lib/explorer/chain/address/counters.ex @@ -2,7 +2,7 @@ defmodule Explorer.Chain.Address.Counters do @moduledoc """ Functions related to Explorer.Chain.Address counters """ - import Ecto.Query, only: [from: 2, limit: 2, select: 3, subquery: 1, union: 2, where: 3] + import Ecto.Query, only: [from: 2, limit: 2, select: 3, union: 2, where: 3] import Explorer.Chain, only: [select_repo: 1, wrapped_union_subquery: 1] @@ -19,7 +19,6 @@ defmodule Explorer.Chain.Address.Counters do alias Explorer.Chain.{ Address, - Address.CoinBalance, Address.CurrentTokenBalance, Block, Hash, @@ -30,10 +29,17 @@ defmodule Explorer.Chain.Address.Counters do Withdrawal } + alias Explorer.Chain.Cache.AddressesTabsCounters alias Explorer.Chain.Cache.Helper, as: CacheHelper require Logger + @typep counter :: non_neg_integer() | nil + + @counters_limit 51 + @types [:validations, :txs, :token_transfers, :token_balances, :logs, :withdrawals, :internal_txs] + @txs_types [:txs_from, :txs_to, :txs_contract] + defp address_hash_to_logs_query(address_hash) do from(l in Log, where: l.address_hash == ^address_hash) end @@ -50,22 +56,6 @@ defmodule Explorer.Chain.Address.Counters do select_repo(options).exists?(address_hash_to_logs_query(address_hash)) end - defp address_hash_to_coin_balances(address_hash) do - query = - from( - cb in CoinBalance, - where: cb.address_hash == ^address_hash, - where: not is_nil(cb.value), - select_merge: %{ - delta: fragment("? - coalesce(lead(?, 1) over (order by ? desc), 0)", cb.value, cb.value, cb.block_number) - } - ) - - from(balance in subquery(query), - where: balance.delta != 0 - ) - end - def check_if_token_transfers_at_address(address_hash, options \\ []) do select_repo(options).exists?(from(tt in TokenTransfer, where: tt.from_address_hash == ^address_hash)) || select_repo(options).exists?(from(tt in TokenTransfer, where: tt.to_address_hash == ^address_hash)) @@ -255,6 +245,39 @@ defmodule Explorer.Chain.Address.Counters do end end + def address_hash_to_internal_txs_limited_count_query(address_hash) do + query_to_address_hash_wrapped = + InternalTransaction + |> InternalTransaction.where_nonpending_block() + |> InternalTransaction.where_address_fields_match(address_hash, :to_address_hash) + |> InternalTransaction.where_is_different_from_parent_transaction() + |> limit(@counters_limit) + |> wrapped_union_subquery() + + query_from_address_hash_wrapped = + InternalTransaction + |> InternalTransaction.where_nonpending_block() + |> InternalTransaction.where_address_fields_match(address_hash, :from_address_hash) + |> InternalTransaction.where_is_different_from_parent_transaction() + |> limit(@counters_limit) + |> wrapped_union_subquery() + + query_created_contract_address_hash_wrapped = + InternalTransaction + |> InternalTransaction.where_nonpending_block() + |> InternalTransaction.where_address_fields_match(address_hash, :created_contract_address_hash) + |> InternalTransaction.where_is_different_from_parent_transaction() + |> limit(@counters_limit) + |> wrapped_union_subquery() + + query_to_address_hash_wrapped + |> union(^query_from_address_hash_wrapped) + |> union(^query_created_contract_address_hash_wrapped) + |> wrapped_union_subquery() + |> InternalTransaction.where_is_different_from_parent_transaction() + |> limit(@counters_limit) + end + def address_counters(address, options \\ []) do validation_count_task = Task.async(fn -> @@ -304,28 +327,33 @@ defmodule Explorer.Chain.Address.Counters do AddressTransactionsGasUsageCounter.fetch(address) end - @counters_limit 51 - + @spec address_limited_counters(Hash.t(), Keyword.t()) :: + {counter(), counter(), counter(), counter(), counter(), counter(), counter()} def address_limited_counters(address_hash, options) do + cached_counters = + Enum.reduce(@types, %{}, fn type, acc -> + case AddressesTabsCounters.get_counter(type, address_hash) do + {_datetime, counter, status} -> + Map.put(acc, type, {status, counter}) + + _ -> + acc + end + end) + start = Time.utc_now() validations_count_task = - Task.async(fn -> - result = - address_hash - |> address_hash_to_validated_blocks_query() - |> limit(@counters_limit) - |> select_repo(options).aggregate(:count) - - Logger.info( - "Time consumed for validations_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) - - result - end) + configure_task( + :validations, + cached_counters, + address_hash_to_validated_blocks_query(address_hash), + address_hash, + options + ) transactions_from_count_task = - Task.async(fn -> + run_or_ignore(cached_counters[:txs], :txs_from, address_hash, fn -> result = Transaction |> where([t], t.from_address_hash == ^address_hash) @@ -338,11 +366,14 @@ defmodule Explorer.Chain.Address.Counters do "Time consumed for transactions_from_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" ) - result + AddressesTabsCounters.save_txs_counter_progress(address_hash, %{txs_types: [:txs_from], txs_from: result}) + AddressesTabsCounters.drop_task(:txs_from, address_hash) + + {:txs_from, result} end) transactions_to_count_task = - Task.async(fn -> + run_or_ignore(cached_counters[:txs], :txs_to, address_hash, fn -> result = Transaction |> where([t], t.to_address_hash == ^address_hash) @@ -355,11 +386,14 @@ defmodule Explorer.Chain.Address.Counters do "Time consumed for transactions_to_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" ) - result + AddressesTabsCounters.save_txs_counter_progress(address_hash, %{txs_types: [:txs_to], txs_to: result}) + AddressesTabsCounters.drop_task(:txs_to, address_hash) + + {:txs_to, result} end) transactions_created_contract_count_task = - Task.async(fn -> + run_or_ignore(cached_counters[:txs], :txs_contract, address_hash, fn -> result = Transaction |> where([t], t.created_contract_address_hash == ^address_hash) @@ -372,172 +406,166 @@ defmodule Explorer.Chain.Address.Counters do "Time consumed for transactions_created_contract_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" ) - result - end) - - token_transfer_count_task = - Task.async(fn -> - result = - address_hash - |> address_to_token_transfer_count_query() - |> limit(@counters_limit) - |> select_repo(options).aggregate(:count) + AddressesTabsCounters.save_txs_counter_progress(address_hash, %{ + txs_types: [:txs_contract], + txs_contract: result + }) - Logger.info( - "Time consumed for token_transfer_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) + AddressesTabsCounters.drop_task(:txs_contract, address_hash) - result + {:txs_contract, result} end) - token_balances_count_task = - Task.async(fn -> - result = - address_hash - |> address_hash_to_token_balances_query() - |> limit(@counters_limit) - |> select_repo(options).aggregate(:count) - - Logger.info( - "Time consumed for token_balances_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) + token_transfers_count_task = + configure_task( + :token_transfers, + cached_counters, + address_to_token_transfer_count_query(address_hash), + address_hash, + options + ) - result - end) + token_balances_count_task = + configure_task( + :token_balances, + cached_counters, + address_hash_to_token_balances_query(address_hash), + address_hash, + options + ) logs_count_task = - Task.async(fn -> - result = - address_hash - |> address_hash_to_logs_query() - |> limit(@counters_limit) - |> select_repo(options).aggregate(:count) - - Logger.info( - "Time consumed for logs_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) - - result - end) + configure_task( + :logs, + cached_counters, + address_hash_to_logs_query(address_hash), + address_hash, + options + ) withdrawals_count_task = - Task.async(fn -> - result = - address_hash - |> Withdrawal.address_hash_to_withdrawals_unordered_query() - |> limit(@counters_limit) - |> select_repo(options).aggregate(:count) - - Logger.info( - "Time consumed for withdrawals_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) - - result - end) + configure_task( + :withdrawals, + cached_counters, + Withdrawal.address_hash_to_withdrawals_unordered_query(address_hash), + address_hash, + options + ) internal_txs_count_task = - Task.async(fn -> - query_to_address_hash_wrapped = - InternalTransaction - |> InternalTransaction.where_nonpending_block() - |> InternalTransaction.where_address_fields_match(address_hash, :to_address_hash) - |> InternalTransaction.where_is_different_from_parent_transaction() - |> limit(@counters_limit) - |> wrapped_union_subquery() - - query_from_address_hash_wrapped = - InternalTransaction - |> InternalTransaction.where_nonpending_block() - |> InternalTransaction.where_address_fields_match(address_hash, :from_address_hash) - |> InternalTransaction.where_is_different_from_parent_transaction() - |> limit(@counters_limit) - |> wrapped_union_subquery() - - query_created_contract_address_hash_wrapped = - InternalTransaction - |> InternalTransaction.where_nonpending_block() - |> InternalTransaction.where_address_fields_match(address_hash, :created_contract_address_hash) - |> InternalTransaction.where_is_different_from_parent_transaction() - |> limit(@counters_limit) - |> wrapped_union_subquery() - - result = - query_to_address_hash_wrapped - |> union(^query_from_address_hash_wrapped) - |> union(^query_created_contract_address_hash_wrapped) - |> wrapped_union_subquery() - |> InternalTransaction.where_is_different_from_parent_transaction() - |> limit(@counters_limit) - |> select_repo(options).aggregate(:count) - - Logger.info( - "Time consumed for internal_txs_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) - - result - end) - - coin_balances_count_task = - Task.async(fn -> - result = - address_hash - |> address_hash_to_coin_balances() - |> limit(@counters_limit) - |> select_repo(options).aggregate(:count) - - Logger.info( - "Time consumed for coin_balances_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) - - result - end) + configure_task( + :internal_txs, + cached_counters, + address_hash_to_internal_txs_limited_count_query(address_hash), + address_hash, + options + ) - {validations, txs_from, txs_to, txs_contract, token_transfers, token_balances, logs, withdrawals, internal_txs, - coin_balances} = + map = [ validations_count_task, transactions_from_count_task, transactions_to_count_task, transactions_created_contract_count_task, - token_transfer_count_task, + token_transfers_count_task, token_balances_count_task, logs_count_task, withdrawals_count_task, - internal_txs_count_task, - coin_balances_count_task + internal_txs_count_task ] - |> Task.yield_many(:timer.seconds(30)) - |> Enum.map(fn {_task, res} -> + |> Enum.reject(&is_nil/1) + |> Task.yield_many(:timer.seconds(1)) + |> Enum.reduce(Map.merge(prepare_cache_values(cached_counters), %{txs_types: [], txs_hashes: []}), fn {task, res}, + acc -> case res do - {:ok, result} -> - result + {:ok, {txs_type, txs_hashes}} when txs_type in @txs_types -> + acc + |> (&Map.put(&1, :txs_types, [txs_type | &1[:txs_types] || []])).() + |> (&Map.put(&1, :txs_hashes, &1[:txs_hashes] ++ txs_hashes)).() + + {:ok, {type, counter}} -> + Map.put(acc, type, counter) {:exit, reason} -> Logger.warn(fn -> [ - "Query fetching address counters terminated: #{inspect(reason)}" + "Query fetching address counters for #{address_hash} terminated: #{inspect(reason)}" ] end) - nil + acc nil -> Logger.warn(fn -> [ - "Query fetching address counters timed out." + "Query fetching address counters for #{address_hash} timed out." ] end) - nil + Task.ignore(task) + + acc end end) - |> List.to_tuple() + |> process_txs_counter() + + {map[:validations], map[:txs], map[:token_transfers], map[:token_balances], map[:logs], map[:withdrawals], + map[:internal_txs]} + end + + defp run_or_ignore({ok, _counter}, _type, _address_hash, _fun) when ok in [:up_to_date, :limit_value], do: nil + + defp run_or_ignore(_, type, address_hash, fun) do + if !AddressesTabsCounters.get_task(type, address_hash) do + AddressesTabsCounters.set_task(type, address_hash) + + Task.async(fun) + end + end + + defp configure_task(counter_type, cache, query, address_hash, options) do + address_hash = to_string(address_hash) + start = Time.utc_now() + + run_or_ignore(cache[counter_type], counter_type, address_hash, fn -> + result = + query + |> limit(@counters_limit) + |> select_repo(options).aggregate(:count) - {validations, - (sanitize_list(txs_from) ++ sanitize_list(txs_to) ++ sanitize_list(txs_contract)) |> Enum.dedup() |> Enum.count(), - token_transfers, token_balances, logs, withdrawals, internal_txs, coin_balances} + Logger.info( + "Time consumed for #{counter_type} counter task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" + ) + + AddressesTabsCounters.set_counter(counter_type, address_hash, result) + AddressesTabsCounters.drop_task(counter_type, address_hash) + + {counter_type, result} + end) + end + + defp process_txs_counter(%{txs_types: [_ | _] = txs_types, txs_hashes: hashes} = map) do + counter = hashes |> Enum.uniq() |> Enum.count() |> min(@counters_limit) + + if Enum.count(txs_types) == 3 || counter == @counters_limit do + map |> Map.put(:txs, counter) + else + map + end + end + + defp process_txs_counter(map), do: map + + defp prepare_cache_values(cached_counters) do + Enum.reduce(cached_counters, %{}, fn + {k, {_, counter}}, acc -> + Map.put(acc, k, counter) + + {k, v}, acc -> + Map.put(acc, k, v) + end) end - defp sanitize_list(nil), do: [] - defp sanitize_list(other), do: other + def txs_types, do: @txs_types + def counters_limit, do: @counters_limit end diff --git a/apps/explorer/lib/explorer/chain/cache/addresses_tabs_counters.ex b/apps/explorer/lib/explorer/chain/cache/addresses_tabs_counters.ex new file mode 100644 index 0000000000..00aaf5c64f --- /dev/null +++ b/apps/explorer/lib/explorer/chain/cache/addresses_tabs_counters.ex @@ -0,0 +1,131 @@ +defmodule Explorer.Chain.Cache.AddressesTabsCounters do + @moduledoc """ + Cache for tabs counters on address + """ + + use GenServer + + import Explorer.Counters.Helper, only: [fetch_from_cache: 3] + + alias Explorer.Chain.Address.Counters + + @cache_name :addresses_tabs_counters + + @typep counter_type :: :validations | :txs | :token_transfers | :token_balances | :logs | :withdrawals | :internal_txs + @typep response_status :: :limit_value | :stale | :up_to_date + + @spec get_counter(counter_type, String.t()) :: {DateTime.t(), non_neg_integer(), response_status} | nil + def get_counter(counter_type, address_hash) do + address_hash |> cache_key(counter_type) |> fetch_from_cache(@cache_name, nil) |> check_staleness() + end + + @spec set_counter(counter_type, String.t(), non_neg_integer()) :: :ok + def set_counter(counter_type, address_hash, counter, need_to_modify_state? \\ true) do + :ets.insert(@cache_name, {cache_key(address_hash, counter_type), {DateTime.utc_now(), counter}}) + if need_to_modify_state?, do: ignore_txs(counter_type, address_hash) + + :ok + end + + def set_task(counter_type, address_hash) do + :ets.insert(@cache_name, {task_cache_key(address_hash, counter_type), true}) + end + + def drop_task(counter_type, address_hash) do + :ets.delete(@cache_name, task_cache_key(address_hash, counter_type)) + end + + def get_task(counter_type, address_hash) do + address_hash |> task_cache_key(counter_type) |> fetch_from_cache(@cache_name, nil) + end + + def ignore_txs(:txs, address_hash), do: GenServer.cast(__MODULE__, {:ignore_txs, address_hash}) + def ignore_txs(_counter_type, _address_hash), do: :ignore + + def save_txs_counter_progress(address_hash, results) do + GenServer.cast(__MODULE__, {:set_txs_state, address_hash, results}) + end + + def start_link(_) do + GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + end + + @impl true + def init(_opts) do + :ets.new(@cache_name, [ + :set, + :named_table, + :public, + read_concurrency: true, + write_concurrency: true + ]) + + {:ok, %{}} + end + + @impl true + def handle_cast({:ignore_txs, address_hash}, state) do + {:noreply, Map.put(state, lowercased_string(address_hash), {:updated, DateTime.utc_now()})} + end + + @impl true + def handle_cast({:set_txs_state, address_hash, %{txs_types: txs_types} = results}, state) do + address_hash = lowercased_string(address_hash) + + if is_ignored?(state[address_hash]) do + {:noreply, state} + else + address_state = + txs_types + |> Enum.reduce(state[address_hash] || %{}, fn tx_type, acc -> + Map.put(acc, tx_type, results[tx_type]) + end) + |> (&Map.put(&1, :txs_types, (txs_types ++ (&1[:txs_types] || [])) |> Enum.uniq())).() + + counter = + Counters.txs_types() + |> Enum.reduce([], fn type, acc -> + (address_state[type] || []) ++ acc + end) + |> Enum.uniq() + |> Enum.count() + |> min(Counters.counters_limit()) + + if counter == Counters.counters_limit() || Enum.count(address_state[:txs_types]) == 3 do + set_counter(:txs, address_hash, counter, false) + {:noreply, Map.put(state, address_hash, {:updated, DateTime.utc_now()})} + else + {:noreply, Map.put(state, address_hash, address_state)} + end + end + end + + defp is_ignored?({:updated, datetime}), do: is_up_to_date?(datetime, ttl()) + defp is_ignored?(_), do: false + + def check_staleness(nil), do: nil + def check_staleness({datetime, counter}) when counter > 50, do: {datetime, counter, :limit_value} + + def check_staleness({datetime, counter}) do + status = + if is_up_to_date?(datetime, ttl()) do + :up_to_date + else + :stale + end + + {datetime, counter, status} + end + + defp is_up_to_date?(datetime, ttl) do + datetime + |> DateTime.add(ttl, :millisecond) + |> DateTime.compare(DateTime.utc_now()) != :lt + end + + defp ttl, do: Application.get_env(:explorer, Explorer.Chain.Cache.AddressesTabsCounters)[:ttl] + defp lowercased_string(str), do: str |> to_string() |> String.downcase() + + defp cache_key(address_hash, counter_type), do: {lowercased_string(address_hash), counter_type} + defp task_cache_key(address_hash, counter_type), do: {:task, lowercased_string(address_hash), counter_type} +end diff --git a/apps/explorer/lib/explorer/counters/helper.ex b/apps/explorer/lib/explorer/counters/helper.ex index 4b202f4242..fbcccf037a 100644 --- a/apps/explorer/lib/explorer/counters/helper.ex +++ b/apps/explorer/lib/explorer/counters/helper.ex @@ -16,13 +16,13 @@ defmodule Explorer.Counters.Helper do DateTime.to_unix(utc_now, :millisecond) end - def fetch_from_cache(key, cache_name) do + def fetch_from_cache(key, cache_name, default \\ 0) do case :ets.lookup(cache_name, key) do [{_, value}] -> value [] -> - 0 + default end end diff --git a/config/runtime.exs b/config/runtime.exs index 0fe6e7377e..5d767b6e29 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -429,6 +429,9 @@ config :explorer, Explorer.Chain.Transaction, rootstock_remasc_address: System.get_env("ROOTSTOCK_REMASC_ADDRESS"), rootstock_bridge_address: System.get_env("ROOTSTOCK_BRIDGE_ADDRESS") +config :explorer, Explorer.Chain.Cache.AddressesTabsCounters, + ttl: ConfigHelper.parse_time_env_var("ADDRESSES_TABS_COUNTERS_TTL", "10m") + ############### ### Indexer ### ############### From 6896b092dfc5f2c354387e96adc321083b1bc787 Mon Sep 17 00:00:00 2001 From: Nikita Pozdniakov Date: Thu, 19 Oct 2023 14:51:49 +0300 Subject: [PATCH 2/3] Change counting time function --- .../lib/explorer/chain/address/counters.ex | 32 +++++++++++-------- docker-compose/envs/common-blockscout.env | 3 +- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/apps/explorer/lib/explorer/chain/address/counters.ex b/apps/explorer/lib/explorer/chain/address/counters.ex index 3538f062dc..71b11b078d 100644 --- a/apps/explorer/lib/explorer/chain/address/counters.ex +++ b/apps/explorer/lib/explorer/chain/address/counters.ex @@ -341,7 +341,7 @@ defmodule Explorer.Chain.Address.Counters do end end) - start = Time.utc_now() + start = System.monotonic_time() validations_count_task = configure_task( @@ -362,9 +362,10 @@ defmodule Explorer.Chain.Address.Counters do |> limit(@counters_limit) |> select_repo(options).all() - Logger.info( - "Time consumed for transactions_from_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) + stop = System.monotonic_time() + diff = System.convert_time_unit(stop - start, :native, :millisecond) + + Logger.info("Time consumed for transactions_from_count_task for #{address_hash} is #{diff}ms") AddressesTabsCounters.save_txs_counter_progress(address_hash, %{txs_types: [:txs_from], txs_from: result}) AddressesTabsCounters.drop_task(:txs_from, address_hash) @@ -382,9 +383,10 @@ defmodule Explorer.Chain.Address.Counters do |> limit(@counters_limit) |> select_repo(options).all() - Logger.info( - "Time consumed for transactions_to_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) + stop = System.monotonic_time() + diff = System.convert_time_unit(stop - start, :native, :millisecond) + + Logger.info("Time consumed for transactions_to_count_task for #{address_hash} is #{diff}ms") AddressesTabsCounters.save_txs_counter_progress(address_hash, %{txs_types: [:txs_to], txs_to: result}) AddressesTabsCounters.drop_task(:txs_to, address_hash) @@ -402,9 +404,10 @@ defmodule Explorer.Chain.Address.Counters do |> limit(@counters_limit) |> select_repo(options).all() - Logger.info( - "Time consumed for transactions_created_contract_count_task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) + stop = System.monotonic_time() + diff = System.convert_time_unit(stop - start, :native, :millisecond) + + Logger.info("Time consumed for transactions_created_contract_count_task for #{address_hash} is #{diff}ms") AddressesTabsCounters.save_txs_counter_progress(address_hash, %{ txs_types: [:txs_contract], @@ -525,7 +528,7 @@ defmodule Explorer.Chain.Address.Counters do defp configure_task(counter_type, cache, query, address_hash, options) do address_hash = to_string(address_hash) - start = Time.utc_now() + start = System.monotonic_time() run_or_ignore(cache[counter_type], counter_type, address_hash, fn -> result = @@ -533,9 +536,10 @@ defmodule Explorer.Chain.Address.Counters do |> limit(@counters_limit) |> select_repo(options).aggregate(:count) - Logger.info( - "Time consumed for #{counter_type} counter task for #{address_hash} is #{Time.diff(Time.utc_now(), start, :millisecond)}ms" - ) + stop = System.monotonic_time() + diff = System.convert_time_unit(stop - start, :native, :millisecond) + + Logger.info("Time consumed for #{counter_type} counter task for #{address_hash} is #{diff}ms") AddressesTabsCounters.set_counter(counter_type, address_hash, result) AddressesTabsCounters.drop_task(counter_type, address_hash) diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index bc1a081199..3124347c80 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -240,4 +240,5 @@ EIP_1559_ELASTICITY_MULTIPLIER=2 # TOKEN_INSTANCE_OWNER_MIGRATION_CONCURRENCY=5 # TOKEN_INSTANCE_OWNER_MIGRATION_BATCH_SIZE=50 # IPFS_GATEWAY_URL= -API_V2_ENABLED=true \ No newline at end of file +API_V2_ENABLED=true +# ADDRESSES_TABS_COUNTERS_TTL=10m \ No newline at end of file From 4614878507f523746fc9d1ab304e896a6f5b655c Mon Sep 17 00:00:00 2001 From: Nikita Pozdniakov Date: Fri, 20 Oct 2023 12:23:44 +0300 Subject: [PATCH 3/3] Process review comments --- apps/explorer/lib/explorer/chain/address/counters.ex | 11 ++++++++++- .../explorer/chain/cache/addresses_tabs_counters.ex | 10 +++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/apps/explorer/lib/explorer/chain/address/counters.ex b/apps/explorer/lib/explorer/chain/address/counters.ex index 71b11b078d..9f0aea47d8 100644 --- a/apps/explorer/lib/explorer/chain/address/counters.ex +++ b/apps/explorer/lib/explorer/chain/address/counters.ex @@ -245,7 +245,7 @@ defmodule Explorer.Chain.Address.Counters do end end - def address_hash_to_internal_txs_limited_count_query(address_hash) do + defp address_hash_to_internal_txs_limited_count_query(address_hash) do query_to_address_hash_wrapped = InternalTransaction |> InternalTransaction.where_nonpending_block() @@ -570,6 +570,15 @@ defmodule Explorer.Chain.Address.Counters do end) end + @doc """ + Returns all possible transactions type + """ + @spec txs_types :: list(atom) def txs_types, do: @txs_types + + @doc """ + Returns max counter value + """ + @spec counters_limit :: integer() def counters_limit, do: @counters_limit end diff --git a/apps/explorer/lib/explorer/chain/cache/addresses_tabs_counters.ex b/apps/explorer/lib/explorer/chain/cache/addresses_tabs_counters.ex index 00aaf5c64f..ab79fbf0a5 100644 --- a/apps/explorer/lib/explorer/chain/cache/addresses_tabs_counters.ex +++ b/apps/explorer/lib/explorer/chain/cache/addresses_tabs_counters.ex @@ -27,18 +27,22 @@ defmodule Explorer.Chain.Cache.AddressesTabsCounters do :ok end + @spec set_task(atom, String.t()) :: true def set_task(counter_type, address_hash) do :ets.insert(@cache_name, {task_cache_key(address_hash, counter_type), true}) end + @spec drop_task(atom, String.t()) :: true def drop_task(counter_type, address_hash) do :ets.delete(@cache_name, task_cache_key(address_hash, counter_type)) end + @spec get_task(atom, String.t()) :: true | nil def get_task(counter_type, address_hash) do address_hash |> task_cache_key(counter_type) |> fetch_from_cache(@cache_name, nil) end + @spec ignore_txs(atom, String.t()) :: :ignore | :ok def ignore_txs(:txs, address_hash), do: GenServer.cast(__MODULE__, {:ignore_txs, address_hash}) def ignore_txs(_counter_type, _address_hash), do: :ignore @@ -103,10 +107,10 @@ defmodule Explorer.Chain.Cache.AddressesTabsCounters do defp is_ignored?({:updated, datetime}), do: is_up_to_date?(datetime, ttl()) defp is_ignored?(_), do: false - def check_staleness(nil), do: nil - def check_staleness({datetime, counter}) when counter > 50, do: {datetime, counter, :limit_value} + defp check_staleness(nil), do: nil + defp check_staleness({datetime, counter}) when counter > 50, do: {datetime, counter, :limit_value} - def check_staleness({datetime, counter}) do + defp check_staleness({datetime, counter}) do status = if is_up_to_date?(datetime, ttl()) do :up_to_date