From 379e81afc223c8f1da3d3b7ca49b31b384f13f3b Mon Sep 17 00:00:00 2001 From: nikitosing <32202610+nikitosing@users.noreply.github.com> Date: Tue, 14 May 2024 18:31:00 +0300 Subject: [PATCH] =?UTF-8?q?feat:=20Add=20optional=20retry=20of=20NFT=20met?= =?UTF-8?q?adata=20fetch=20in=20Indexer.Fetcher.Tok=E2=80=A6=20(#10036)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Add optional retry of NFT metadata fetch in Indexer.Fetcher.TokenInstance.Realtime * Rename env * Process review comments * Rename var --- apps/indexer/lib/indexer/buffered_task.ex | 4 + .../indexer/fetcher/token_instance/helper.ex | 8 +- .../fetcher/token_instance/realtime.ex | 56 +++++++- .../fetcher/token_instance/helper_test.exs | 1 + .../fetcher/token_instance/realtime_test.exs | 123 ++++++++++++++++++ config/runtime.exs | 4 +- 6 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 apps/indexer/test/indexer/fetcher/token_instance/realtime_test.exs diff --git a/apps/indexer/lib/indexer/buffered_task.ex b/apps/indexer/lib/indexer/buffered_task.ex index e728e802c6..51b9d995af 100644 --- a/apps/indexer/lib/indexer/buffered_task.ex +++ b/apps/indexer/lib/indexer/buffered_task.ex @@ -277,6 +277,10 @@ defmodule Indexer.BufferedTask do {:noreply, drop_task_and_retry(state, ref)} end + def handle_info({:buffer, entries}, state) do + {:noreply, buffer_entries(state, entries)} + end + def handle_call({:buffer, entries}, _from, state) do {:reply, :ok, buffer_entries(state, entries)} end diff --git a/apps/indexer/lib/indexer/fetcher/token_instance/helper.ex b/apps/indexer/lib/indexer/fetcher/token_instance/helper.ex index 83c405af27..2608d377ac 100644 --- a/apps/indexer/lib/indexer/fetcher/token_instance/helper.ex +++ b/apps/indexer/lib/indexer/fetcher/token_instance/helper.ex @@ -286,7 +286,13 @@ defmodule Indexer.Fetcher.TokenInstance.Helper do rescue error in Postgrex.Error -> if retrying? do - Logger.warn(["Failed to upsert token instance: #{inspect(error)}"], fetcher: :token_instances) + Logger.warn( + [ + "Failed to upsert token instance: {#{to_string(token_contract_address_hash)}, #{token_id}}, error: #{inspect(error)}" + ], + fetcher: :token_instances + ) + nil else token_id diff --git a/apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex b/apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex index 14375b1601..3c5eabc0b4 100644 --- a/apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex +++ b/apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex @@ -9,6 +9,7 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do import Indexer.Fetcher.TokenInstance.Helper alias Explorer.Chain + alias Explorer.Chain.Token.Instance alias Indexer.BufferedTask @behaviour BufferedTask @@ -16,6 +17,8 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do @default_max_batch_size 1 @default_max_concurrency 10 + @errors_whitelisted_for_retry ["request error: 404", "request error: 500"] + @doc false def child_spec([init_options, gen_server_options]) do merged_init_opts = @@ -33,11 +36,16 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do @impl BufferedTask def run(token_instances, _) when is_list(token_instances) do + retry? = Application.get_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime)[:retry_with_cooldown?] + + token_instances_retry_map = token_instance_to_retry_map(retry?, token_instances) + token_instances - |> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} -> - Chain.token_instance_with_unfetched_metadata?(token_id, hash) + |> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} = instance -> + instance[:retry?] || Chain.token_instance_with_unfetched_metadata?(token_id, hash) end) |> batch_fetch_instances() + |> retry_some_instances(retry?, token_instances_retry_map) :ok end @@ -73,6 +81,50 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do BufferedTask.buffer(__MODULE__, data) end + @spec retry_some_instances([map()], boolean(), map()) :: any() + defp retry_some_instances(token_instances, true, token_instances_retry_map) do + token_instances_to_refetch = + Enum.flat_map(token_instances, fn + {:ok, %Instance{metadata: nil, error: error} = instance} + when error in @errors_whitelisted_for_retry -> + if token_instances_retry_map[{instance.token_contract_address_hash.bytes, instance.token_id}] do + [] + else + [ + %{ + contract_address_hash: instance.token_contract_address_hash, + token_id: instance.token_id, + retry?: true + } + ] + end + + _ -> + [] + end) + + if token_instances_to_refetch != [] do + timeout = Application.get_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime)[:retry_timeout] + Process.send_after(__MODULE__, {:buffer, token_instances_to_refetch}, timeout) + end + end + + defp retry_some_instances(_, _, _), do: nil + + defp token_instance_to_retry_map(false, _token_instances), do: nil + + defp token_instance_to_retry_map(true, token_instances) do + token_instances + |> Enum.flat_map(fn + %{contract_address_hash: hash, token_id: token_id, retry?: true} -> + [{{hash.bytes, token_id}, true}] + + _ -> + [] + end) + |> Enum.into(%{}) + end + defp defaults do [ flush_interval: 100, diff --git a/apps/indexer/test/indexer/fetcher/token_instance/helper_test.exs b/apps/indexer/test/indexer/fetcher/token_instance/helper_test.exs index 2b250a6d59..3c13ce2abe 100644 --- a/apps/indexer/test/indexer/fetcher/token_instance/helper_test.exs +++ b/apps/indexer/test/indexer/fetcher/token_instance/helper_test.exs @@ -16,6 +16,7 @@ defmodule Indexer.Fetcher.TokenInstance.HelperTest do bypass = Bypass.open() on_exit(fn -> Bypass.down(bypass) end) + {:ok, bypass: bypass} end diff --git a/apps/indexer/test/indexer/fetcher/token_instance/realtime_test.exs b/apps/indexer/test/indexer/fetcher/token_instance/realtime_test.exs new file mode 100644 index 0000000000..542b2954ad --- /dev/null +++ b/apps/indexer/test/indexer/fetcher/token_instance/realtime_test.exs @@ -0,0 +1,123 @@ +defmodule Indexer.Fetcher.TokenInstance.RealtimeTest do + use EthereumJSONRPC.Case + use Explorer.DataCase + + import Mox + + alias Explorer.Repo + alias Explorer.Chain.Token.Instance + alias Indexer.Fetcher.TokenInstance.Realtime, as: TokenInstanceRealtime + alias Plug.Conn + + setup :verify_on_exit! + setup :set_mox_global + + describe "Check how works retry in realtime" do + setup do + config = Application.get_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime) + new_config = config |> Keyword.put(:retry_with_cooldown?, true) |> Keyword.put(:retry_timeout, 100) + + Application.put_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime, new_config) + + on_exit(fn -> + Application.put_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime, config) + end) + + :ok + end + + test "retry once after timeout" do + bypass = Bypass.open() + + [] + |> TokenInstanceRealtime.Supervisor.child_spec() + |> ExUnit.Callbacks.start_supervised!() + + json = """ + { + "name": "name" + } + """ + + encoded_url = + "0x" <> + (ABI.TypeEncoder.encode(["http://localhost:#{bypass.port}/api/card/{id}"], %ABI.FunctionSelector{ + function: nil, + types: [ + :string + ] + }) + |> Base.encode16(case: :lower)) + + EthereumJSONRPC.Mox + |> expect(:json_rpc, fn [ + %{ + id: 0, + jsonrpc: "2.0", + method: "eth_call", + params: [ + %{ + data: + "0x0e89341c0000000000000000000000000000000000000000000000000000000000000309", + to: "0x5caebd3b32e210e85ce3e9d51638b9c445481567" + }, + "latest" + ] + } + ], + _options -> + {:ok, + [ + %{ + id: 0, + jsonrpc: "2.0", + result: encoded_url + } + ]} + end) + + Bypass.expect_once( + bypass, + "GET", + "/api/card/0000000000000000000000000000000000000000000000000000000000000309", + fn conn -> + Conn.resp(conn, 404, "Not found") + end + ) + + Bypass.expect_once( + bypass, + "GET", + "/api/card/0000000000000000000000000000000000000000000000000000000000000309", + fn conn -> + Conn.resp(conn, 200, json) + end + ) + + token = + insert(:token, + contract_address: build(:address, hash: "0x5caebd3b32e210e85ce3e9d51638b9c445481567"), + type: "ERC-1155" + ) + + insert(:token_instance, + token_id: 777, + token_contract_address_hash: token.contract_address_hash, + metadata: nil, + error: nil + ) + + TokenInstanceRealtime.async_fetch([ + %{token_contract_address_hash: token.contract_address_hash, token_ids: [Decimal.new(777)]} + ]) + + :timer.sleep(150) + + [instance] = Repo.all(Instance) + + assert is_nil(instance.error) + assert instance.metadata == %{"name" => "name"} + Bypass.down(bypass) + end + end +end diff --git a/config/runtime.exs b/config/runtime.exs index e56f62b810..bd08ca5b8b 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -728,7 +728,9 @@ config :indexer, Indexer.Fetcher.TokenInstance.Retry, config :indexer, Indexer.Fetcher.TokenInstance.Realtime, concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY", 10), - batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE", 1) + batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE", 1), + retry_with_cooldown?: ConfigHelper.parse_bool_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_RETRY_ENABLED"), + retry_timeout: ConfigHelper.parse_time_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_RETRY_TIMEOUT", "5s") config :indexer, Indexer.Fetcher.TokenInstance.Sanitize, concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY", 10),