diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dddd72858..403909a182 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ - [#6853](https://github.com/blockscout/blockscout/pull/6853) - Fix 503 page - [#6845](https://github.com/blockscout/blockscout/pull/6845) - Extract Docker-compose services into separate files +- [#6839](https://github.com/blockscout/blockscout/pull/6839) - Add cache to transaction actions parser - [#6834](https://github.com/blockscout/blockscout/pull/6834) - Take into account FIRST_BLOCK in "Total blocks" counter on the main page - [#6340](https://github.com/blockscout/blockscout/pull/6340) - Rollback to websocket_client 1.3.0 - [#6786](https://github.com/blockscout/blockscout/pull/6786) - Refactor `try rescue` statements to keep stacktrace diff --git a/apps/indexer/lib/indexer/fetcher/transaction_action.ex b/apps/indexer/lib/indexer/fetcher/transaction_action.ex index 9603e715bc..cd46e04eca 100644 --- a/apps/indexer/lib/indexer/fetcher/transaction_action.ex +++ b/apps/indexer/lib/indexer/fetcher/transaction_action.ex @@ -155,6 +155,7 @@ defmodule Indexer.Fetcher.TransactionAction do Logger.info( "Block #{block_number} handled successfully. Progress: #{progress_percentage}%. Initial block range: #{first_block}..#{last_block}." <> + " Actions found: #{Enum.count(tx_actions)}." <> if(next_block_new >= first_block, do: " Remaining block range: #{first_block}..#{next_block_new}", else: "") ) diff --git a/apps/indexer/lib/indexer/transform/transaction_actions.ex b/apps/indexer/lib/indexer/transform/transaction_actions.ex index ec426f497c..9a0b2340e8 100644 --- a/apps/indexer/lib/indexer/transform/transaction_actions.ex +++ b/apps/indexer/lib/indexer/transform/transaction_actions.ex @@ -109,15 +109,7 @@ defmodule Indexer.Transform.TransactionActions do |> clear_actions(protocols_to_rewrite) # create tokens cache if not exists - if :ets.whereis(:tx_actions_tokens_data_cache) == :undefined do - :ets.new(:tx_actions_tokens_data_cache, [ - :set, - :named_table, - :public, - read_concurrency: true, - write_concurrency: true - ]) - end + init_token_data_cache() # handle uniswap v3 tx_actions = @@ -359,51 +351,58 @@ defmodule Indexer.Transform.TransactionActions do end defp uniswap_legitimate_pools(logs_grouped) do - pools = + init_uniswap_pools_cache() + + {pools_to_request, pools_cached} = logs_grouped |> Enum.reduce(%{}, fn {_tx_hash, tx_logs}, addresses_acc -> tx_logs |> Enum.filter(fn log -> - first_topic = String.downcase(log.first_topic) - first_topic != @uniswap_v3_transfer_nft_event + String.downcase(log.first_topic) != @uniswap_v3_transfer_nft_event end) |> Enum.reduce(addresses_acc, fn log, acc -> pool_address = String.downcase(address_hash_to_string(log.address_hash)) - Map.put_new(acc, pool_address, true) + Map.put(acc, pool_address, true) end) end) + |> Enum.reduce({[], %{}}, fn {pool_address, _}, {to_request, cached} -> + value_from_cache = get_uniswap_pool_from_cache(pool_address) - req_resp = uniswap_request_tokens_and_fees(pools) + if is_nil(value_from_cache) do + {[pool_address | to_request], cached} + else + {to_request, Map.put(cached, pool_address, value_from_cache)} + end + end) - if req_resp === false do - %{} - else - case uniswap_request_get_pools(req_resp) do - {requests_get_pool, responses_get_pool} -> - requests_get_pool - |> Enum.zip(responses_get_pool) - |> Enum.reduce(%{}, fn {request, {_status, response} = _resp}, acc -> - response = - case response do - [item] -> item - items -> items - end - - Map.put( - acc, - request.pool_address, - if request.pool_address == String.downcase(response) do - [token0, token1, _] = request.args - [token0, token1] - else - [] - end - ) - end) + req_resp = uniswap_request_tokens_and_fees(pools_to_request) - _ -> - %{} - end + with true <- req_resp !== false, + {requests_get_pool, responses_get_pool} <- uniswap_request_get_pools(req_resp) do + requests_get_pool + |> Enum.zip(responses_get_pool) + |> Enum.reduce(%{}, fn {request, {_status, response} = _resp}, acc -> + response = + case response do + [item] -> item + items -> items + end + + value = + if request.pool_address == String.downcase(response) do + [token0, token1, _] = request.args + [token0, token1] + else + [] + end + + put_uniswap_pool_to_cache(request.pool_address, value) + + Map.put(acc, request.pool_address, value) + end) + |> Map.merge(pools_cached) + else + _ -> %{} end end @@ -455,7 +454,7 @@ defmodule Indexer.Transform.TransactionActions do defp uniswap_request_tokens_and_fees(pools) do requests = pools - |> Enum.map(fn {pool_address, _} -> + |> Enum.map(fn pool_address -> # we will call token0(), token1(), fee() public getters Enum.map(["0dfe1681", "d21220a7", "ddca3f43"], fn method_id -> %{ @@ -473,7 +472,7 @@ defmodule Indexer.Transform.TransactionActions do if !Enum.empty?(error_messages) or Enum.count(requests) != Enum.count(responses) do Logger.error( - "Cannot read Uniswap V3 Pool contract public getters: token0(), token1(), fee(). Error messages: #{Enum.join(error_messages, ", ")}. Pools: #{Enum.join(Map.keys(pools), ", ")}" + "Cannot read Uniswap V3 Pool contract public getters: token0(), token1(), fee(). Error messages: #{Enum.join(error_messages, ", ")}. Pools: #{Enum.join(pools, ", ")}" ) false @@ -712,6 +711,35 @@ defmodule Indexer.Transform.TransactionActions do end end + defp get_uniswap_pool_from_cache(pool_address) do + with info when info != :undefined <- :ets.info(:tx_actions_uniswap_pools_cache), + [{_, value}] <- :ets.lookup(:tx_actions_uniswap_pools_cache, pool_address) do + value + else + _ -> nil + end + end + + defp init_cache(table) do + if :ets.whereis(table) == :undefined do + :ets.new(table, [ + :set, + :named_table, + :public, + read_concurrency: true, + write_concurrency: true + ]) + end + end + + defp init_token_data_cache do + init_cache(:tx_actions_tokens_data_cache) + end + + defp init_uniswap_pools_cache do + init_cache(:tx_actions_uniswap_pools_cache) + end + defp is_address_correct?(address) do String.match?(address, ~r/^0x[[:xdigit:]]{40}$/i) end @@ -746,6 +774,10 @@ defmodule Indexer.Transform.TransactionActions do :ets.insert(:tx_actions_tokens_data_cache, {address, data}) end + defp put_uniswap_pool_to_cache(address, value) do + :ets.insert(:tx_actions_uniswap_pools_cache, {address, value}) + end + defp read_contracts_with_retries(requests, abi, retries_left) when retries_left > 0 do responses = Reader.query_contracts(requests, abi)