Add retry feature to the TokenBalance.Fetcher

As some token balances always will raise errors interacting with the
Smart Contract, we have to ignore them. Otherwise, we will accumulate a
lot of toke balances in the indexer memory that never will be fetched.
pull/1098/head
Felipe Renan 6 years ago
parent 19dd956bac
commit a79fa5b3e4
  1. 29
      apps/indexer/lib/indexer/token_balance/fetcher.ex
  2. 54
      apps/indexer/lib/indexer/token_balances.ex
  3. 52
      apps/indexer/test/indexer/token_balance/fetcher_test.exs

@ -18,6 +18,8 @@ defmodule Indexer.TokenBalance.Fetcher do
task_supervisor: Indexer.TokenBalance.TaskSupervisor
]
@max_retries 3
@spec async_fetch([]) :: :ok
def async_fetch(token_balances) do
formatted_params = Enum.map(token_balances, &entry/1)
@ -61,6 +63,7 @@ defmodule Indexer.TokenBalance.Fetcher do
result =
entries
|> Enum.map(&format_params/1)
|> Enum.map(&Map.put(&1, :retries_count, &1.retries_count + 1))
|> fetch_from_blockchain()
|> import_token_balances()
@ -72,7 +75,10 @@ defmodule Indexer.TokenBalance.Fetcher do
end
def fetch_from_blockchain(params_list) do
{:ok, token_balances} = TokenBalances.fetch_token_balances_from_blockchain(params_list)
{:ok, token_balances} =
params_list
|> Enum.filter(&(&1.retries_count <= @max_retries))
|> TokenBalances.fetch_token_balances_from_blockchain()
TokenBalances.log_fetching_errors(__MODULE__, token_balances)
@ -106,22 +112,27 @@ defmodule Indexer.TokenBalance.Fetcher do
|> Enum.uniq()
end
defp entry(%{
token_contract_address_hash: token_contract_address_hash,
address_hash: address_hash,
block_number: block_number
}) do
{address_hash.bytes, token_contract_address_hash.bytes, block_number}
defp entry(
%{
token_contract_address_hash: token_contract_address_hash,
address_hash: address_hash,
block_number: block_number
} = token_balance
) do
retries_count = Map.get(token_balance, :retries_count, 0)
{address_hash.bytes, token_contract_address_hash.bytes, block_number, retries_count}
end
defp format_params({address_hash_bytes, token_contract_address_hash_bytes, block_number}) do
defp format_params({address_hash_bytes, token_contract_address_hash_bytes, block_number, retries_count}) do
{:ok, token_contract_address_hash} = Hash.Address.cast(token_contract_address_hash_bytes)
{:ok, address_hash} = Hash.Address.cast(address_hash_bytes)
%{
token_contract_address_hash: to_string(token_contract_address_hash),
address_hash: to_string(address_hash),
block_number: block_number
block_number: block_number,
retries_count: retries_count
}
end
end

@ -25,14 +25,16 @@ defmodule Indexer.TokenBalances do
* `block_number` - The block number that the address_hash has the balance.
"""
def fetch_token_balances_from_blockchain(token_balances) do
fetched_token_balances =
requested_token_balances =
token_balances
|> Task.async_stream(&fetch_token_balance/1, on_timeout: :kill_task)
|> Stream.map(&format_task_results/1)
|> Enum.filter(&ignore_request_with_errors/1)
|> Enum.filter(&ignore_killed_task/1)
token_balances
|> MapSet.new()
fetched_token_balances = Enum.filter(requested_token_balances, &ignore_request_with_errors/1)
requested_token_balances
|> handle_killed_tasks(token_balances)
|> unfetched_token_balances(fetched_token_balances)
|> schedule_token_balances
@ -59,13 +61,19 @@ defmodule Indexer.TokenBalances do
Map.merge(token_balance, %{value: nil, value_fetched_at: nil, error: error_message})
end
defp schedule_token_balances([]), do: nil
defp schedule_token_balances(unfetched_token_balances) do
unfetched_token_balances
|> Enum.map(fn token_balance ->
{:ok, address_hash} = Chain.string_to_address_hash(token_balance.address_hash)
{:ok, token_hash} = Chain.string_to_address_hash(token_balance.token_contract_address_hash)
%{address_hash: address_hash, token_contract_address_hash: token_hash, block_number: token_balance.block_number}
Map.merge(token_balance, %{
address_hash: address_hash,
token_contract_address_hash: token_hash,
block_number: token_balance.block_number
})
end)
|> TokenBalance.Fetcher.async_fetch()
end
@ -73,10 +81,18 @@ defmodule Indexer.TokenBalances do
defp format_task_results({:exit, :timeout}), do: {:error, :timeout}
defp format_task_results({:ok, token_balance}), do: token_balance
defp ignore_request_with_errors({:error, :timeout}), do: false
defp ignore_killed_task({:error, :timeout}), do: false
defp ignore_killed_task(_token_balance), do: true
defp ignore_request_with_errors(%{value: nil, value_fetched_at: nil, error: _error}), do: false
defp ignore_request_with_errors(_token_balance), do: true
defp handle_killed_tasks(requested_token_balances, token_balances) do
token_balances
|> Enum.reject(&present?(requested_token_balances, &1))
|> Enum.map(&Map.merge(&1, %{value: nil, value_fetched_at: nil, error: :timeout}))
end
def log_fetching_errors(from, token_balances_params) do
error_messages =
token_balances_params
@ -102,18 +118,24 @@ defmodule Indexer.TokenBalances do
@doc """
Finds the unfetched token balances given all token balances and the ones that were fetched.
This function compares the two given lists using the `MapSet.difference/2` and return the difference.
* token_balances - all token balances that were received in this module.
* fetched_token_balances - only the token balances that were fetched without error from the Smart contract
This function compares the two given lists and return the difference.
"""
def unfetched_token_balances(token_balances, fetched_token_balances) do
fetched_token_balances_set =
MapSet.new(fetched_token_balances, fn token_balance ->
%{
address_hash: token_balance.address_hash,
token_contract_address_hash: token_balance.token_contract_address_hash,
block_number: token_balance.block_number
}
end)
if Enum.count(token_balances) == Enum.count(fetched_token_balances) do
[]
else
Enum.reject(token_balances, &present?(fetched_token_balances, &1))
end
end
MapSet.difference(token_balances, fetched_token_balances_set)
defp present?(list, token_balance) do
Enum.any?(list, fn item ->
token_balance.address_hash == item.address_hash &&
token_balance.token_contract_address_hash == item.token_contract_address_hash &&
token_balance.block_number == item.block_number
end)
end
end

@ -23,7 +23,7 @@ defmodule Indexer.TokenBalance.FetcherTest do
insert(:token_balance, value_fetched_at: DateTime.utc_now())
assert TokenBalance.Fetcher.init([], &[&1 | &2], nil) == [
{address_hash_bytes, token_contract_address_hash_bytes, block_number}
{address_hash_bytes, token_contract_address_hash_bytes, block_number, 0}
]
end
end
@ -57,14 +57,60 @@ defmodule Indexer.TokenBalance.FetcherTest do
end
)
assert TokenBalance.Fetcher.run([{address_hash_bytes, token_contract_address_hash_bytes, block_number}], nil) ==
:ok
assert TokenBalance.Fetcher.run(
[{address_hash_bytes, token_contract_address_hash_bytes, block_number, 0}],
nil
) == :ok
token_balance_updated = Explorer.Repo.get_by(Address.TokenBalance, address_hash: address_hash)
assert token_balance_updated.value == Decimal.new(1_000_000_000_000_000_000_000_000)
assert token_balance_updated.value_fetched_at != nil
end
test "does not try to fetch the token balance again if the retry is over" do
max_retries = 3
Application.put_env(:indexer, :token_balance_max_retries, max_retries)
token_balance_a = insert(:token_balance, value_fetched_at: nil, value: nil)
token_balance_b = insert(:token_balance, value_fetched_at: nil, value: nil)
expect(
EthereumJSONRPC.Mox,
:json_rpc,
1,
fn [%{id: _, method: _, params: [%{data: _, to: _}, _]}], _options ->
{:ok,
[
%{
error: %{code: -32015, data: "Reverted 0x", message: "VM execution error."},
id: "balanceOf",
jsonrpc: "2.0"
}
]}
end
)
token_balances = [
{
token_balance_a.address_hash.bytes,
token_balance_a.token_contract_address_hash.bytes,
token_balance_a.block_number,
# this token balance must be ignored
max_retries
},
{
token_balance_b.address_hash.bytes,
token_balance_b.token_contract_address_hash.bytes,
token_balance_b.block_number,
# this token balance still have to be retried
max_retries - 2
}
]
assert TokenBalance.Fetcher.run(token_balances, nil) == :ok
end
end
describe "import_token_balances/1" do

Loading…
Cancel
Save