diff --git a/apps/indexer/lib/indexer/token_balance/fetcher.ex b/apps/indexer/lib/indexer/token_balance/fetcher.ex index ae1974e13b..5fe54583dd 100644 --- a/apps/indexer/lib/indexer/token_balance/fetcher.ex +++ b/apps/indexer/lib/indexer/token_balance/fetcher.ex @@ -20,8 +20,9 @@ defmodule Indexer.TokenBalance.Fetcher do ] @spec async_fetch([%TokenBalance{}]) :: :ok - def async_fetch(token_balances_params) do - BufferedTask.buffer(__MODULE__, token_balances_params, :infinity) + def async_fetch(token_balances) do + formatted_params = Enum.map(token_balances, &format_params/1) + BufferedTask.buffer(__MODULE__, formatted_params, :infinity) end @doc false @@ -45,34 +46,33 @@ defmodule Indexer.TokenBalance.Fetcher do @impl BufferedTask def init(initial, reducer, _) do {:ok, final} = - Chain.stream_unfetched_token_balances(initial, fn token_balances_params, acc -> - reducer.(token_balances_params, acc) + Chain.stream_unfetched_token_balances(initial, fn token_balance, acc -> + token_balance + |> format_params() + |> reducer.(acc) end) final end @impl BufferedTask - def run(token_balances, _retries, _json_rpc_named_arguments) do - Logger.debug(fn -> "fetching #{length(token_balances)} token balances" end) + def run(params_list, _retries, _json_rpc_named_arguments) do + Logger.debug(fn -> "fetching #{length(params_list)} token balances" end) result = - token_balances - |> fetch_from_blockchain - |> import_token_balances + params_list + |> fetch_from_blockchain() + |> import_token_balances() if result == :ok do :ok else - {:retry, token_balances} + {:retry, params_list} end end - def fetch_from_blockchain(token_balances) do - {:ok, token_balances} = - token_balances - |> Stream.map(&format_params/1) - |> TokenBalances.fetch_token_balances_from_blockchain() + def fetch_from_blockchain(params_list) do + {:ok, token_balances} = TokenBalances.fetch_token_balances_from_blockchain(params_list) TokenBalances.log_fetching_errors(__MODULE__, token_balances)