Add cache to transaction actions parser (#6839)

* Fix tx actions handler

* Small change for mix credo

* Update changelog

* Add uniswap pools cache to tx actions parser

* Update changelog

* Refactor init cache functions

---------

Co-authored-by: POA <33550681+poa@users.noreply.github.com>
pull/6859/head
varasev 2 years ago committed by GitHub
parent bba28704da
commit b194ef0e48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 1
      apps/indexer/lib/indexer/fetcher/transaction_action.ex
  3. 120
      apps/indexer/lib/indexer/transform/transaction_actions.ex

@ -31,6 +31,7 @@
- [#6853](https://github.com/blockscout/blockscout/pull/6853) - Fix 503 page
- [#6845](https://github.com/blockscout/blockscout/pull/6845) - Extract Docker-compose services into separate files
- [#6839](https://github.com/blockscout/blockscout/pull/6839) - Add cache to transaction actions parser
- [#6834](https://github.com/blockscout/blockscout/pull/6834) - Take into account FIRST_BLOCK in "Total blocks" counter on the main page
- [#6340](https://github.com/blockscout/blockscout/pull/6340) - Rollback to websocket_client 1.3.0
- [#6786](https://github.com/blockscout/blockscout/pull/6786) - Refactor `try rescue` statements to keep stacktrace

@ -155,6 +155,7 @@ defmodule Indexer.Fetcher.TransactionAction do
Logger.info(
"Block #{block_number} handled successfully. Progress: #{progress_percentage}%. Initial block range: #{first_block}..#{last_block}." <>
" Actions found: #{Enum.count(tx_actions)}." <>
if(next_block_new >= first_block, do: " Remaining block range: #{first_block}..#{next_block_new}", else: "")
)

@ -109,15 +109,7 @@ defmodule Indexer.Transform.TransactionActions do
|> clear_actions(protocols_to_rewrite)
# create tokens cache if not exists
if :ets.whereis(:tx_actions_tokens_data_cache) == :undefined do
:ets.new(:tx_actions_tokens_data_cache, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
init_token_data_cache()
# handle uniswap v3
tx_actions =
@ -359,51 +351,58 @@ defmodule Indexer.Transform.TransactionActions do
end
defp uniswap_legitimate_pools(logs_grouped) do
pools =
init_uniswap_pools_cache()
{pools_to_request, pools_cached} =
logs_grouped
|> Enum.reduce(%{}, fn {_tx_hash, tx_logs}, addresses_acc ->
tx_logs
|> Enum.filter(fn log ->
first_topic = String.downcase(log.first_topic)
first_topic != @uniswap_v3_transfer_nft_event
String.downcase(log.first_topic) != @uniswap_v3_transfer_nft_event
end)
|> Enum.reduce(addresses_acc, fn log, acc ->
pool_address = String.downcase(address_hash_to_string(log.address_hash))
Map.put_new(acc, pool_address, true)
Map.put(acc, pool_address, true)
end)
end)
|> Enum.reduce({[], %{}}, fn {pool_address, _}, {to_request, cached} ->
value_from_cache = get_uniswap_pool_from_cache(pool_address)
req_resp = uniswap_request_tokens_and_fees(pools)
if is_nil(value_from_cache) do
{[pool_address | to_request], cached}
else
{to_request, Map.put(cached, pool_address, value_from_cache)}
end
end)
if req_resp === false do
%{}
else
case uniswap_request_get_pools(req_resp) do
{requests_get_pool, responses_get_pool} ->
requests_get_pool
|> Enum.zip(responses_get_pool)
|> Enum.reduce(%{}, fn {request, {_status, response} = _resp}, acc ->
response =
case response do
[item] -> item
items -> items
end
Map.put(
acc,
request.pool_address,
if request.pool_address == String.downcase(response) do
[token0, token1, _] = request.args
[token0, token1]
else
[]
end
)
end)
req_resp = uniswap_request_tokens_and_fees(pools_to_request)
_ ->
%{}
end
with true <- req_resp !== false,
{requests_get_pool, responses_get_pool} <- uniswap_request_get_pools(req_resp) do
requests_get_pool
|> Enum.zip(responses_get_pool)
|> Enum.reduce(%{}, fn {request, {_status, response} = _resp}, acc ->
response =
case response do
[item] -> item
items -> items
end
value =
if request.pool_address == String.downcase(response) do
[token0, token1, _] = request.args
[token0, token1]
else
[]
end
put_uniswap_pool_to_cache(request.pool_address, value)
Map.put(acc, request.pool_address, value)
end)
|> Map.merge(pools_cached)
else
_ -> %{}
end
end
@ -455,7 +454,7 @@ defmodule Indexer.Transform.TransactionActions do
defp uniswap_request_tokens_and_fees(pools) do
requests =
pools
|> Enum.map(fn {pool_address, _} ->
|> Enum.map(fn pool_address ->
# we will call token0(), token1(), fee() public getters
Enum.map(["0dfe1681", "d21220a7", "ddca3f43"], fn method_id ->
%{
@ -473,7 +472,7 @@ defmodule Indexer.Transform.TransactionActions do
if !Enum.empty?(error_messages) or Enum.count(requests) != Enum.count(responses) do
Logger.error(
"Cannot read Uniswap V3 Pool contract public getters: token0(), token1(), fee(). Error messages: #{Enum.join(error_messages, ", ")}. Pools: #{Enum.join(Map.keys(pools), ", ")}"
"Cannot read Uniswap V3 Pool contract public getters: token0(), token1(), fee(). Error messages: #{Enum.join(error_messages, ", ")}. Pools: #{Enum.join(pools, ", ")}"
)
false
@ -712,6 +711,35 @@ defmodule Indexer.Transform.TransactionActions do
end
end
defp get_uniswap_pool_from_cache(pool_address) do
with info when info != :undefined <- :ets.info(:tx_actions_uniswap_pools_cache),
[{_, value}] <- :ets.lookup(:tx_actions_uniswap_pools_cache, pool_address) do
value
else
_ -> nil
end
end
defp init_cache(table) do
if :ets.whereis(table) == :undefined do
:ets.new(table, [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
end
end
defp init_token_data_cache do
init_cache(:tx_actions_tokens_data_cache)
end
defp init_uniswap_pools_cache do
init_cache(:tx_actions_uniswap_pools_cache)
end
defp is_address_correct?(address) do
String.match?(address, ~r/^0x[[:xdigit:]]{40}$/i)
end
@ -746,6 +774,10 @@ defmodule Indexer.Transform.TransactionActions do
:ets.insert(:tx_actions_tokens_data_cache, {address, data})
end
defp put_uniswap_pool_to_cache(address, value) do
:ets.insert(:tx_actions_uniswap_pools_cache, {address, value})
end
defp read_contracts_with_retries(requests, abi, retries_left) when retries_left > 0 do
responses = Reader.query_contracts(requests, abi)

Loading…
Cancel
Save