From f88aa78c5ad1e3ff37e92a2ef46dc0c80ecf616a Mon Sep 17 00:00:00 2001 From: varasev <33550681+varasev@users.noreply.github.com> Date: Wed, 5 Apr 2023 18:06:58 +0300 Subject: [PATCH] Tx actions: remove excess `delete_all` calls and remake a cache (#7107) * Remove excess delete_all calls and remake tx actions cache * Update changelog * Simplify get_max_token_cache_size function --------- Co-authored-by: POA <33550681+poa@users.noreply.github.com> --- CHANGELOG.md | 1 + apps/explorer/config/config.exs | 4 + apps/explorer/lib/explorer/application.ex | 2 + .../cache/transaction_action_tokens_data.ex | 61 ++++++++++++ .../cache/transaction_action_uniswap_pools.ex | 44 +++++++++ .../import/runner/transaction_actions.ex | 35 ++++++- .../indexer/transform/transaction_actions.ex | 99 ++++--------------- config/runtime.exs | 3 + 8 files changed, 164 insertions(+), 85 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/cache/transaction_action_tokens_data.ex create mode 100644 apps/explorer/lib/explorer/chain/cache/transaction_action_uniswap_pools.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index b48b19b7dc..ebb46559b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ ### Chore +- [#7107](https://github.com/blockscout/blockscout/pull/7107) - Tx actions: remove excess delete_all calls and remake a cache - [#7201](https://github.com/blockscout/blockscout/pull/7201) - Remove rust, cargo from dependencies since the latest version of ex_keccak is using precompiled rust ## 5.1.2-beta diff --git a/apps/explorer/config/config.exs b/apps/explorer/config/config.exs index f2b6c54fab..7d6ee8d67f 100644 --- a/apps/explorer/config/config.exs +++ b/apps/explorer/config/config.exs @@ -70,6 +70,10 @@ config :explorer, Explorer.Chain.Cache.NewVerifiedContractsCounter, enable_consolidation: true, update_interval_in_milliseconds: update_interval_in_milliseconds_default +config :explorer, Explorer.Chain.Cache.TransactionActionTokensData, enabled: true + +config :explorer, Explorer.Chain.Cache.TransactionActionUniswapPools, enabled: true + config :explorer, Explorer.ExchangeRates, cache_period: ConfigHelper.parse_time_env_var("CACHE_EXCHANGE_RATES_PERIOD", "10m") diff --git a/apps/explorer/lib/explorer/application.ex b/apps/explorer/lib/explorer/application.ex index 4d72e88e04..04cd9939f7 100644 --- a/apps/explorer/lib/explorer/application.ex +++ b/apps/explorer/lib/explorer/application.ex @@ -90,6 +90,8 @@ defmodule Explorer.Application do configure(Explorer.Chain.Cache.NewContractsCounter), configure(Explorer.Chain.Cache.VerifiedContractsCounter), configure(Explorer.Chain.Cache.NewVerifiedContractsCounter), + configure(Explorer.Chain.Cache.TransactionActionTokensData), + configure(Explorer.Chain.Cache.TransactionActionUniswapPools), configure(Explorer.Chain.Transaction.History.Historian), configure(Explorer.Chain.Events.Listener), configure(Explorer.Counters.AddressesWithBalanceCounter), diff --git a/apps/explorer/lib/explorer/chain/cache/transaction_action_tokens_data.ex b/apps/explorer/lib/explorer/chain/cache/transaction_action_tokens_data.ex new file mode 100644 index 0000000000..27fc043c33 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/cache/transaction_action_tokens_data.ex @@ -0,0 +1,61 @@ +defmodule Explorer.Chain.Cache.TransactionActionTokensData do + @moduledoc """ + Caches tokens data for Indexer.Transform.TransactionActions. + """ + use GenServer + + @cache_name :tx_actions_tokens_data_cache + + @spec start_link(term()) :: GenServer.on_start() + def start_link(_) do + GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + end + + @impl true + def init(_args) do + create_cache_table() + {:ok, %{}} + end + + def create_cache_table do + if :ets.whereis(@cache_name) == :undefined do + :ets.new(@cache_name, [ + :set, + :named_table, + :public, + read_concurrency: true, + write_concurrency: true + ]) + end + end + + def fetch_from_cache(address) do + with info when info != :undefined <- :ets.info(@cache_name), + [{_, value}] <- :ets.lookup(@cache_name, address) do + value + else + _ -> %{symbol: nil, decimals: nil} + end + end + + def put_to_cache(address, data) do + if not :ets.member(@cache_name, address) do + # we need to add a new item to the cache, but don't exceed the limit + cache_size = :ets.info(@cache_name, :size) + + how_many_to_remove = cache_size - get_max_token_cache_size() + 1 + + range = Range.new(1, how_many_to_remove, 1) + + for _step <- range do + :ets.delete(@cache_name, :ets.first(@cache_name)) + end + end + + :ets.insert(@cache_name, {address, data}) + end + + defp get_max_token_cache_size do + Application.get_env(:explorer, __MODULE__)[:max_cache_size] + end +end diff --git a/apps/explorer/lib/explorer/chain/cache/transaction_action_uniswap_pools.ex b/apps/explorer/lib/explorer/chain/cache/transaction_action_uniswap_pools.ex new file mode 100644 index 0000000000..f254e95f38 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/cache/transaction_action_uniswap_pools.ex @@ -0,0 +1,44 @@ +defmodule Explorer.Chain.Cache.TransactionActionUniswapPools do + @moduledoc """ + Caches Uniswap pools for Indexer.Transform.TransactionActions. + """ + use GenServer + + @cache_name :tx_actions_uniswap_pools_cache + + @spec start_link(term()) :: GenServer.on_start() + def start_link(_) do + GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + end + + @impl true + def init(_args) do + create_cache_table() + {:ok, %{}} + end + + def create_cache_table do + if :ets.whereis(@cache_name) == :undefined do + :ets.new(@cache_name, [ + :set, + :named_table, + :public, + read_concurrency: true, + write_concurrency: true + ]) + end + end + + def fetch_from_cache(pool_address) do + with info when info != :undefined <- :ets.info(@cache_name), + [{_, value}] <- :ets.lookup(@cache_name, pool_address) do + value + else + _ -> nil + end + end + + def put_to_cache(address, value) do + :ets.insert(@cache_name, {address, value}) + end +end diff --git a/apps/explorer/lib/explorer/chain/import/runner/transaction_actions.ex b/apps/explorer/lib/explorer/chain/import/runner/transaction_actions.ex index 6c0d499732..a5e54c7763 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/transaction_actions.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/transaction_actions.ex @@ -5,6 +5,8 @@ defmodule Explorer.Chain.Import.Runner.TransactionActions do require Ecto.Query + import Ecto.Query, only: [from: 2] + alias Ecto.{Changeset, Multi, Repo} alias Explorer.Chain.{Import, TransactionAction} alias Explorer.Prometheus.Instrumenter @@ -55,7 +57,9 @@ defmodule Explorer.Chain.Import.Runner.TransactionActions do @spec insert(Repo.t(), [map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) :: {:ok, [TransactionAction.t()]} | {:error, [Changeset.t()]} - def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = _options) when is_list(changes_list) do + def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + # Enforce TransactionAction ShareLocks order (see docs: sharelock.md) ordered_changes_list = Enum.sort_by(changes_list, &{&1.hash, &1.log_index}) @@ -63,13 +67,38 @@ defmodule Explorer.Chain.Import.Runner.TransactionActions do Import.insert_changes_list( repo, ordered_changes_list, + conflict_target: [:hash, :log_index], + on_conflict: on_conflict, for: TransactionAction, returning: true, timeout: timeout, - timestamps: timestamps, - on_conflict: :nothing + timestamps: timestamps ) {:ok, inserted} end + + defp default_on_conflict do + from( + action in TransactionAction, + update: [ + set: [ + # Don't update `hash` as it is part of the composite primary key and used for the conflict target + # Don't update `log_index` as it is part of the composite primary key and used for the conflict target + protocol: fragment("EXCLUDED.protocol"), + data: fragment("EXCLUDED.data"), + type: fragment("EXCLUDED.type"), + inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", action.inserted_at), + updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", action.updated_at) + ] + ], + where: + fragment( + "(EXCLUDED.protocol, EXCLUDED.data, EXCLUDED.type) IS DISTINCT FROM (?, ? ,?)", + action.protocol, + action.data, + action.type + ) + ) + end end diff --git a/apps/indexer/lib/indexer/transform/transaction_actions.ex b/apps/indexer/lib/indexer/transform/transaction_actions.ex index 5b06bd813a..c0a266bb8b 100644 --- a/apps/indexer/lib/indexer/transform/transaction_actions.ex +++ b/apps/indexer/lib/indexer/transform/transaction_actions.ex @@ -9,6 +9,7 @@ defmodule Indexer.Transform.TransactionActions do alias ABI.TypeDecoder alias Explorer.Chain.Cache.NetVersion + alias Explorer.Chain.Cache.{TransactionActionTokensData, TransactionActionUniswapPools} alias Explorer.Chain.{Address, Data, Hash, Token, TransactionAction} alias Explorer.Repo alias Explorer.SmartContract.Reader @@ -19,7 +20,6 @@ defmodule Indexer.Transform.TransactionActions do @polygon 137 # @gnosis 100 - @default_max_token_cache_size 100_000 @burn_address "0x0000000000000000000000000000000000000000" @uniswap_v3_positions_nft "0xC36442b4a4522E871399CD717aBDD847Ab11FE88" @uniswap_v3_factory "0x1F98431c8aD98523631AE4a59f267346ea31F984" @@ -98,23 +98,26 @@ defmodule Indexer.Transform.TransactionActions do @doc """ Returns a list of transaction actions given a list of logs. """ - def parse(logs, protocols_to_rewrite \\ []) do + def parse(logs, protocols_to_rewrite \\ nil) do if Application.get_env(:indexer, Indexer.Fetcher.TransactionAction.Supervisor)[:enabled] do actions = [] chain_id = NetVersion.get_version() - logs - |> logs_group_by_txs() - |> clear_actions(protocols_to_rewrite) + if not is_nil(protocols_to_rewrite) do + logs + |> logs_group_by_txs() + |> clear_actions(protocols_to_rewrite) + end # create tokens cache if not exists - init_token_data_cache() + TransactionActionTokensData.create_cache_table() # handle uniswap v3 tx_actions = if Enum.member?([@mainnet, @goerli, @optimism, @polygon], chain_id) and - (Enum.empty?(protocols_to_rewrite) or Enum.member?(protocols_to_rewrite, "uniswap_v3")) do + (is_nil(protocols_to_rewrite) or Enum.empty?(protocols_to_rewrite) or + Enum.member?(protocols_to_rewrite, "uniswap_v3")) do logs |> uniswap_filter_logs() |> logs_group_by_txs() @@ -353,7 +356,7 @@ defmodule Indexer.Transform.TransactionActions do end defp uniswap_legitimate_pools(logs_grouped) do - init_uniswap_pools_cache() + TransactionActionUniswapPools.create_cache_table() {pools_to_request, pools_cached} = logs_grouped @@ -368,7 +371,7 @@ defmodule Indexer.Transform.TransactionActions do end) end) |> Enum.reduce({[], %{}}, fn {pool_address, _}, {to_request, cached} -> - value_from_cache = get_uniswap_pool_from_cache(pool_address) + value_from_cache = TransactionActionUniswapPools.fetch_from_cache(pool_address) if is_nil(value_from_cache) do {[pool_address | to_request], cached} @@ -385,7 +388,7 @@ defmodule Indexer.Transform.TransactionActions do |> Enum.zip(responses_get_pool) |> Enum.reduce(%{}, fn {request, {_status, response} = _resp}, acc -> value = uniswap_pool_is_legitimate(request, response) - put_uniswap_pool_to_cache(request.pool_address, value) + TransactionActionUniswapPools.put_to_cache(request.pool_address, value) Map.put(acc, request.pool_address, value) end) |> Map.merge(pools_cached) @@ -491,7 +494,7 @@ defmodule Indexer.Transform.TransactionActions do |> Enum.reduce([], fn {{status, _}, i}, acc -> if status == :error do pool_address = Enum.at(requests, i)[:contract_address] - put_uniswap_pool_to_cache(pool_address, []) + TransactionActionUniswapPools.put_to_cache(pool_address, []) [pool_address | acc] else acc @@ -550,19 +553,6 @@ defmodule Indexer.Transform.TransactionActions do |> Decimal.to_string(:normal) end - defp get_max_token_cache_size do - case Application.get_env(:indexer, __MODULE__)[:max_token_cache_size] do - nil -> - @default_max_token_cache_size - - "" -> - @default_max_token_cache_size - - max_cache_size -> - if is_binary(max_cache_size), do: String.to_integer(max_cache_size), else: max_cache_size - end - end - defp get_token_data(token_addresses) do # first, we're trying to read token data from the cache. # if the cache is empty, we read that from DB. @@ -588,12 +578,7 @@ defmodule Indexer.Transform.TransactionActions do Map.put( acc, address, - with info when info != :undefined <- :ets.info(:tx_actions_tokens_data_cache), - [{_, value}] <- :ets.lookup(:tx_actions_tokens_data_cache, address) do - value - else - _ -> %{symbol: nil, decimals: nil} - end + TransactionActionTokensData.fetch_from_cache(address) ) end) end @@ -634,7 +619,7 @@ defmodule Indexer.Transform.TransactionActions do new_data = %{symbol: symbol, decimals: decimals} - put_token_data_to_cache(contract_address_hash, new_data) + TransactionActionTokensData.put_to_cache(contract_address_hash, new_data) Map.put(token_data_acc, contract_address_hash, new_data) end) @@ -683,7 +668,7 @@ defmodule Indexer.Transform.TransactionActions do new_data = get_new_data(data, request, response) - put_token_data_to_cache(request.contract_address, new_data) + TransactionActionTokensData.put_to_cache(request.contract_address, new_data) Map.put(token_data_acc, request.contract_address, new_data) else @@ -734,35 +719,6 @@ defmodule Indexer.Transform.TransactionActions do {requests, responses} end - defp get_uniswap_pool_from_cache(pool_address) do - with info when info != :undefined <- :ets.info(:tx_actions_uniswap_pools_cache), - [{_, value}] <- :ets.lookup(:tx_actions_uniswap_pools_cache, pool_address) do - value - else - _ -> nil - end - end - - defp init_cache(table) do - if :ets.whereis(table) == :undefined do - :ets.new(table, [ - :set, - :named_table, - :public, - read_concurrency: true, - write_concurrency: true - ]) - end - end - - defp init_token_data_cache do - init_cache(:tx_actions_tokens_data_cache) - end - - defp init_uniswap_pools_cache do - init_cache(:tx_actions_uniswap_pools_cache) - end - defp is_address_correct?(address) do String.match?(address, ~r/^0x[[:xdigit:]]{40}$/i) end @@ -780,27 +736,6 @@ defmodule Indexer.Transform.TransactionActions do |> Enum.group_by(& &1.transaction_hash) end - defp put_token_data_to_cache(address, data) do - if not :ets.member(:tx_actions_tokens_data_cache, address) do - # we need to add a new item to the cache, but don't exceed the limit - cache_size = :ets.info(:tx_actions_tokens_data_cache, :size) - - how_many_to_remove = cache_size - get_max_token_cache_size() + 1 - - range = Range.new(1, how_many_to_remove, 1) - - for _step <- range do - :ets.delete(:tx_actions_tokens_data_cache, :ets.first(:tx_actions_tokens_data_cache)) - end - end - - :ets.insert(:tx_actions_tokens_data_cache, {address, data}) - end - - defp put_uniswap_pool_to_cache(address, value) do - :ets.insert(:tx_actions_uniswap_pools_cache, {address, value}) - end - defp read_contracts_with_retries(requests, abi, retries_left) when retries_left > 0 do responses = Reader.query_contracts(requests, abi) diff --git a/config/runtime.exs b/config/runtime.exs index 6517e06f45..788ca77922 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -362,6 +362,9 @@ config :explorer, :spandex, config :explorer, :datadog, port: ConfigHelper.parse_integer_env_var("DATADOG_PORT", 8126) +config :explorer, Explorer.Chain.Cache.TransactionActionTokensData, + max_cache_size: ConfigHelper.parse_integer_env_var("INDEXER_TX_ACTIONS_MAX_TOKEN_CACHE_SIZE", 100_000) + ############### ### Indexer ### ###############