feat: Add optional retry of NFT metadata fetch in Indexer.Fetcher.Tok… (#10036)

* feat: Add optional retry of NFT metadata fetch in Indexer.Fetcher.TokenInstance.Realtime

* Rename env

* Process review comments

* Rename var
mf-only-health-webapp
nikitosing 6 months ago committed by GitHub
parent 2e4e2ec051
commit 379e81afc2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      apps/indexer/lib/indexer/buffered_task.ex
  2. 8
      apps/indexer/lib/indexer/fetcher/token_instance/helper.ex
  3. 56
      apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex
  4. 1
      apps/indexer/test/indexer/fetcher/token_instance/helper_test.exs
  5. 123
      apps/indexer/test/indexer/fetcher/token_instance/realtime_test.exs
  6. 4
      config/runtime.exs

@ -277,6 +277,10 @@ defmodule Indexer.BufferedTask do
{:noreply, drop_task_and_retry(state, ref)} {:noreply, drop_task_and_retry(state, ref)}
end end
def handle_info({:buffer, entries}, state) do
{:noreply, buffer_entries(state, entries)}
end
def handle_call({:buffer, entries}, _from, state) do def handle_call({:buffer, entries}, _from, state) do
{:reply, :ok, buffer_entries(state, entries)} {:reply, :ok, buffer_entries(state, entries)}
end end

@ -286,7 +286,13 @@ defmodule Indexer.Fetcher.TokenInstance.Helper do
rescue rescue
error in Postgrex.Error -> error in Postgrex.Error ->
if retrying? do if retrying? do
Logger.warn(["Failed to upsert token instance: #{inspect(error)}"], fetcher: :token_instances) Logger.warn(
[
"Failed to upsert token instance: {#{to_string(token_contract_address_hash)}, #{token_id}}, error: #{inspect(error)}"
],
fetcher: :token_instances
)
nil nil
else else
token_id token_id

@ -9,6 +9,7 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do
import Indexer.Fetcher.TokenInstance.Helper import Indexer.Fetcher.TokenInstance.Helper
alias Explorer.Chain alias Explorer.Chain
alias Explorer.Chain.Token.Instance
alias Indexer.BufferedTask alias Indexer.BufferedTask
@behaviour BufferedTask @behaviour BufferedTask
@ -16,6 +17,8 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do
@default_max_batch_size 1 @default_max_batch_size 1
@default_max_concurrency 10 @default_max_concurrency 10
@errors_whitelisted_for_retry ["request error: 404", "request error: 500"]
@doc false @doc false
def child_spec([init_options, gen_server_options]) do def child_spec([init_options, gen_server_options]) do
merged_init_opts = merged_init_opts =
@ -33,11 +36,16 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do
@impl BufferedTask @impl BufferedTask
def run(token_instances, _) when is_list(token_instances) do def run(token_instances, _) when is_list(token_instances) do
retry? = Application.get_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime)[:retry_with_cooldown?]
token_instances_retry_map = token_instance_to_retry_map(retry?, token_instances)
token_instances token_instances
|> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} -> |> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} = instance ->
Chain.token_instance_with_unfetched_metadata?(token_id, hash) instance[:retry?] || Chain.token_instance_with_unfetched_metadata?(token_id, hash)
end) end)
|> batch_fetch_instances() |> batch_fetch_instances()
|> retry_some_instances(retry?, token_instances_retry_map)
:ok :ok
end end
@ -73,6 +81,50 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do
BufferedTask.buffer(__MODULE__, data) BufferedTask.buffer(__MODULE__, data)
end end
@spec retry_some_instances([map()], boolean(), map()) :: any()
defp retry_some_instances(token_instances, true, token_instances_retry_map) do
token_instances_to_refetch =
Enum.flat_map(token_instances, fn
{:ok, %Instance{metadata: nil, error: error} = instance}
when error in @errors_whitelisted_for_retry ->
if token_instances_retry_map[{instance.token_contract_address_hash.bytes, instance.token_id}] do
[]
else
[
%{
contract_address_hash: instance.token_contract_address_hash,
token_id: instance.token_id,
retry?: true
}
]
end
_ ->
[]
end)
if token_instances_to_refetch != [] do
timeout = Application.get_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime)[:retry_timeout]
Process.send_after(__MODULE__, {:buffer, token_instances_to_refetch}, timeout)
end
end
defp retry_some_instances(_, _, _), do: nil
defp token_instance_to_retry_map(false, _token_instances), do: nil
defp token_instance_to_retry_map(true, token_instances) do
token_instances
|> Enum.flat_map(fn
%{contract_address_hash: hash, token_id: token_id, retry?: true} ->
[{{hash.bytes, token_id}, true}]
_ ->
[]
end)
|> Enum.into(%{})
end
defp defaults do defp defaults do
[ [
flush_interval: 100, flush_interval: 100,

@ -16,6 +16,7 @@ defmodule Indexer.Fetcher.TokenInstance.HelperTest do
bypass = Bypass.open() bypass = Bypass.open()
on_exit(fn -> Bypass.down(bypass) end) on_exit(fn -> Bypass.down(bypass) end)
{:ok, bypass: bypass} {:ok, bypass: bypass}
end end

@ -0,0 +1,123 @@
defmodule Indexer.Fetcher.TokenInstance.RealtimeTest do
use EthereumJSONRPC.Case
use Explorer.DataCase
import Mox
alias Explorer.Repo
alias Explorer.Chain.Token.Instance
alias Indexer.Fetcher.TokenInstance.Realtime, as: TokenInstanceRealtime
alias Plug.Conn
setup :verify_on_exit!
setup :set_mox_global
describe "Check how works retry in realtime" do
setup do
config = Application.get_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime)
new_config = config |> Keyword.put(:retry_with_cooldown?, true) |> Keyword.put(:retry_timeout, 100)
Application.put_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime, new_config)
on_exit(fn ->
Application.put_env(:indexer, Indexer.Fetcher.TokenInstance.Realtime, config)
end)
:ok
end
test "retry once after timeout" do
bypass = Bypass.open()
[]
|> TokenInstanceRealtime.Supervisor.child_spec()
|> ExUnit.Callbacks.start_supervised!()
json = """
{
"name": "name"
}
"""
encoded_url =
"0x" <>
(ABI.TypeEncoder.encode(["http://localhost:#{bypass.port}/api/card/{id}"], %ABI.FunctionSelector{
function: nil,
types: [
:string
]
})
|> Base.encode16(case: :lower))
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [
%{
id: 0,
jsonrpc: "2.0",
method: "eth_call",
params: [
%{
data:
"0x0e89341c0000000000000000000000000000000000000000000000000000000000000309",
to: "0x5caebd3b32e210e85ce3e9d51638b9c445481567"
},
"latest"
]
}
],
_options ->
{:ok,
[
%{
id: 0,
jsonrpc: "2.0",
result: encoded_url
}
]}
end)
Bypass.expect_once(
bypass,
"GET",
"/api/card/0000000000000000000000000000000000000000000000000000000000000309",
fn conn ->
Conn.resp(conn, 404, "Not found")
end
)
Bypass.expect_once(
bypass,
"GET",
"/api/card/0000000000000000000000000000000000000000000000000000000000000309",
fn conn ->
Conn.resp(conn, 200, json)
end
)
token =
insert(:token,
contract_address: build(:address, hash: "0x5caebd3b32e210e85ce3e9d51638b9c445481567"),
type: "ERC-1155"
)
insert(:token_instance,
token_id: 777,
token_contract_address_hash: token.contract_address_hash,
metadata: nil,
error: nil
)
TokenInstanceRealtime.async_fetch([
%{token_contract_address_hash: token.contract_address_hash, token_ids: [Decimal.new(777)]}
])
:timer.sleep(150)
[instance] = Repo.all(Instance)
assert is_nil(instance.error)
assert instance.metadata == %{"name" => "name"}
Bypass.down(bypass)
end
end
end

@ -728,7 +728,9 @@ config :indexer, Indexer.Fetcher.TokenInstance.Retry,
config :indexer, Indexer.Fetcher.TokenInstance.Realtime, config :indexer, Indexer.Fetcher.TokenInstance.Realtime,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY", 10), concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY", 10),
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE", 1) batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE", 1),
retry_with_cooldown?: ConfigHelper.parse_bool_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_RETRY_ENABLED"),
retry_timeout: ConfigHelper.parse_time_env_var("INDEXER_TOKEN_INSTANCE_REALTIME_RETRY_TIMEOUT", "5s")
config :indexer, Indexer.Fetcher.TokenInstance.Sanitize, config :indexer, Indexer.Fetcher.TokenInstance.Sanitize,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY", 10), concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY", 10),

Loading…
Cancel
Save