Merge pull request #717 from poanetwork/ams-fix-token-balance-fetcher

Increase Indexer.TokenBalance.Fetcher timeout and add treatment error
pull/733/head
Andrew Cravenho 6 years ago committed by GitHub
commit 2a577161ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      apps/explorer/lib/explorer/chain/import.ex
  2. 3
      apps/indexer/lib/indexer/internal_transaction/fetcher.ex
  3. 34
      apps/indexer/lib/indexer/token_balance/fetcher.ex
  4. 22
      apps/indexer/test/indexer/token_balance/fetcher_test.exs

@ -195,6 +195,9 @@ defmodule Explorer.Chain.Import do
* `:timeout` - the timeout for inserting all transactions found in the params lists across all
types. Defaults to `#{@insert_transactions_timeout}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`.
* `:token_balances`
* `:params` - `list` of params for `Explorer.Chain.TokenBalance.changeset/2`
* `:timeout` - the timeout for `Repo.transaction`. Defaults to `#{@transaction_timeout}` milliseconds.
"""
@spec all(all_options()) :: all_result()
def all(options) when is_map(options) do

@ -102,7 +102,8 @@ defmodule Indexer.InternalTransaction.Fetcher do
with {:ok, %{addresses: address_hashes}} <-
Chain.import(%{
addresses: %{params: addresses_params},
internal_transactions: %{params: internal_transactions_params}
internal_transactions: %{params: internal_transactions_params},
timeout: :infinity
}) do
address_hashes
|> Enum.map(fn address_hash ->

@ -3,6 +3,8 @@ defmodule Indexer.TokenBalance.Fetcher do
Fetches the token balances values.
"""
require Logger
alias Indexer.{BufferedTask, TokenBalances}
alias Explorer.Chain
alias Explorer.Chain.{Hash, Address.TokenBalance}
@ -17,8 +19,9 @@ defmodule Indexer.TokenBalance.Fetcher do
task_supervisor: Indexer.TokenBalance.TaskSupervisor
]
@spec async_fetch([%TokenBalance{}]) :: :ok
def async_fetch(token_balances_params) do
BufferedTask.buffer(__MODULE__, token_balances_params)
BufferedTask.buffer(__MODULE__, token_balances_params, :infinity)
end
@doc false
@ -51,14 +54,39 @@ defmodule Indexer.TokenBalance.Fetcher do
@impl BufferedTask
def run(token_balances, _retries, _json_rpc_named_arguments) do
Logger.debug(fn -> "fetching #{length(token_balances)} token balances" end)
result =
token_balances
|> fetch_from_blockchain
|> import_token_balances
if result == :ok do
:ok
else
{:retry, token_balances}
end
end
def fetch_from_blockchain(token_balances) do
{:ok, token_balances_params} =
token_balances
|> Stream.map(&format_params/1)
|> TokenBalances.fetch_token_balances_from_blockchain()
{:ok, %{token_balances: [_]}} = Chain.import(%{token_balances: %{params: token_balances_params}})
token_balances_params
end
def import_token_balances(token_balances_params) do
case Chain.import(%{token_balances: %{params: token_balances_params}, timeout: :infinity}) do
{:ok, _} ->
:ok
:ok
{:error, reason} ->
Logger.debug(fn -> "failed to import #{length(token_balances_params)} token balances, #{inspect(reason)}" end)
:error
end
end
defp format_params(%TokenBalance{

@ -50,4 +50,26 @@ defmodule Indexer.TokenBalance.FetcherTest do
assert token_balance_updated.value_fetched_at != nil
end
end
describe "import_token_balances/1" do
test "ignores when it receives a empty list" do
assert TokenBalance.Fetcher.import_token_balances([]) == :ok
end
test "returns :error when the token balances has invalid data" do
token_balance = insert(:token_balance, value_fetched_at: nil, value: nil)
token_balances_params = [
%{
address_hash: nil,
block_number: nil,
token_contract_address_hash: to_string(token_balance.token_contract_address_hash),
value: nil,
value_fetched_at: nil
}
]
assert TokenBalance.Fetcher.import_token_balances(token_balances_params) == :error
end
end
end

Loading…
Cancel
Save