diff --git a/CHANGELOG.md b/CHANGELOG.md index 87ce66531b..f14e646f88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features +- [#7286](https://github.com/blockscout/blockscout/pull/7286) - Split token instance fetcher - [#7246](https://github.com/blockscout/blockscout/pull/7246) - Fallback JSON RPC option ### Fixes diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 18fad4e57a..42660861f7 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -5007,7 +5007,11 @@ defmodule Explorer.Chain do def stream_token_instances_with_error(initial, reducer) when is_function(reducer, 2) do Instance |> where([instance], not is_nil(instance.error)) - |> select([instance], %{contract_address_hash: instance.token_contract_address_hash, token_id: instance.token_id}) + |> select([instance], %{ + contract_address_hash: instance.token_contract_address_hash, + token_id: instance.token_id, + updated_at: instance.updated_at + }) |> Repo.stream_reduce(initial, reducer) end @@ -5310,8 +5314,7 @@ defmodule Explorer.Chain do @spec erc721_or_erc1155_token_instance_from_token_id_and_token_address(non_neg_integer(), Hash.Address.t(), [api?]) :: {:ok, Instance.t()} | {:error, :not_found} def erc721_or_erc1155_token_instance_from_token_id_and_token_address(token_id, token_contract_address, options \\ []) do - query = - from(i in Instance, where: i.token_contract_address_hash == ^token_contract_address and i.token_id == ^token_id) + query = Instance.token_instance_query(token_id, token_contract_address) case select_repo(options).one(query) do nil -> {:error, :not_found} @@ -5319,6 +5322,13 @@ defmodule Explorer.Chain do end end + @spec token_instance_exists?(non_neg_integer, Hash.Address.t(), [api?]) :: boolean + def token_instance_exists?(token_id, token_contract_address, options \\ []) do + query = Instance.token_instance_query(token_id, token_contract_address) + + select_repo(options).exists?(query) + end + defp fetch_coin_balances(address_hash, paging_options) do address = Repo.get_by(Address, hash: address_hash) diff --git a/apps/explorer/lib/explorer/chain/token/instance.ex b/apps/explorer/lib/explorer/chain/token/instance.ex index d55d60c7ba..ea91796e32 100644 --- a/apps/explorer/lib/explorer/chain/token/instance.ex +++ b/apps/explorer/lib/explorer/chain/token/instance.ex @@ -89,4 +89,8 @@ defmodule Explorer.Chain.Token.Instance do select: to_address ) end + + @spec token_instance_query(non_neg_integer(), Hash.Address.t()) :: Ecto.Query.t() + def token_instance_query(token_id, token_contract_address), + do: from(i in Instance, where: i.token_contract_address_hash == ^token_contract_address and i.token_id == ^token_id) end diff --git a/apps/explorer/lib/explorer/token/instance_metadata_retriever.ex b/apps/explorer/lib/explorer/token/instance_metadata_retriever.ex index fd53c07b3b..b3d38281c0 100644 --- a/apps/explorer/lib/explorer/token/instance_metadata_retriever.ex +++ b/apps/explorer/lib/explorer/token/instance_metadata_retriever.ex @@ -65,6 +65,10 @@ defmodule Explorer.Token.InstanceMetadataRetriever do # https://eips.ethereum.org/EIPS/eip-1155#metadata @erc1155_token_id_placeholder "{id}" + @max_error_length 100 + + @ignored_hosts ["localhost", "127.0.0.1", "0.0.0.0", "", nil] + def fetch_metadata(unquote(@cryptokitties_address_hash), token_id) do %{@token_uri => {:ok, ["https://api.cryptokitties.co/kitties/{id}"]}} |> fetch_json(to_string(token_id)) @@ -110,15 +114,14 @@ defmodule Explorer.Token.InstanceMetadataRetriever do fetch_json_from_uri(uri, hex_token_id) end - def fetch_json(result, hex_token_id) do - case URI.parse(result) do - %URI{host: nil} -> - Logger.debug(["Unknown metadata format #{inspect(result)}."], fetcher: :token_instances) - - {:error, result} + defp fetch_json_from_uri({:error, error}, _hex_token_id) do + if error =~ "execution reverted" or error =~ @vm_execution_error do + {:ok, %{error: @vm_execution_error}} + else + Logger.debug(["Unknown metadata format error #{inspect(error)}."], fetcher: :token_instances) - _ -> - fetch_json_from_uri({:ok, [result]}, hex_token_id) + # truncate error since it will be stored in DB + {:error, truncate_error(error)} end end @@ -129,17 +132,7 @@ defmodule Explorer.Token.InstanceMetadataRetriever do else Logger.debug(["Unknown metadata format result #{inspect(result)}."], fetcher: :token_instances) - {:error, result} - end - end - - defp fetch_json_from_uri({:error, error}, _hex_token_id) do - if error =~ "execution reverted" or error =~ @vm_execution_error do - {:ok, %{error: @vm_execution_error}} - else - Logger.debug(["Unknown metadata format error #{inspect(error)}."], fetcher: :token_instances) - - {:error, error} + {:error, truncate_error(result)} end end @@ -175,7 +168,7 @@ defmodule Explorer.Token.InstanceMetadataRetriever do fetch_json_from_uri({:ok, [base64_decoded]}, hex_token_id) _ -> - {:error, base64_encoded_json} + {:error, "invalid data:application/json;base64"} end rescue e -> @@ -187,7 +180,7 @@ defmodule Explorer.Token.InstanceMetadataRetriever do fetcher: :token_instances ) - {:error, base64_encoded_json} + {:error, "invalid data:application/json;base64"} end defp fetch_json_from_uri({:ok, ["#{@ipfs_protocol}ipfs/" <> right]}, hex_token_id) do @@ -208,13 +201,13 @@ defmodule Explorer.Token.InstanceMetadataRetriever do fetcher: :token_instances ) - {:error, :invalid_json} + {:error, "invalid json"} end defp fetch_json_from_uri(uri, _hex_token_id) do Logger.debug(["Unknown metadata uri format #{inspect(uri)}."], fetcher: :token_instances) - {:error, :unknown_metadata_uri_format} + {:error, "unknown metadata uri format"} end defp fetch_from_ipfs(ipfs_uid, hex_token_id) do @@ -232,10 +225,20 @@ defmodule Explorer.Token.InstanceMetadataRetriever do fetcher: :token_instances ) - {:error, :request_error} + {:error, "preparation error"} end def fetch_metadata_from_uri(uri, hex_token_id \\ nil) do + case Mix.env() != :test && URI.parse(uri) do + %URI{host: host} when host in @ignored_hosts -> + {:error, "ignored host #{host}"} + + _ -> + fetch_metadata_from_uri_inner(uri, hex_token_id) + end + end + + def fetch_metadata_from_uri_inner(uri, hex_token_id) do case Application.get_env(:explorer, :http_adapter).get(uri, [], timeout: 60_000, recv_timeout: 60_000, @@ -247,10 +250,20 @@ defmodule Explorer.Token.InstanceMetadataRetriever do check_content_type(content_type, uri, hex_token_id, body) {:ok, %Response{body: body, status_code: code}} -> - {:error, code, body} + Logger.debug( + ["Request to token uri: #{inspect(uri)} failed with code #{code}. Body:", inspect(body)], + fetcher: :token_instances + ) + + {:error_code, code} {:error, %Error{reason: reason}} -> - {:error, reason} + Logger.debug( + ["Request to token uri failed: #{inspect(uri)}.", inspect(reason)], + fetcher: :token_instances + ) + + {:error, reason |> inspect(reason) |> truncate_error()} end rescue e -> @@ -259,7 +272,7 @@ defmodule Explorer.Token.InstanceMetadataRetriever do fetcher: :token_instances ) - {:error, :request_error} + {:error, "request error"} end defp check_content_type(content_type, uri, hex_token_id, body) do @@ -325,7 +338,7 @@ defmodule Explorer.Token.InstanceMetadataRetriever do end defp check_type(_, _) do - {:error, :wrong_metadata_type} + {:error, "wrong metadata type"} end defp substitute_token_id_to_token_uri(token_uri, empty_token_id) when empty_token_id in [nil, ""], do: token_uri @@ -333,4 +346,6 @@ defmodule Explorer.Token.InstanceMetadataRetriever do defp substitute_token_id_to_token_uri(token_uri, hex_token_id) do String.replace(token_uri, @erc1155_token_id_placeholder, hex_token_id) end + + defp truncate_error(error), do: String.slice(error, 0, @max_error_length) end diff --git a/apps/explorer/test/explorer/token/instance_metadata_retriever_test.exs b/apps/explorer/test/explorer/token/instance_metadata_retriever_test.exs index acf67b50b5..d6a5fb2880 100644 --- a/apps/explorer/test/explorer/token/instance_metadata_retriever_test.exs +++ b/apps/explorer/test/explorer/token/instance_metadata_retriever_test.exs @@ -507,7 +507,7 @@ defmodule Explorer.Token.InstanceMetadataRetrieverTest do "name" => "asda", "salePrice" => 34 } - }} == InstanceMetadataRetriever.fetch_json(data) + }} == InstanceMetadataRetriever.fetch_json(%{"0e89341c" => {:ok, [data]}}) Application.put_env(:explorer, :http_adapter, HTTPoison) end @@ -552,7 +552,8 @@ defmodule Explorer.Token.InstanceMetadataRetrieverTest do assert {:ok, %{ metadata: Jason.decode!(json) - }} == InstanceMetadataRetriever.fetch_json("http://localhost:#{bypass.port}#{path}") + }} == + InstanceMetadataRetriever.fetch_json(%{"0e89341c" => {:ok, ["http://localhost:#{bypass.port}#{path}"]}}) end end end diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index b75d352356..cfa42e49f5 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -22,10 +22,10 @@ defmodule Indexer.Block.Fetcher do CoinBalance, ContractCode, InternalTransaction, + Realtime.TokenInstance, ReplacedTransaction, Token, TokenBalance, - Realtime, UncleBlock } @@ -253,7 +253,7 @@ defmodule Indexer.Block.Fetcher do end def async_import_token_instances(%{token_transfers: token_transfers}) do - Realtime.TokenInstance.async_fetch(token_transfers) + TokenInstance.async_fetch(token_transfers) end def async_import_token_instances(_), do: :ok diff --git a/apps/indexer/lib/indexer/buffered_task.ex b/apps/indexer/lib/indexer/buffered_task.ex index 574aa05d9d..7f2565ffe2 100644 --- a/apps/indexer/lib/indexer/buffered_task.ex +++ b/apps/indexer/lib/indexer/buffered_task.ex @@ -466,8 +466,12 @@ defmodule Indexer.BufferedTask do end defp schedule_next_buffer_flush(state) do - timer = Process.send_after(self(), :flush, state.flush_interval) - %{state | flush_timer: timer} + if state.flush_interval == :infinity do + state + else + timer = Process.send_after(self(), :flush, state.flush_interval) + %{state | flush_timer: timer} + end end defp shrinkable(options) do diff --git a/apps/indexer/lib/indexer/fetcher/catchup/token_instance.ex b/apps/indexer/lib/indexer/fetcher/catchup/token_instance.ex deleted file mode 100644 index 6dde74475e..0000000000 --- a/apps/indexer/lib/indexer/fetcher/catchup/token_instance.ex +++ /dev/null @@ -1,98 +0,0 @@ -defmodule Indexer.Fetcher.Catchup.TokenInstance do - @moduledoc """ - Fetches information about a token instance. - """ - - use Indexer.Fetcher, restart: :permanent - use Spandex.Decorators - - require Logger - - alias Explorer.Chain - alias Explorer.Token.InstanceMetadataRetriever - alias Indexer.BufferedTask - - @behaviour BufferedTask - - @default_max_batch_size 1 - @default_max_concurrency 10 - - @doc false - def child_spec([init_options, gen_server_options]) do - {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) - - unless state do - raise ArgumentError, - ":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <> - "to allow for json_rpc calls when running." - end - - merged_init_opts = - defaults() - |> Keyword.merge(mergeable_init_options) - |> Keyword.put(:state, state) - - Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__) - end - - @impl BufferedTask - def init(initial_acc, reducer, _) do - {:ok, acc} = - Chain.stream_unfetched_token_instances(initial_acc, fn data, acc -> - reducer.(data, acc) - end) - - acc - end - - @impl BufferedTask - def run([%{contract_address_hash: hash, token_id: token_id}], _json_rpc_named_arguments) do - fetch_instance(hash, token_id) - - :ok - end - - defp fetch_instance(token_contract_address_hash, token_id) do - case InstanceMetadataRetriever.fetch_metadata(to_string(token_contract_address_hash), Decimal.to_integer(token_id)) do - {:ok, %{metadata: metadata}} -> - params = %{ - token_id: token_id, - token_contract_address_hash: token_contract_address_hash, - metadata: metadata, - error: nil - } - - {:ok, _result} = Chain.upsert_token_instance(params) - - {:ok, %{error: error}} -> - params = %{ - token_id: token_id, - token_contract_address_hash: token_contract_address_hash, - error: error - } - - {:ok, _result} = Chain.upsert_token_instance(params) - - result -> - Logger.debug( - [ - "failed to fetch token instance metadata for #{inspect({to_string(token_contract_address_hash), Decimal.to_integer(token_id)})}: ", - inspect(result) - ], - fetcher: :token_instances - ) - - :ok - end - end - - defp defaults do - [ - flush_interval: :timer.seconds(3), - max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency, - max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size, - poll: true, - task_supervisor: __MODULE__.TaskSupervisor - ] - end -end diff --git a/apps/indexer/lib/indexer/fetcher/realtime/token_instance.ex b/apps/indexer/lib/indexer/fetcher/realtime/token_instance.ex index 2b047b8631..8d0185ff6f 100644 --- a/apps/indexer/lib/indexer/fetcher/realtime/token_instance.ex +++ b/apps/indexer/lib/indexer/fetcher/realtime/token_instance.ex @@ -6,10 +6,9 @@ defmodule Indexer.Fetcher.Realtime.TokenInstance do use Indexer.Fetcher, restart: :permanent use Spandex.Decorators - require Logger + import Indexer.Fetcher.TokenInstance alias Explorer.Chain - alias Explorer.Token.InstanceMetadataRetriever alias Indexer.BufferedTask @behaviour BufferedTask @@ -42,45 +41,13 @@ defmodule Indexer.Fetcher.Realtime.TokenInstance do @impl BufferedTask def run([%{contract_address_hash: hash, token_id: token_id}], _json_rpc_named_arguments) do - fetch_instance(hash, token_id) + if not Chain.token_instance_exists?(token_id, hash) do + fetch_instance(hash, token_id, false) + end :ok end - defp fetch_instance(token_contract_address_hash, token_id) do - case InstanceMetadataRetriever.fetch_metadata(to_string(token_contract_address_hash), Decimal.to_integer(token_id)) do - {:ok, %{metadata: metadata}} -> - params = %{ - token_id: token_id, - token_contract_address_hash: token_contract_address_hash, - metadata: metadata, - error: nil - } - - {:ok, _result} = Chain.upsert_token_instance(params) - - {:ok, %{error: error}} -> - params = %{ - token_id: token_id, - token_contract_address_hash: token_contract_address_hash, - error: error - } - - {:ok, _result} = Chain.upsert_token_instance(params) - - result -> - Logger.debug( - [ - "failed to fetch token instance metadata for #{inspect({to_string(token_contract_address_hash), Decimal.to_integer(token_id)})}: ", - inspect(result) - ], - fetcher: :token_instances - ) - - :ok - end - end - @doc """ Fetches token instance data asynchronously. """ @@ -114,10 +81,10 @@ defmodule Indexer.Fetcher.Realtime.TokenInstance do defp defaults do [ - flush_interval: :timer.seconds(3), + flush_interval: 100, max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency, - max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size, - poll: true, + max_batch_size: @default_max_batch_size, + poll: false, task_supervisor: __MODULE__.TaskSupervisor ] end diff --git a/apps/indexer/lib/indexer/fetcher/retry/token_instance.ex b/apps/indexer/lib/indexer/fetcher/retry/token_instance.ex new file mode 100644 index 0000000000..7a4f643e8d --- /dev/null +++ b/apps/indexer/lib/indexer/fetcher/retry/token_instance.ex @@ -0,0 +1,69 @@ +defmodule Indexer.Fetcher.Retry.TokenInstance do + @moduledoc """ + Fetches information about a token instance. + """ + + use Indexer.Fetcher, restart: :permanent + use Spandex.Decorators + + import Indexer.Fetcher.TokenInstance + + alias Explorer.Chain + alias Indexer.BufferedTask + + @behaviour BufferedTask + + @default_max_batch_size 1 + @default_max_concurrency 10 + + @doc false + def child_spec([init_options, gen_server_options]) do + {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) + + unless state do + raise ArgumentError, + ":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <> + "to allow for json_rpc calls when running." + end + + merged_init_opts = + defaults() + |> Keyword.merge(mergeable_init_options) + |> Keyword.put(:state, state) + + Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__) + end + + @impl BufferedTask + def init(initial_acc, reducer, _) do + {:ok, acc} = + Chain.stream_token_instances_with_error(initial_acc, fn data, acc -> + reducer.(data, acc) + end) + + acc + end + + @impl BufferedTask + def run([%{contract_address_hash: hash, token_id: token_id, updated_at: updated_at}], _json_rpc_named_arguments) do + refetch_interval = Application.get_env(:indexer, __MODULE__)[:refetch_interval] + + if updated_at + |> DateTime.add(refetch_interval, :millisecond) + |> DateTime.compare(DateTime.utc_now()) != :gt do + fetch_instance(hash, token_id, true) + end + + :ok + end + + defp defaults do + [ + flush_interval: :timer.minutes(10), + max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency, + max_batch_size: @default_max_batch_size, + poll: true, + task_supervisor: __MODULE__.TaskSupervisor + ] + end +end diff --git a/apps/indexer/lib/indexer/fetcher/sanitize/token_instance.ex b/apps/indexer/lib/indexer/fetcher/sanitize/token_instance.ex index 1f08b17286..9b355cb3b2 100644 --- a/apps/indexer/lib/indexer/fetcher/sanitize/token_instance.ex +++ b/apps/indexer/lib/indexer/fetcher/sanitize/token_instance.ex @@ -6,17 +6,15 @@ defmodule Indexer.Fetcher.Sanitize.TokenInstance do use Indexer.Fetcher, restart: :permanent use Spandex.Decorators - require Logger + import Indexer.Fetcher.TokenInstance alias Explorer.Chain - alias Explorer.Token.InstanceMetadataRetriever alias Indexer.BufferedTask @behaviour BufferedTask @default_max_batch_size 1 @default_max_concurrency 10 - @doc false def child_spec([init_options, gen_server_options]) do {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) @@ -38,7 +36,7 @@ defmodule Indexer.Fetcher.Sanitize.TokenInstance do @impl BufferedTask def init(initial_acc, reducer, _) do {:ok, acc} = - Chain.stream_token_instances_with_error(initial_acc, fn data, acc -> + Chain.stream_unfetched_token_instances(initial_acc, fn data, acc -> reducer.(data, acc) end) @@ -47,51 +45,19 @@ defmodule Indexer.Fetcher.Sanitize.TokenInstance do @impl BufferedTask def run([%{contract_address_hash: hash, token_id: token_id}], _json_rpc_named_arguments) do - fetch_instance(hash, token_id) + if not Chain.token_instance_exists?(token_id, hash) do + fetch_instance(hash, token_id, false) + end :ok end - defp fetch_instance(token_contract_address_hash, token_id) do - case InstanceMetadataRetriever.fetch_metadata(to_string(token_contract_address_hash), Decimal.to_integer(token_id)) do - {:ok, %{metadata: metadata}} -> - params = %{ - token_id: token_id, - token_contract_address_hash: token_contract_address_hash, - metadata: metadata, - error: nil - } - - {:ok, _result} = Chain.upsert_token_instance(params) - - {:ok, %{error: error}} -> - params = %{ - token_id: token_id, - token_contract_address_hash: token_contract_address_hash, - error: error - } - - {:ok, _result} = Chain.upsert_token_instance(params) - - result -> - Logger.debug( - [ - "failed to fetch token instance metadata for #{inspect({to_string(token_contract_address_hash), Decimal.to_integer(token_id)})}: ", - inspect(result) - ], - fetcher: :token_instances - ) - - :ok - end - end - defp defaults do [ - flush_interval: :timer.seconds(3), + flush_interval: :infinity, max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency, - max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size, - poll: true, + max_batch_size: @default_max_batch_size, + poll: false, task_supervisor: __MODULE__.TaskSupervisor ] end diff --git a/apps/indexer/lib/indexer/fetcher/token_instance.ex b/apps/indexer/lib/indexer/fetcher/token_instance.ex index f8f93a3f6a..4a275e22e8 100644 --- a/apps/indexer/lib/indexer/fetcher/token_instance.ex +++ b/apps/indexer/lib/indexer/fetcher/token_instance.ex @@ -1,11 +1,14 @@ defmodule Indexer.Fetcher.TokenInstance do - require Logger - + @moduledoc """ + Common functions for Indexer.Fetcher.TokenInstance fetchers + """ alias Explorer.Chain alias Explorer.Token.InstanceMetadataRetriever - def fetch_instance(token_contract_address_hash, token_id) do - case InstanceMetadataRetriever.fetch_metadata(to_string(token_contract_address_hash), Decimal.to_integer(token_id)) do + def fetch_instance(token_contract_address_hash, token_id, is_retry?) do + token_id = prepare_token_id(token_id) + + case InstanceMetadataRetriever.fetch_metadata(to_string(token_contract_address_hash), token_id) do {:ok, %{metadata: metadata}} -> params = %{ token_id: token_id, @@ -17,37 +20,22 @@ defmodule Indexer.Fetcher.TokenInstance do {:ok, _result} = Chain.upsert_token_instance(params) {:ok, %{error: error}} -> - upsert_token_instance_with_error(token_id, token_contract_address_hash, error) - - {:error, code, body} -> - # Logger.debug( - # [ - # "failed to fetch token instance metadata for #{inspect({to_string(token_contract_address_hash), Decimal.to_integer(token_id)})}: ", - # "http code: #{code}", - # inspect(result) - # ], - # fetcher: :token_instances - # ) - upsert_token_instance_with_error(token_id, token_contract_address_hash, "request error: #{code}") - - {:error, reason} -> - nil + upsert_token_instance_with_error(token_id, token_contract_address_hash, error, is_retry?) - # result -> + {:error_code, code} -> + upsert_token_instance_with_error(token_id, token_contract_address_hash, "request error: #{code}", is_retry?) - # Logger.debug( - # [ - # "failed to fetch token instance metadata for #{inspect({to_string(token_contract_address_hash), Decimal.to_integer(token_id)})}: ", - # inspect(result) - # ], - # fetcher: :token_instances - # ) - - # :ok + {:error, reason} -> + upsert_token_instance_with_error(token_id, token_contract_address_hash, reason, is_retry?) end end - defp upsert_token_instance_with_error(token_id, token_contract_address_hash, error) do + defp prepare_token_id(%Decimal{} = token_id), do: Decimal.to_integer(token_id) + defp prepare_token_id(token_id), do: token_id + + defp upsert_token_instance_with_error(_token_id, _token_contract_address_hash, _error, true), do: :ignore + + defp upsert_token_instance_with_error(token_id, token_contract_address_hash, error, _is_retry?) do params = %{ token_id: token_id, token_contract_address_hash: token_contract_address_hash, diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index 5394b9c622..f34c27460c 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -16,7 +16,6 @@ defmodule Indexer.Supervisor do alias Indexer.Fetcher.{ BlockReward, - Catchup, CoinBalance, ContractCode, EmptyBlocksSanitizer, @@ -25,6 +24,7 @@ defmodule Indexer.Supervisor do PendingTransaction, Realtime, ReplacedTransaction, + Retry, Sanitize, Token, TokenBalance, @@ -109,7 +109,7 @@ defmodule Indexer.Supervisor do {CoinBalance.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, {Token.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, - {Catchup.TokenInstance.Supervisor, + {Retry.TokenInstance.Supervisor, [ [json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor] ]}, diff --git a/config/runtime.exs b/config/runtime.exs index 15a99570fb..d9bdd2c310 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -458,17 +458,15 @@ config :indexer, Indexer.Fetcher.BlockReward, batch_size: ConfigHelper.parse_integer_env_var("INDEXER_BLOCK_REWARD_BATCH_SIZE", 10), concurrency: ConfigHelper.parse_integer_env_var("INDEXER_BLOCK_REWARD_CONCURRENCY", 4) -config :indexer, Indexer.Fetcher.Catchup.TokenInstance, - batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_BATCH_SIZE", 1), - concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_CONCURRENCY", 10) +config :indexer, Indexer.Fetcher.Retry.TokenInstance, + concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_RETRY_CONCURRENCY", 10), + refetch_interval: ConfigHelper.parse_time_env_var("INDEXER_TOKEN_INSTANCE_RETRY_REFETCH_INTERVAL", "24h") config :indexer, Indexer.Fetcher.Realtime.TokenInstance, - batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_BATCH_SIZE", 1), - concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_CONCURRENCY", 10) + concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY", 10) config :indexer, Indexer.Fetcher.Sanitize.TokenInstance, - batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_BATCH_SIZE", 1), - concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_CONCURRENCY", 10) + concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY", 10) config :indexer, Indexer.Fetcher.InternalTransaction, batch_size: ConfigHelper.parse_integer_env_var("INDEXER_INTERNAL_TRANSACTIONS_BATCH_SIZE", 10), diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index dccc0ef53b..77e246ebce 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -97,8 +97,10 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false # INDEXER_INTERNAL_TRANSACTIONS_CONCURRENCY= # INDEXER_BLOCK_REWARD_BATCH_SIZE= # INDEXER_BLOCK_REWARD_CONCURRENCY= -# INDEXER_TOKEN_INSTANCE_BATCH_SIZE= -# INDEXER_TOKEN_INSTANCE_CONCURRENCY= +# INDEXER_TOKEN_INSTANCE_RETRY_REFETCH_INTERVAL= +# INDEXER_TOKEN_INSTANCE_RETRY_CONCURRENCY= +# INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY= +# INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY= # INDEXER_COIN_BALANCES_BATCH_SIZE= # INDEXER_COIN_BALANCES_CONCURRENCY= # INDEXER_RECEIPTS_BATCH_SIZE= diff --git a/docker/Makefile b/docker/Makefile index b9f05cffb8..a18ca27648 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -504,11 +504,17 @@ endif ifdef INDEXER_BLOCK_REWARD_CONCURRENCY BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_BLOCK_REWARD_CONCURRENCY=$(INDEXER_BLOCK_REWARD_CONCURRENCY)' endif -ifdef INDEXER_TOKEN_INSTANCE_BATCH_SIZE - BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_INSTANCE_BATCH_SIZE=$(INDEXER_TOKEN_INSTANCE_BATCH_SIZE)' +ifdef INDEXER_TOKEN_INSTANCE_RETRY_REFETCH_INTERVAL + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_INSTANCE_RETRY_REFETCH_INTERVAL=$(INDEXER_TOKEN_INSTANCE_RETRY_REFETCH_INTERVAL)' endif -ifdef INDEXER_TOKEN_INSTANCE_CONCURRENCY - BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_INSTANCE_CONCURRENCY=$(INDEXER_TOKEN_INSTANCE_CONCURRENCY)' +ifdef INDEXER_TOKEN_INSTANCE_RETRY_CONCURRENCY + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_INSTANCE_RETRY_CONCURRENCY=$(INDEXER_TOKEN_INSTANCE_RETRY_CONCURRENCY)' +endif +ifdef INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY=$(INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY)' +endif +ifdef INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY + BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY=$(INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY)' endif ifdef INDEXER_COIN_BALANCES_BATCH_SIZE BLOCKSCOUT_CONTAINER_PARAMS += -e 'INDEXER_RECEIPTS_BATCH_SIZE=$(INDEXER_RECEIPTS_BATCH_SIZE)'