Add batches to TokenInstance fetchers

Branch: pull/8313/head
Author: Nikita Pozdniakov (1 year ago)
Parent: 0f33172865 · Commit: ba481afc8d
Changed files (changed lines per file in parentheses):
  1. CHANGELOG.md (1)
  2. apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/encoder.ex (4)
  3. apps/explorer/lib/explorer/chain.ex (11)
  4. apps/explorer/test/explorer/token/instance_metadata_retriever_test.exs (201)
  5. apps/indexer/lib/indexer/fetcher/token_instance/helper.ex (170)
  6. apps/indexer/lib/indexer/fetcher/token_instance/metadata_retriever.ex (102)
  7. apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex (12)
  8. apps/indexer/lib/indexer/fetcher/token_instance/retry.ex (16)
  9. apps/indexer/lib/indexer/fetcher/token_instance/sanitize.ex (14)
  10. apps/indexer/test/indexer/fetcher/token_instance/helper_test.exs (178)
  11. apps/indexer/test/test_helper.exs (1)
  12. config/runtime.exs (7)
  13. docker-compose/envs/common-blockscout.env (3)
  14. docker/Makefile (9)

CHANGELOG.md
@@ -4,6 +4,7 @@
### Features
- [#8313](https://github.com/blockscout/blockscout/pull/8313) - Add batches to TokenInstance fetchers
- [#8181](https://github.com/blockscout/blockscout/pull/8181) - Insert current token balances placeholders along with historical
- [#8210](https://github.com/blockscout/blockscout/pull/8210) - Drop address foreign keys
- [#8292](https://github.com/blockscout/blockscout/pull/8292) - Add ETHEREUM_JSONRPC_WAIT_PER_TIMEOUT env var

apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/encoder.ex
@@ -69,7 +69,7 @@ defmodule EthereumJSONRPC.Encoder do
end
end
def decode_result(result, selectors, _leave_error_as_map) when is_list(selectors) do
def decode_result(%{id: id, result: _result} = result, selectors, _leave_error_as_map) when is_list(selectors) do
selectors
|> Enum.map(fn selector ->
try do
@@ -78,7 +78,7 @@ defmodule EthereumJSONRPC.Encoder do
_ -> :error
end
end)
|> Enum.find(fn decode ->
|> Enum.find({id, {:error, :unable_to_decode}}, fn decode ->
case decode do
{_id, {:ok, _}} -> true
_ -> false
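An illustrative iex sketch of the new Enum.find/3 default above (ids and decode results are made up): when none of the selectors yields {id, {:ok, _}}, the result now resolves to {id, {:error, :unable_to_decode}} instead of nil.

iex> decodes = [{1, {:error, :invalid_selector}}, {1, :error}]
iex> Enum.find(decodes, {1, {:error, :unable_to_decode}}, fn
...>   {_id, {:ok, _}} -> true
...>   _ -> false
...> end)
{1, {:error, :unable_to_decode}}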

apps/explorer/lib/explorer/chain.ex
@@ -4690,6 +4690,9 @@
end
end
@doc """
Expects map of change params. Inserts using on_conflict: :replace_all
"""
@spec upsert_token_instance(map()) :: {:ok, Instance.t()} | {:error, Ecto.Changeset.t()}
def upsert_token_instance(params) do
changeset = Instance.changeset(%Instance{}, params)
@@ -4700,6 +4703,14 @@
)
end
@doc """
Inserts list of token instances via upsert_token_instance/1.
"""
@spec upsert_token_instances_list([map()]) :: list()
def upsert_token_instances_list(instances) do
Enum.map(instances, &upsert_token_instance/1)
end
@doc """
Update a new `t:Token.t/0` record.
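A minimal usage sketch of the new Chain.upsert_token_instances_list/1 above; the hash and params are placeholders shaped like what upsert_token_instance/1 already accepts.

# Placeholder address; in the indexer this is an Explorer.Chain.Hash.Address struct,
# though the Instance changeset should cast a hex string as well.
hash = "0x0000000000000000000000000000000000001337"

params_list = [
  %{token_contract_address_hash: hash, token_id: 1, metadata: %{"name" => "Token #1"}, error: nil},
  %{token_contract_address_hash: hash, token_id: 2, metadata: nil, error: "no uri"}
]

# Each element goes through upsert_token_instance/1 (on_conflict: :replace_all);
# the return value is the list of per-instance results, in order.
Explorer.Chain.upsert_token_instances_list(params_list)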

apps/explorer/test/explorer/token/instance_metadata_retriever_test.exs
File diff suppressed because one or more lines are too long

apps/indexer/lib/indexer/fetcher/token_instance/helper.ex
@@ -3,45 +3,171 @@ defmodule Indexer.Fetcher.TokenInstance.Helper do
Common functions for Indexer.Fetcher.TokenInstance fetchers
"""
alias Explorer.Chain
alias Explorer.Chain.{Hash, Token.Instance}
alias Explorer.SmartContract.Reader
alias Indexer.Fetcher.TokenInstance.MetadataRetriever
@spec fetch_instance(Hash.Address.t(), Decimal.t() | non_neg_integer()) :: {:ok, Instance.t()}
def fetch_instance(token_contract_address_hash, token_id) do
token_id = prepare_token_id(token_id)
@cryptokitties_address_hash "0x06012c8cf97bead5deae237070f9587f8e7a266d"
case MetadataRetriever.fetch_metadata(to_string(token_contract_address_hash), token_id) do
{:ok, %{metadata: metadata}} ->
params = %{
token_id: token_id,
token_contract_address_hash: token_contract_address_hash,
metadata: metadata,
error: nil
@token_uri "c87b56dd"
@uri "0e89341c"
@erc_721_1155_abi [
%{
"type" => "function",
"stateMutability" => "view",
"payable" => false,
"outputs" => [
%{"type" => "string", "name" => ""}
],
"name" => "tokenURI",
"inputs" => [
%{
"type" => "uint256",
"name" => "_tokenId"
}
],
"constant" => true
},
%{
"type" => "function",
"stateMutability" => "view",
"payable" => false,
"outputs" => [
%{
"type" => "string",
"name" => "",
"internalType" => "string"
}
],
"name" => "uri",
"inputs" => [
%{
"type" => "uint256",
"name" => "_id",
"internalType" => "uint256"
}
],
"constant" => true
}
]
{:ok, _result} = Chain.upsert_token_instance(params)
@spec batch_fetch_instances([%{}]) :: list()
def batch_fetch_instances(token_instances) do
token_instances =
Enum.map(token_instances, fn
%{contract_address_hash: hash, token_id: token_id} -> {hash, token_id}
{_, _} = tuple -> tuple
end)
splitted =
Enum.group_by(token_instances, fn {contract_address_hash, _token_id} ->
to_string(contract_address_hash) == @cryptokitties_address_hash
end)
cryptokitties =
(splitted[true] || [])
|> Enum.map(fn {contract_address_hash, token_id} ->
{{:ok, ["https://api.cryptokitties.co/kitties/{id}"]}, to_string(token_id), contract_address_hash, token_id}
end)
other = splitted[false] || []
token_types_map =
Enum.reduce(other, %{}, fn {contract_address_hash, _token_id}, acc ->
address_hash_string = to_string(contract_address_hash)
Map.put_new(acc, address_hash_string, Chain.get_token_type(contract_address_hash))
end)
contract_results =
(other
|> Enum.map(fn {contract_address_hash, token_id} ->
token_id = prepare_token_id(token_id)
contract_address_hash_string = to_string(contract_address_hash)
prepare_request(token_types_map[contract_address_hash_string], contract_address_hash_string, token_id)
end)
|> Reader.query_contracts(@erc_721_1155_abi, [], false)
|> Enum.zip_reduce(other, [], fn result, {contract_address_hash, token_id}, acc ->
token_id = prepare_token_id(token_id)
{:ok, %{error: error}} ->
upsert_token_instance_with_error(token_id, token_contract_address_hash, error)
[
{result, normalize_token_id(token_types_map[to_string(contract_address_hash)], token_id),
contract_address_hash, token_id}
| acc
]
end)
|> Enum.reverse()) ++
cryptokitties
{:error_code, code} ->
upsert_token_instance_with_error(token_id, token_contract_address_hash, "request error: #{code}")
contract_results
|> Enum.map(fn {result, normalized_token_id, _contract_address_hash, _token_id} ->
Task.async(fn -> MetadataRetriever.fetch_json(result, normalized_token_id) end)
end)
|> Task.yield_many(:infinity)
|> Enum.zip(contract_results)
|> Enum.map(fn {{_task, res}, {_result, _normalized_token_id, contract_address_hash, token_id}} ->
case res do
{:ok, result} ->
result_to_insert_params(result, contract_address_hash, token_id)
{:error, reason} ->
upsert_token_instance_with_error(token_id, token_contract_address_hash, reason)
{:exit, reason} ->
result_to_insert_params(
{:error, MetadataRetriever.truncate_error("Terminated:" <> inspect(reason))},
contract_address_hash,
token_id
)
end
end)
|> Chain.upsert_token_instances_list()
end
defp prepare_token_id(%Decimal{} = token_id), do: Decimal.to_integer(token_id)
defp prepare_token_id(token_id), do: token_id
defp upsert_token_instance_with_error(token_id, token_contract_address_hash, error) do
params = %{
defp prepare_request("ERC-721", contract_address_hash_string, token_id) do
%{
contract_address: contract_address_hash_string,
method_id: @token_uri,
args: [token_id],
block_number: nil
}
end
defp prepare_request(_token_type, contract_address_hash_string, token_id) do
%{
contract_address: contract_address_hash_string,
method_id: @uri,
args: [token_id],
block_number: nil
}
end
defp normalize_token_id("ERC-721", _token_id), do: nil
defp normalize_token_id(_token_type, token_id),
do: token_id |> Integer.to_string(16) |> String.downcase() |> String.pad_leading(64, "0")
defp result_to_insert_params({:ok, %{metadata: metadata}}, token_contract_address_hash, token_id) do
%{
token_id: token_id,
token_contract_address_hash: token_contract_address_hash,
error: error
metadata: metadata,
error: nil
}
end
defp result_to_insert_params({:error_code, code}, token_contract_address_hash, token_id),
do: token_instance_map_with_error(token_id, token_contract_address_hash, "request error: #{code}")
defp result_to_insert_params({:error, reason}, token_contract_address_hash, token_id),
do: token_instance_map_with_error(token_id, token_contract_address_hash, reason)
{:ok, _result} = Chain.upsert_token_instance(params)
defp token_instance_map_with_error(token_id, token_contract_address_hash, error) do
%{
token_id: token_id,
token_contract_address_hash: token_contract_address_hash,
error: error
}
end
end
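A hedged sketch of calling the new batched entry point above. Per the first Enum.map/2 clause it accepts either maps with :contract_address_hash/:token_id or bare {hash, token_id} tuples; the second address below is a made-up placeholder.

alias Explorer.Chain
alias Indexer.Fetcher.TokenInstance.Helper

{:ok, kitties} = Chain.string_to_address_hash("0x06012c8cf97bead5deae237070f9587f8e7a266d")
{:ok, other} = Chain.string_to_address_hash("0x0000000000000000000000000000000000001337")

Helper.batch_fetch_instances([
  # CryptoKitties short-circuits to the hardcoded kitties API URI
  %{contract_address_hash: kitties, token_id: 100},
  # everything else is resolved with one batched tokenURI/uri call via Reader.query_contracts
  {other, 42}
])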

apps/indexer/lib/indexer/fetcher/token_instance/metadata_retriever.ex
@@ -1,6 +1,6 @@
defmodule Indexer.Fetcher.TokenInstance.MetadataRetriever do
@moduledoc """
Fetches ERC721 token instance metadata.
Fetches ERC-721 & ERC-1155 token instance metadata.
"""
require Logger
@@ -8,55 +8,6 @@ defmodule Indexer.Fetcher.TokenInstance.MetadataRetriever do
alias Explorer.SmartContract.Reader
alias HTTPoison.{Error, Response}
@token_uri "c87b56dd"
@abi [
%{
"type" => "function",
"stateMutability" => "view",
"payable" => false,
"outputs" => [
%{"type" => "string", "name" => ""}
],
"name" => "tokenURI",
"inputs" => [
%{
"type" => "uint256",
"name" => "_tokenId"
}
],
"constant" => true
}
]
@uri "0e89341c"
@abi_uri [
%{
"type" => "function",
"stateMutability" => "view",
"payable" => false,
"outputs" => [
%{
"type" => "string",
"name" => "",
"internalType" => "string"
}
],
"name" => "uri",
"inputs" => [
%{
"type" => "uint256",
"name" => "_id",
"internalType" => "uint256"
}
],
"constant" => true
}
]
@cryptokitties_address_hash "0x06012c8cf97bead5deae237070f9587f8e7a266d"
@no_uri_error "no uri"
@vm_execution_error "VM execution error"
@ipfs_protocol "ipfs://"
@@ -69,54 +20,29 @@ defmodule Indexer.Fetcher.TokenInstance.MetadataRetriever do
@ignored_hosts ["localhost", "127.0.0.1", "0.0.0.0", "", nil]
def fetch_metadata(unquote(@cryptokitties_address_hash), token_id) do
%{@token_uri => {:ok, ["https://api.cryptokitties.co/kitties/{id}"]}}
|> fetch_json(to_string(token_id))
end
def fetch_metadata(contract_address_hash, token_id) do
# c87b56dd = keccak256(tokenURI(uint256))
contract_functions = %{@token_uri => [token_id]}
res =
contract_address_hash
|> query_contract(contract_functions, @abi)
|> fetch_json()
if res == {:ok, %{error: @vm_execution_error}} do
hex_normalized_token_id = token_id |> Integer.to_string(16) |> String.downcase() |> String.pad_leading(64, "0")
contract_functions_uri = %{@uri => [token_id]}
contract_address_hash
|> query_contract(contract_functions_uri, @abi_uri)
|> fetch_json(hex_normalized_token_id)
else
res
end
end
def query_contract(contract_address_hash, contract_functions, abi) do
Reader.query_contract(contract_address_hash, abi, contract_functions, false)
end
@doc """
Fetch/parse metadata using smart-contract's response
"""
@spec fetch_json(any, binary() | nil) :: {:error, binary} | {:error_code, any} | {:ok, %{metadata: any}}
def fetch_json(uri, hex_token_id \\ nil)
def fetch_json(uri, _hex_token_id) when uri in [%{@token_uri => {:ok, [""]}}, %{@uri => {:ok, [""]}}] do
{:ok, %{error: @no_uri_error}}
def fetch_json(uri, _hex_token_id) when uri in [{:ok, [""]}, {:ok, [""]}] do
{:error, @no_uri_error}
end
def fetch_json(%{@token_uri => uri}, hex_token_id) do
fetch_json_from_uri(uri, hex_token_id)
end
def fetch_json(%{@uri => uri}, hex_token_id) do
def fetch_json(uri, hex_token_id) do
fetch_json_from_uri(uri, hex_token_id)
end
defp fetch_json_from_uri({:error, error}, _hex_token_id) do
error = to_string(error)
if error =~ "execution reverted" or error =~ @vm_execution_error do
{:ok, %{error: @vm_execution_error}}
{:error, @vm_execution_error}
else
Logger.debug(["Unknown metadata format error #{inspect(error)}."], fetcher: :token_instances)
@@ -351,5 +277,9 @@ defmodule Indexer.Fetcher.TokenInstance.MetadataRetriever do
String.replace(token_uri, @erc1155_token_id_placeholder, hex_token_id)
end
defp truncate_error(error), do: String.slice(error, 0, @max_error_length)
@doc """
Truncate error string to @max_error_length symbols
"""
@spec truncate_error(binary()) :: binary()
def truncate_error(error), do: String.slice(error, 0, @max_error_length)
end
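A hedged sketch of the reshaped public surface above: fetch_json/2 now takes the raw eth_call result plus an optional hex-normalized token id (used for ERC-1155 {id} substitution), and truncate_error/1 is public so callers such as the Helper can cap error messages. The URL is a made-up example.

alias Indexer.Fetcher.TokenInstance.MetadataRetriever

# "2a" is token id 42 in hex (left unpadded here for brevity)
MetadataRetriever.fetch_json({:ok, ["https://example.com/api/token/{id}.json"]}, "2a")
#=> {:ok, %{metadata: ...}} | {:error, reason} | {:error_code, code}

MetadataRetriever.truncate_error(String.duplicate("a", 10_000))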

apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex
@@ -32,10 +32,12 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do
end
@impl BufferedTask
def run([%{contract_address_hash: hash, token_id: token_id}], _json_rpc_named_arguments) do
if not Chain.token_instance_exists?(token_id, hash) do
fetch_instance(hash, token_id)
end
def run(token_instances, _) when is_list(token_instances) do
token_instances
|> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} ->
not Chain.token_instance_exists?(token_id, hash)
end)
|> batch_fetch_instances()
:ok
end
@@ -75,7 +77,7 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do
[
flush_interval: 100,
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
max_batch_size: @default_max_batch_size,
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
poll: false,
task_supervisor: __MODULE__.TaskSupervisor
]
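A rough capacity note for the realtime fetcher above, assuming the defaults added in config/runtime.exs below: each flush can hand out up to max_concurrency batches of max_batch_size instances.

max_concurrency = 10  # INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY default
max_batch_size = 1    # INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE default
max_concurrency * max_batch_size
#=> 10 instances in flight at once; raising the batch size env var scales this up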

apps/indexer/lib/indexer/fetcher/token_instance/retry.ex
@@ -13,7 +13,7 @@ defmodule Indexer.Fetcher.TokenInstance.Retry do
@behaviour BufferedTask
@default_max_batch_size 1
@default_max_batch_size 10
@default_max_concurrency 10
@doc false
@@ -40,14 +40,16 @@ defmodule Indexer.Fetcher.TokenInstance.Retry do
end
@impl BufferedTask
def run([%{contract_address_hash: hash, token_id: token_id, updated_at: updated_at}], _json_rpc_named_arguments) do
def run(token_instances, _json_rpc_named_arguments) when is_list(token_instances) do
refetch_interval = Application.get_env(:indexer, __MODULE__)[:refetch_interval]
if updated_at
token_instances
|> Enum.filter(fn %{contract_address_hash: _hash, token_id: _token_id, updated_at: updated_at} ->
updated_at
|> DateTime.add(refetch_interval, :millisecond)
|> DateTime.compare(DateTime.utc_now()) != :gt do
fetch_instance(hash, token_id)
end
|> DateTime.compare(DateTime.utc_now()) != :gt
end)
|> batch_fetch_instances()
:ok
end
@@ -56,7 +58,7 @@ defmodule Indexer.Fetcher.TokenInstance.Retry do
[
flush_interval: :timer.minutes(10),
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
max_batch_size: @default_max_batch_size,
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
task_supervisor: __MODULE__.TaskSupervisor
]
end
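A worked example of the refetch filter above (values assumed): an instance last updated 25 hours ago with the default 24h refetch_interval is due again, so it is kept for batch_fetch_instances/1.

refetch_interval = :timer.hours(24)                        # milliseconds
updated_at = DateTime.add(DateTime.utc_now(), -25 * 3600)  # 25 hours ago

updated_at
|> DateTime.add(refetch_interval, :millisecond)
|> DateTime.compare(DateTime.utc_now()) != :gt
#=> true, so the instance stays in the batch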

apps/indexer/lib/indexer/fetcher/token_instance/sanitize.ex
@@ -13,7 +13,7 @@ defmodule Indexer.Fetcher.TokenInstance.Sanitize do
@behaviour BufferedTask
@default_max_batch_size 1
@default_max_batch_size 10
@default_max_concurrency 10
@doc false
def child_spec([init_options, gen_server_options]) do
@@ -36,10 +36,12 @@ defmodule Indexer.Fetcher.TokenInstance.Sanitize do
end
@impl BufferedTask
def run([%{contract_address_hash: hash, token_id: token_id}], _json_rpc_named_arguments) do
if not Chain.token_instance_exists?(token_id, hash) do
fetch_instance(hash, token_id)
end
def run(token_instances, _) when is_list(token_instances) do
token_instances
|> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} ->
not Chain.token_instance_exists?(token_id, hash)
end)
|> batch_fetch_instances()
:ok
end
@@ -48,7 +50,7 @@ defmodule Indexer.Fetcher.TokenInstance.Sanitize do
[
flush_interval: :infinity,
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
max_batch_size: @default_max_batch_size,
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
poll: false,
task_supervisor: __MODULE__.TaskSupervisor
]

apps/indexer/test/indexer/fetcher/token_instance/helper_test.exs
File diff suppressed because one or more lines are too long

apps/indexer/test/test_helper.exs
@@ -17,6 +17,7 @@ end
Mox.defmock(EthereumJSONRPC.Mox, for: EthereumJSONRPC.Transport)
Mox.defmock(Indexer.BufferedTaskTest.RetryableTask, for: Indexer.BufferedTask)
Mox.defmock(Indexer.BufferedTaskTest.ShrinkableTask, for: Indexer.BufferedTask)
Mox.defmock(Explorer.Mox.HTTPoison, for: HTTPoison.Base)
ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
ExUnit.start()

config/runtime.exs
@@ -507,13 +507,16 @@ config :indexer, Indexer.Fetcher.BlockReward,
config :indexer, Indexer.Fetcher.TokenInstance.Retry,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_RETRY_CONCURRENCY", 10),
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_RETRY_BATCH_SIZE", 10),
refetch_interval: ConfigHelper.parse_time_env_var("INDEXER_TOKEN_INSTANCE_RETRY_REFETCH_INTERVAL", "24h")
config :indexer, Indexer.Fetcher.TokenInstance.Realtime,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY", 10)
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY", 10),
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE", 1)
config :indexer, Indexer.Fetcher.TokenInstance.Sanitize,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY", 10)
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY", 10),
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE", 10)
config :indexer, Indexer.Fetcher.InternalTransaction,
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_INTERNAL_TRANSACTIONS_BATCH_SIZE", 10),
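A hedged sketch of how a fetcher consumes the new batch_size settings configured above at runtime; this mirrors the defaults/0 changes in the fetcher modules, falling back to the module default when the env var is unset.

# e.g. INDEXER_TOKEN_INSTANCE_RETRY_BATCH_SIZE=50 makes the retry fetcher receive
# up to 50 token instances per run/2 invocation instead of the default 10
batch_size = Application.get_env(:indexer, Indexer.Fetcher.TokenInstance.Retry)[:batch_size] || 10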

docker-compose/envs/common-blockscout.env
@@ -211,3 +211,6 @@ ACCOUNT_REDIS_URL=redis://redis_db:6379
EIP_1559_ELASTICITY_MULTIPLIER=2
# API_SENSITIVE_ENDPOINTS_KEY=
# ACCOUNT_VERIFICATION_EMAIL_RESEND_INTERVAL=
# INDEXER_TOKEN_INSTANCE_RETRY_BATCH_SIZE=10
# INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE=1
# INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE=10

docker/Makefile
@@ -732,6 +732,15 @@ endif
ifdef ACCOUNT_VERIFICATION_EMAIL_RESEND_INTERVAL
BLOCKSCOUT_CONTAINER_PARAMS += -e 'ACCOUNT_VERIFICATION_EMAIL_RESEND_INTERVAL=$(ACCOUNT_VERIFICATION_EMAIL_RESEND_INTERVAL)'
endif
ifdef INDEXER_TOKEN_INSTANCE_RETRY_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_INSTANCE_RETRY_BATCH_SIZE=$(INDEXER_TOKEN_INSTANCE_RETRY_BATCH_SIZE)'
endif
ifdef INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE=$(INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE)'
endif
ifdef INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE
BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE=$(INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE)'
endif
HAS_BLOCKSCOUT_IMAGE := $(shell docker images | grep -sw "${BS_CONTAINER_IMAGE} ")
build:
