Add cache size limit to tx actions transformer

pull/6582/head
POA 2 years ago
parent b18295241d
commit dfb8c859a8
  1. 78
      apps/indexer/lib/indexer/transform/transaction_actions.ex
  2. 3
      config/runtime.exs

@ -18,6 +18,7 @@ defmodule Indexer.Transform.TransactionActions do
@polygon 137 @polygon 137
# @gnosis 100 # @gnosis 100
@default_max_token_cache_size 10000
@null_address "0x0000000000000000000000000000000000000000" @null_address "0x0000000000000000000000000000000000000000"
@uniswap_v3_positions_nft "0xC36442b4a4522E871399CD717aBDD847Ab11FE88" @uniswap_v3_positions_nft "0xC36442b4a4522E871399CD717aBDD847Ab11FE88"
@uniswap_v3_factory "0x1F98431c8aD98523631AE4a59f267346ea31F984" @uniswap_v3_factory "0x1F98431c8aD98523631AE4a59f267346ea31F984"
@ -91,8 +92,14 @@ defmodule Indexer.Transform.TransactionActions do
|> clear_actions(protocols_to_rewrite) |> clear_actions(protocols_to_rewrite)
# create tokens cache if not exists # create tokens cache if not exists
if :ets.whereis(:tokens_data_cache) == :undefined do if :ets.whereis(:tx_actions_tokens_data_cache) == :undefined do
:ets.new(:tokens_data_cache, [:set, :named_table, :public, read_concurrency: true, write_concurrency: true]) :ets.new(:tx_actions_tokens_data_cache, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end end
# handle uniswap v3 # handle uniswap v3
@ -269,6 +276,26 @@ defmodule Indexer.Transform.TransactionActions do
uniswap_handle_event("swap", amount0, amount1, log, token_address, token_data, chain_id) uniswap_handle_event("swap", amount0, amount1, log, token_address, token_data, chain_id)
end end
defp uniswap_handle_swap_amounts(log, amount0, amount1, symbol0, symbol1, address0, address1) do
cond do
String.first(amount0) === "-" and String.first(amount1) !== "-" ->
{amount1, symbol1, address1, String.slice(amount0, 1, String.length(amount0) - 1), symbol0, address0, false}
String.first(amount1) === "-" and String.first(amount0) !== "-" ->
{amount0, symbol0, address0, String.slice(amount1, 1, String.length(amount1) - 1), symbol1, address1, false}
amount1 === "0" and String.first(amount0) !== "-" ->
{amount0, symbol0, address0, amount1, symbol1, address1, false}
true ->
Logger.error(
"Invalid Swap event in tx #{log.transaction_hash}. Log index: #{log.index}. amount0 = #{amount0}, amount1 = #{amount1}"
)
{amount0, symbol0, address0, amount1, symbol1, address1, true}
end
end
defp uniswap_handle_event(type, amount0, amount1, log, token_address, token_data, chain_id) do defp uniswap_handle_event(type, amount0, amount1, log, token_address, token_data, chain_id) do
address0 = Enum.at(token_address, 0) address0 = Enum.at(token_address, 0)
decimals0 = token_data[address0].decimals decimals0 = token_data[address0].decimals
@ -282,17 +309,7 @@ defmodule Indexer.Transform.TransactionActions do
{new_amount0, new_symbol0, new_address0, new_amount1, new_symbol1, new_address1, is_error} = {new_amount0, new_symbol0, new_address0, new_amount1, new_symbol1, new_address1, is_error} =
if type == "swap" do if type == "swap" do
cond do uniswap_handle_swap_amounts(log, amount0, amount1, symbol0, symbol1, address0, address1)
String.first(amount0) === "-" and String.first(amount1) !== "-" ->
{amount1, symbol1, address1, String.slice(amount0, 1, String.length(amount0) - 1), symbol0, address0, false}
String.first(amount1) === "-" and String.first(amount0) !== "-" ->
{amount0, symbol0, address0, String.slice(amount1, 1, String.length(amount1) - 1), symbol1, address1, false}
true ->
Logger.error("Invalid Swap event in tx #{log.transaction_hash}. Log index: #{log.index}")
{amount0, symbol0, address0, amount1, symbol1, address1, true}
end
else else
{amount0, symbol0, address0, amount1, symbol1, address1, false} {amount0, symbol0, address0, amount1, symbol1, address1, false}
end end
@ -494,6 +511,16 @@ defmodule Indexer.Transform.TransactionActions do
|> Decimal.to_string(:normal) |> Decimal.to_string(:normal)
end end
defp get_max_token_cache_size do
case Application.get_env(:indexer, :tx_actions_max_token_cache_size, @default_max_token_cache_size) do
nil ->
@default_max_token_cache_size
max_cache_size ->
if is_binary(max_cache_size), do: String.to_integer(max_cache_size), else: max_cache_size
end
end
defp get_token_data(token_addresses) do defp get_token_data(token_addresses) do
# first, we're trying to read token data from the cache. # first, we're trying to read token data from the cache.
# if the cache is empty, we read that from DB. # if the cache is empty, we read that from DB.
@ -519,8 +546,8 @@ defmodule Indexer.Transform.TransactionActions do
Map.put( Map.put(
acc, acc,
address, address,
with info when info != :undefined <- :ets.info(:tokens_data_cache), with info when info != :undefined <- :ets.info(:tx_actions_tokens_data_cache),
[{_, value}] <- :ets.lookup(:tokens_data_cache, address) do [{_, value}] <- :ets.lookup(:tx_actions_tokens_data_cache, address) do
value value
else else
_ -> %{symbol: nil, decimals: nil} _ -> %{symbol: nil, decimals: nil}
@ -576,7 +603,7 @@ defmodule Indexer.Transform.TransactionActions do
new_data = %{symbol: symbol, decimals: decimals} new_data = %{symbol: symbol, decimals: decimals}
:ets.insert(:tokens_data_cache, {contract_address_hash, new_data}) put_token_data_to_cache(contract_address_hash, new_data)
Map.put(token_data_acc, contract_address_hash, new_data) Map.put(token_data_acc, contract_address_hash, new_data)
end) end)
@ -622,7 +649,7 @@ defmodule Indexer.Transform.TransactionActions do
%{data | decimals: response} %{data | decimals: response}
end end
:ets.insert(:tokens_data_cache, {request.contract_address, new_data}) put_token_data_to_cache(request.contract_address, new_data)
Map.put(token_data_acc, request.contract_address, new_data) Map.put(token_data_acc, request.contract_address, new_data)
end) end)
@ -682,6 +709,23 @@ defmodule Indexer.Transform.TransactionActions do
# end) # end)
end end
defp put_token_data_to_cache(address, data) do
if not :ets.member(:tx_actions_tokens_data_cache, address) do
# we need to add a new item to the cache, but don't exceed the limit
cache_size = :ets.info(:tx_actions_tokens_data_cache, :size)
how_many_to_remove = cache_size - get_max_token_cache_size() + 1
range = Range.new(1, how_many_to_remove, 1)
for _step <- range do
:ets.delete(:tx_actions_tokens_data_cache, :ets.first(:tx_actions_tokens_data_cache))
end
end
:ets.insert(:tx_actions_tokens_data_cache, {address, data})
end
defp read_contracts_with_retries(requests, abi, retries_left) when retries_left > 0 do defp read_contracts_with_retries(requests, abi, retries_left) when retries_left > 0 do
responses = Reader.query_contracts(requests, abi) responses = Reader.query_contracts(requests, abi)

@ -418,7 +418,8 @@ config :indexer,
fetch_rewards_way: System.get_env("FETCH_REWARDS_WAY", "trace_block"), fetch_rewards_way: System.get_env("FETCH_REWARDS_WAY", "trace_block"),
tx_actions_reindex_first_block: System.get_env("INDEXER_TX_ACTIONS_REINDEX_FIRST_BLOCK"), tx_actions_reindex_first_block: System.get_env("INDEXER_TX_ACTIONS_REINDEX_FIRST_BLOCK"),
tx_actions_reindex_last_block: System.get_env("INDEXER_TX_ACTIONS_REINDEX_LAST_BLOCK"), tx_actions_reindex_last_block: System.get_env("INDEXER_TX_ACTIONS_REINDEX_LAST_BLOCK"),
tx_actions_reindex_protocols: System.get_env("INDEXER_TX_ACTIONS_REINDEX_PROTOCOLS", "") tx_actions_reindex_protocols: System.get_env("INDEXER_TX_ACTIONS_REINDEX_PROTOCOLS", ""),
tx_actions_max_token_cache_size: System.get_env("INDEXER_TX_ACTIONS_MAX_TOKEN_CACHE_SIZE")
{receipts_batch_size, _} = Integer.parse(System.get_env("INDEXER_RECEIPTS_BATCH_SIZE", "250")) {receipts_batch_size, _} = Integer.parse(System.get_env("INDEXER_RECEIPTS_BATCH_SIZE", "250"))
{receipts_concurrency, _} = Integer.parse(System.get_env("INDEXER_RECEIPTS_CONCURRENCY", "10")) {receipts_concurrency, _} = Integer.parse(System.get_env("INDEXER_RECEIPTS_CONCURRENCY", "10"))

Loading…
Cancel
Save