Tx actions: remove excess `delete_all` calls and remake a cache (#7107)

* Remove excess delete_all calls and remake tx actions cache

* Update changelog

* Simplify get_max_token_cache_size function

---------

Co-authored-by: POA <33550681+poa@users.noreply.github.com>
pull/7235/head
varasev 2 years ago committed by GitHub
parent 94cf98f8b5
commit f88aa78c5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 4
      apps/explorer/config/config.exs
  3. 2
      apps/explorer/lib/explorer/application.ex
  4. 61
      apps/explorer/lib/explorer/chain/cache/transaction_action_tokens_data.ex
  5. 44
      apps/explorer/lib/explorer/chain/cache/transaction_action_uniswap_pools.ex
  6. 35
      apps/explorer/lib/explorer/chain/import/runner/transaction_actions.ex
  7. 93
      apps/indexer/lib/indexer/transform/transaction_actions.ex
  8. 3
      config/runtime.exs

@ -14,6 +14,7 @@
### Chore
- [#7107](https://github.com/blockscout/blockscout/pull/7107) - Tx actions: remove excess delete_all calls and remake a cache
- [#7201](https://github.com/blockscout/blockscout/pull/7201) - Remove rust, cargo from dependencies since the latest version of ex_keccak is using precompiled rust
## 5.1.2-beta

@ -70,6 +70,10 @@ config :explorer, Explorer.Chain.Cache.NewVerifiedContractsCounter,
enable_consolidation: true,
update_interval_in_milliseconds: update_interval_in_milliseconds_default
config :explorer, Explorer.Chain.Cache.TransactionActionTokensData, enabled: true
config :explorer, Explorer.Chain.Cache.TransactionActionUniswapPools, enabled: true
config :explorer, Explorer.ExchangeRates,
cache_period: ConfigHelper.parse_time_env_var("CACHE_EXCHANGE_RATES_PERIOD", "10m")

@ -90,6 +90,8 @@ defmodule Explorer.Application do
configure(Explorer.Chain.Cache.NewContractsCounter),
configure(Explorer.Chain.Cache.VerifiedContractsCounter),
configure(Explorer.Chain.Cache.NewVerifiedContractsCounter),
configure(Explorer.Chain.Cache.TransactionActionTokensData),
configure(Explorer.Chain.Cache.TransactionActionUniswapPools),
configure(Explorer.Chain.Transaction.History.Historian),
configure(Explorer.Chain.Events.Listener),
configure(Explorer.Counters.AddressesWithBalanceCounter),

@ -0,0 +1,61 @@
defmodule Explorer.Chain.Cache.TransactionActionTokensData do
@moduledoc """
Caches tokens data for Indexer.Transform.TransactionActions.
"""
use GenServer
@cache_name :tx_actions_tokens_data_cache
@spec start_link(term()) :: GenServer.on_start()
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(_args) do
create_cache_table()
{:ok, %{}}
end
def create_cache_table do
if :ets.whereis(@cache_name) == :undefined do
:ets.new(@cache_name, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
end
def fetch_from_cache(address) do
with info when info != :undefined <- :ets.info(@cache_name),
[{_, value}] <- :ets.lookup(@cache_name, address) do
value
else
_ -> %{symbol: nil, decimals: nil}
end
end
def put_to_cache(address, data) do
if not :ets.member(@cache_name, address) do
# we need to add a new item to the cache, but don't exceed the limit
cache_size = :ets.info(@cache_name, :size)
how_many_to_remove = cache_size - get_max_token_cache_size() + 1
range = Range.new(1, how_many_to_remove, 1)
for _step <- range do
:ets.delete(@cache_name, :ets.first(@cache_name))
end
end
:ets.insert(@cache_name, {address, data})
end
defp get_max_token_cache_size do
Application.get_env(:explorer, __MODULE__)[:max_cache_size]
end
end

@ -0,0 +1,44 @@
defmodule Explorer.Chain.Cache.TransactionActionUniswapPools do
@moduledoc """
Caches Uniswap pools for Indexer.Transform.TransactionActions.
"""
use GenServer
@cache_name :tx_actions_uniswap_pools_cache
@spec start_link(term()) :: GenServer.on_start()
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(_args) do
create_cache_table()
{:ok, %{}}
end
def create_cache_table do
if :ets.whereis(@cache_name) == :undefined do
:ets.new(@cache_name, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
end
def fetch_from_cache(pool_address) do
with info when info != :undefined <- :ets.info(@cache_name),
[{_, value}] <- :ets.lookup(@cache_name, pool_address) do
value
else
_ -> nil
end
end
def put_to_cache(address, value) do
:ets.insert(@cache_name, {address, value})
end
end

@ -5,6 +5,8 @@ defmodule Explorer.Chain.Import.Runner.TransactionActions do
require Ecto.Query
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.{Import, TransactionAction}
alias Explorer.Prometheus.Instrumenter
@ -55,7 +57,9 @@ defmodule Explorer.Chain.Import.Runner.TransactionActions do
@spec insert(Repo.t(), [map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) ::
{:ok, [TransactionAction.t()]}
| {:error, [Changeset.t()]}
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = _options) when is_list(changes_list) do
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# Enforce TransactionAction ShareLocks order (see docs: sharelock.md)
ordered_changes_list = Enum.sort_by(changes_list, &{&1.hash, &1.log_index})
@ -63,13 +67,38 @@ defmodule Explorer.Chain.Import.Runner.TransactionActions do
Import.insert_changes_list(
repo,
ordered_changes_list,
conflict_target: [:hash, :log_index],
on_conflict: on_conflict,
for: TransactionAction,
returning: true,
timeout: timeout,
timestamps: timestamps,
on_conflict: :nothing
timestamps: timestamps
)
{:ok, inserted}
end
defp default_on_conflict do
from(
action in TransactionAction,
update: [
set: [
# Don't update `hash` as it is part of the composite primary key and used for the conflict target
# Don't update `log_index` as it is part of the composite primary key and used for the conflict target
protocol: fragment("EXCLUDED.protocol"),
data: fragment("EXCLUDED.data"),
type: fragment("EXCLUDED.type"),
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", action.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", action.updated_at)
]
],
where:
fragment(
"(EXCLUDED.protocol, EXCLUDED.data, EXCLUDED.type) IS DISTINCT FROM (?, ? ,?)",
action.protocol,
action.data,
action.type
)
)
end
end

@ -9,6 +9,7 @@ defmodule Indexer.Transform.TransactionActions do
alias ABI.TypeDecoder
alias Explorer.Chain.Cache.NetVersion
alias Explorer.Chain.Cache.{TransactionActionTokensData, TransactionActionUniswapPools}
alias Explorer.Chain.{Address, Data, Hash, Token, TransactionAction}
alias Explorer.Repo
alias Explorer.SmartContract.Reader
@ -19,7 +20,6 @@ defmodule Indexer.Transform.TransactionActions do
@polygon 137
# @gnosis 100
@default_max_token_cache_size 100_000
@burn_address "0x0000000000000000000000000000000000000000"
@uniswap_v3_positions_nft "0xC36442b4a4522E871399CD717aBDD847Ab11FE88"
@uniswap_v3_factory "0x1F98431c8aD98523631AE4a59f267346ea31F984"
@ -98,23 +98,26 @@ defmodule Indexer.Transform.TransactionActions do
@doc """
Returns a list of transaction actions given a list of logs.
"""
def parse(logs, protocols_to_rewrite \\ []) do
def parse(logs, protocols_to_rewrite \\ nil) do
if Application.get_env(:indexer, Indexer.Fetcher.TransactionAction.Supervisor)[:enabled] do
actions = []
chain_id = NetVersion.get_version()
if not is_nil(protocols_to_rewrite) do
logs
|> logs_group_by_txs()
|> clear_actions(protocols_to_rewrite)
end
# create tokens cache if not exists
init_token_data_cache()
TransactionActionTokensData.create_cache_table()
# handle uniswap v3
tx_actions =
if Enum.member?([@mainnet, @goerli, @optimism, @polygon], chain_id) and
(Enum.empty?(protocols_to_rewrite) or Enum.member?(protocols_to_rewrite, "uniswap_v3")) do
(is_nil(protocols_to_rewrite) or Enum.empty?(protocols_to_rewrite) or
Enum.member?(protocols_to_rewrite, "uniswap_v3")) do
logs
|> uniswap_filter_logs()
|> logs_group_by_txs()
@ -353,7 +356,7 @@ defmodule Indexer.Transform.TransactionActions do
end
defp uniswap_legitimate_pools(logs_grouped) do
init_uniswap_pools_cache()
TransactionActionUniswapPools.create_cache_table()
{pools_to_request, pools_cached} =
logs_grouped
@ -368,7 +371,7 @@ defmodule Indexer.Transform.TransactionActions do
end)
end)
|> Enum.reduce({[], %{}}, fn {pool_address, _}, {to_request, cached} ->
value_from_cache = get_uniswap_pool_from_cache(pool_address)
value_from_cache = TransactionActionUniswapPools.fetch_from_cache(pool_address)
if is_nil(value_from_cache) do
{[pool_address | to_request], cached}
@ -385,7 +388,7 @@ defmodule Indexer.Transform.TransactionActions do
|> Enum.zip(responses_get_pool)
|> Enum.reduce(%{}, fn {request, {_status, response} = _resp}, acc ->
value = uniswap_pool_is_legitimate(request, response)
put_uniswap_pool_to_cache(request.pool_address, value)
TransactionActionUniswapPools.put_to_cache(request.pool_address, value)
Map.put(acc, request.pool_address, value)
end)
|> Map.merge(pools_cached)
@ -491,7 +494,7 @@ defmodule Indexer.Transform.TransactionActions do
|> Enum.reduce([], fn {{status, _}, i}, acc ->
if status == :error do
pool_address = Enum.at(requests, i)[:contract_address]
put_uniswap_pool_to_cache(pool_address, [])
TransactionActionUniswapPools.put_to_cache(pool_address, [])
[pool_address | acc]
else
acc
@ -550,19 +553,6 @@ defmodule Indexer.Transform.TransactionActions do
|> Decimal.to_string(:normal)
end
defp get_max_token_cache_size do
case Application.get_env(:indexer, __MODULE__)[:max_token_cache_size] do
nil ->
@default_max_token_cache_size
"" ->
@default_max_token_cache_size
max_cache_size ->
if is_binary(max_cache_size), do: String.to_integer(max_cache_size), else: max_cache_size
end
end
defp get_token_data(token_addresses) do
# first, we're trying to read token data from the cache.
# if the cache is empty, we read that from DB.
@ -588,12 +578,7 @@ defmodule Indexer.Transform.TransactionActions do
Map.put(
acc,
address,
with info when info != :undefined <- :ets.info(:tx_actions_tokens_data_cache),
[{_, value}] <- :ets.lookup(:tx_actions_tokens_data_cache, address) do
value
else
_ -> %{symbol: nil, decimals: nil}
end
TransactionActionTokensData.fetch_from_cache(address)
)
end)
end
@ -634,7 +619,7 @@ defmodule Indexer.Transform.TransactionActions do
new_data = %{symbol: symbol, decimals: decimals}
put_token_data_to_cache(contract_address_hash, new_data)
TransactionActionTokensData.put_to_cache(contract_address_hash, new_data)
Map.put(token_data_acc, contract_address_hash, new_data)
end)
@ -683,7 +668,7 @@ defmodule Indexer.Transform.TransactionActions do
new_data = get_new_data(data, request, response)
put_token_data_to_cache(request.contract_address, new_data)
TransactionActionTokensData.put_to_cache(request.contract_address, new_data)
Map.put(token_data_acc, request.contract_address, new_data)
else
@ -734,35 +719,6 @@ defmodule Indexer.Transform.TransactionActions do
{requests, responses}
end
defp get_uniswap_pool_from_cache(pool_address) do
with info when info != :undefined <- :ets.info(:tx_actions_uniswap_pools_cache),
[{_, value}] <- :ets.lookup(:tx_actions_uniswap_pools_cache, pool_address) do
value
else
_ -> nil
end
end
defp init_cache(table) do
if :ets.whereis(table) == :undefined do
:ets.new(table, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
end
defp init_token_data_cache do
init_cache(:tx_actions_tokens_data_cache)
end
defp init_uniswap_pools_cache do
init_cache(:tx_actions_uniswap_pools_cache)
end
defp is_address_correct?(address) do
String.match?(address, ~r/^0x[[:xdigit:]]{40}$/i)
end
@ -780,27 +736,6 @@ defmodule Indexer.Transform.TransactionActions do
|> Enum.group_by(& &1.transaction_hash)
end
defp put_token_data_to_cache(address, data) do
if not :ets.member(:tx_actions_tokens_data_cache, address) do
# we need to add a new item to the cache, but don't exceed the limit
cache_size = :ets.info(:tx_actions_tokens_data_cache, :size)
how_many_to_remove = cache_size - get_max_token_cache_size() + 1
range = Range.new(1, how_many_to_remove, 1)
for _step <- range do
:ets.delete(:tx_actions_tokens_data_cache, :ets.first(:tx_actions_tokens_data_cache))
end
end
:ets.insert(:tx_actions_tokens_data_cache, {address, data})
end
defp put_uniswap_pool_to_cache(address, value) do
:ets.insert(:tx_actions_uniswap_pools_cache, {address, value})
end
defp read_contracts_with_retries(requests, abi, retries_left) when retries_left > 0 do
responses = Reader.query_contracts(requests, abi)

@ -362,6 +362,9 @@ config :explorer, :spandex,
config :explorer, :datadog, port: ConfigHelper.parse_integer_env_var("DATADOG_PORT", 8126)
config :explorer, Explorer.Chain.Cache.TransactionActionTokensData,
max_cache_size: ConfigHelper.parse_integer_env_var("INDEXER_TX_ACTIONS_MAX_TOKEN_CACHE_SIZE", 100_000)
###############
### Indexer ###
###############

Loading…
Cancel
Save