Merge pull request #1098 from poanetwork/frg-improve-token-balance-fetcher

Improve TokenBalance.Fetcher to ignore token balances that always raise errors
Felipe Renan 6 years ago committed by GitHub
commit d3b0824f18
5 files changed:
  1. apps/indexer/lib/indexer/block/realtime/fetcher.ex (10 changed lines)
  2. apps/indexer/lib/indexer/token_balance/fetcher.ex (52 changed lines)
  3. apps/indexer/lib/indexer/token_balances.ex (68 changed lines)
  4. apps/indexer/test/indexer/token_balance/fetcher_test.exs (52 changed lines)
  5. apps/indexer/test/indexer/token_balances_test.exs (28 changed lines)
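In short, the change threads a `retries_count` through the token balance fetching pipeline and skips balances that have already failed too many times. A minimal, self-contained sketch of the idea (module name and pipeline are illustrative, not the project code):

    # Sketch only: each param carries a :retries_count; params that were already
    # retried more than the maximum are dropped before any Smart Contract call.
    defmodule RetrySketch do
      @max_retries 3

      def fetchable(params_list) do
        params_list
        |> Enum.map(&Map.update(&1, :retries_count, 1, fn count -> count + 1 end))
        |> Enum.filter(&(&1.retries_count <= @max_retries))
      end
    end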

@@ -100,8 +100,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
addresses_params: internal_transactions_addresses_params,
balances_params: address_coin_balances_params
}),
{:ok, address_token_balances} <-
TokenBalances.fetch_token_balances_from_blockchain(address_token_balances_params),
{:ok, address_token_balances} <- fetch_token_balances(address_token_balances_params),
chain_import_options =
options
|> Map.drop(@import_options)
@@ -112,7 +111,6 @@ defmodule Indexer.Block.Realtime.Fetcher do
|> put_in([Access.key(:address_token_balances), :params], address_token_balances)
|> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params),
{:ok, imported} = ok <- Chain.import(chain_import_options) do
TokenBalances.log_fetching_errors(__MODULE__, address_token_balances)
async_import_remaining_block_data(imported)
ok
end
@@ -300,4 +298,10 @@ defmodule Indexer.Block.Realtime.Fetcher do
%{hash_data: address_hash, block_quantity: integer_to_quantity(block_number)}
end)
end
defp fetch_token_balances(address_token_balances_params) do
address_token_balances_params
|> MapSet.to_list()
|> TokenBalances.fetch_token_balances_from_blockchain()
end
end

@@ -1,6 +1,17 @@
defmodule Indexer.TokenBalance.Fetcher do
@moduledoc """
Fetches the token balances values.
Fetches token balances and sends the fetched ones to be imported into `Address.CurrentTokenBalance` and
`Address.TokenBalance`.

The module responsible for fetching token balances from the Smart Contract is `Indexer.TokenBalances`. This module
only prepares the params, sends them to `Indexer.TokenBalances`, and relies on its return.

It behaves as a `BufferedTask`, so we can configure `max_batch_size` and `max_concurrency` to control how many
token balances are fetched at the same time. Be aware that, for each token balance, the indexer makes a request
to the Smart Contract.

Also, this module sets a `retries_count` for each token balance and increments it to avoid fetching the ones
that always raise errors when interacting with the Smart Contract.
"""
require Logger
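For context, callers hand this fetcher plain token balance params through `async_fetch/1`; a hypothetical invocation (values invented) looks roughly like:

    # Hypothetical call; in practice address_hash and token_contract_address_hash are
    # %Explorer.Chain.Hash{} structs, since entry/1 below reads their :bytes field.
    Indexer.TokenBalance.Fetcher.async_fetch([
      %{
        address_hash: address_hash,
        token_contract_address_hash: token_contract_address_hash,
        block_number: 1_000
        # :retries_count is optional here; entry/1 defaults it to 0
      }
    ])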
@@ -18,6 +29,8 @@ defmodule Indexer.TokenBalance.Fetcher do
task_supervisor: Indexer.TokenBalance.TaskSupervisor
]
@max_retries 3
@spec async_fetch([]) :: :ok
def async_fetch(token_balances) do
formatted_params = Enum.map(token_balances, &entry/1)
@@ -54,13 +67,18 @@ defmodule Indexer.TokenBalance.Fetcher do
final
end
@doc """
Fetches the given entries (token_balances) from the Smart Contract and imports them into our database.

It also increments the `retries_count` to avoid fetching token balances that always raise errors
when reading their balance from the Smart Contract.
"""
@impl BufferedTask
def run(entries, _json_rpc_named_arguments) do
Logger.debug(fn -> "fetching #{length(entries)} token balances" end)
result =
entries
|> Enum.map(&format_params/1)
|> Enum.map(&Map.put(&1, :retries_count, &1.retries_count + 1))
|> fetch_from_blockchain()
|> import_token_balances()
@@ -72,9 +90,10 @@ defmodule Indexer.TokenBalance.Fetcher do
end
def fetch_from_blockchain(params_list) do
{:ok, token_balances} = TokenBalances.fetch_token_balances_from_blockchain(params_list)
TokenBalances.log_fetching_errors(__MODULE__, token_balances)
{:ok, token_balances} =
params_list
|> Enum.filter(&(&1.retries_count <= @max_retries))
|> TokenBalances.fetch_token_balances_from_blockchain()
token_balances
end
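To make the new filter concrete: with `@max_retries` at 3, a param whose counter has already passed that limit never reaches the Smart Contract. A rough illustration with made-up values:

    # Illustration only: the second param is dropped by the retries filter.
    params = [
      %{address_hash: "0xaaa...", token_contract_address_hash: "0xttt...", block_number: 1_000, retries_count: 1},
      %{address_hash: "0xbbb...", token_contract_address_hash: "0xttt...", block_number: 1_000, retries_count: 4}
    ]

    Enum.filter(params, &(&1.retries_count <= 3))
    #=> only the first param remains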
@@ -106,22 +125,27 @@ defmodule Indexer.TokenBalance.Fetcher do
|> Enum.uniq()
end
defp entry(%{
token_contract_address_hash: token_contract_address_hash,
address_hash: address_hash,
block_number: block_number
}) do
{address_hash.bytes, token_contract_address_hash.bytes, block_number}
defp entry(
%{
token_contract_address_hash: token_contract_address_hash,
address_hash: address_hash,
block_number: block_number
} = token_balance
) do
retries_count = Map.get(token_balance, :retries_count, 0)
{address_hash.bytes, token_contract_address_hash.bytes, block_number, retries_count}
end
defp format_params({address_hash_bytes, token_contract_address_hash_bytes, block_number}) do
defp format_params({address_hash_bytes, token_contract_address_hash_bytes, block_number, retries_count}) do
{:ok, token_contract_address_hash} = Hash.Address.cast(token_contract_address_hash_bytes)
{:ok, address_hash} = Hash.Address.cast(address_hash_bytes)
%{
token_contract_address_hash: to_string(token_contract_address_hash),
address_hash: to_string(address_hash),
block_number: block_number
block_number: block_number,
retries_count: retries_count
}
end
end
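Taken together, `entry/1` and `format_params/1` now round-trip a fourth tuple element, so the retry counter survives the `BufferedTask` queue: a param queued without `:retries_count` starts at 0, and a param requeued by `Indexer.TokenBalances` keeps the count that `run/2` already incremented. A tiny sketch (byte values are placeholders):

    # Shape of a buffered entry after this change (placeholder values):
    {address_hash_bytes, token_contract_address_hash_bytes, block_number, retries_count} =
      {<<0::160>>, <<1::160>>, 1_000, 0}

    # entry/1 falls back to 0 when the param map has no :retries_count key:
    Map.get(%{block_number: 1_000}, :retries_count, 0)
    #=> 0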

@@ -24,15 +24,21 @@ defmodule Indexer.TokenBalances do
* `address_hash` - The address_hash whose balance we want to know.
* `block_number` - The block number at which the address_hash has that balance.
"""
def fetch_token_balances_from_blockchain([]), do: {:ok, []}
def fetch_token_balances_from_blockchain(token_balances) do
fetched_token_balances =
Logger.debug(fn -> "fetching #{Enum.count(token_balances)} token balances" end)
requested_token_balances =
token_balances
|> Task.async_stream(&fetch_token_balance/1, on_timeout: :kill_task)
|> Stream.map(&format_task_results/1)
|> Enum.filter(&ignore_request_with_errors/1)
|> Enum.filter(&ignore_killed_task/1)
token_balances
|> MapSet.new()
fetched_token_balances = Enum.filter(requested_token_balances, &ignore_request_with_errors/1)
requested_token_balances
|> handle_killed_tasks(token_balances)
|> unfetched_token_balances(fetched_token_balances)
|> schedule_token_balances
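The `Task.async_stream/3` call above uses `on_timeout: :kill_task`, which is what the new `handle_killed_tasks/2` step deals with: a balance whose JSON-RPC request hangs comes back as `{:exit, :timeout}` instead of crashing the whole stream. A standalone sketch of that behavior (not project code):

    # Tasks that outlive the timeout are killed and surface as {:exit, :timeout}.
    [10, 2_000]
    |> Task.async_stream(&Process.sleep/1, timeout: 100, on_timeout: :kill_task)
    |> Enum.to_list()
    #=> [ok: :ok, exit: :timeout]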
@@ -59,13 +65,23 @@ defmodule Indexer.TokenBalances do
Map.merge(token_balance, %{value: nil, value_fetched_at: nil, error: error_message})
end
defp schedule_token_balances([]), do: nil
defp schedule_token_balances(unfetched_token_balances) do
Logger.debug(fn -> "#{Enum.count(unfetched_token_balances)} token balances will be retried" end)
log_fetching_errors(unfetched_token_balances)
unfetched_token_balances
|> Enum.map(fn token_balance ->
{:ok, address_hash} = Chain.string_to_address_hash(token_balance.address_hash)
{:ok, token_hash} = Chain.string_to_address_hash(token_balance.token_contract_address_hash)
%{address_hash: address_hash, token_contract_address_hash: token_hash, block_number: token_balance.block_number}
Map.merge(token_balance, %{
address_hash: address_hash,
token_contract_address_hash: token_hash,
block_number: token_balance.block_number
})
end)
|> TokenBalance.Fetcher.async_fetch()
end
@@ -73,24 +89,32 @@ defmodule Indexer.TokenBalances do
defp format_task_results({:exit, :timeout}), do: {:error, :timeout}
defp format_task_results({:ok, token_balance}), do: token_balance
defp ignore_request_with_errors({:error, :timeout}), do: false
defp ignore_killed_task({:error, :timeout}), do: false
defp ignore_killed_task(_token_balance), do: true
defp ignore_request_with_errors(%{value: nil, value_fetched_at: nil, error: _error}), do: false
defp ignore_request_with_errors(_token_balance), do: true
def log_fetching_errors(from, token_balances_params) do
defp handle_killed_tasks(requested_token_balances, token_balances) do
token_balances
|> Enum.reject(&present?(requested_token_balances, &1))
|> Enum.map(&Map.merge(&1, %{value: nil, value_fetched_at: nil, error: :timeout}))
end
def log_fetching_errors(token_balances_params) do
error_messages =
token_balances_params
|> Stream.filter(fn token_balance -> token_balance.error != nil end)
|> Enum.map(fn token_balance ->
"<address_hash: #{token_balance.address_hash}, " <>
"contract_address_hash: #{token_balance.token_contract_address_hash}, " <>
"block_number: #{token_balance.block_number}, " <> "error: #{token_balance.error}> \n"
"block_number: #{token_balance.block_number}, " <>
"error: #{token_balance.error}>, " <> "retried: #{Map.get(token_balance, :retries_count, 1)} times\n"
end)
if Enum.any?(error_messages) do
Logger.debug(
[
"<#{from}> ",
"Errors while fetching TokenBalances through Contract interaction: \n",
error_messages
],
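With made-up values, a single entry in `error_messages` now renders roughly as the string below; the "retried: N times" suffix is the part this change adds:

    "<address_hash: 0xabc..., contract_address_hash: 0xdef..., block_number: 1000, error: timeout>, retried: 2 times\n"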
@@ -102,18 +126,24 @@ defmodule Indexer.TokenBalances do
@doc """
Finds the unfetched token balances given all token balances and the ones that were fetched.
This function compares the two given lists using the `MapSet.difference/2` and return the difference.
* token_balances - all token balances that were received by this module.
* fetched_token_balances - only the token balances that were fetched without error from the Smart Contract.

This function compares the two given lists and returns the difference.
"""
def unfetched_token_balances(token_balances, fetched_token_balances) do
fetched_token_balances_set =
MapSet.new(fetched_token_balances, fn token_balance ->
%{
address_hash: token_balance.address_hash,
token_contract_address_hash: token_balance.token_contract_address_hash,
block_number: token_balance.block_number
}
end)
if Enum.count(token_balances) == Enum.count(fetched_token_balances) do
[]
else
Enum.reject(token_balances, &present?(fetched_token_balances, &1))
end
end
MapSet.difference(token_balances, fetched_token_balances_set)
defp present?(list, token_balance) do
Enum.any?(list, fn item ->
token_balance.address_hash == item.address_hash &&
token_balance.token_contract_address_hash == item.token_contract_address_hash &&
token_balance.block_number == item.block_number
end)
end
end
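Since `unfetched_token_balances/2` now returns a plain list instead of a `MapSet`, a hypothetical call (maps trimmed to the keys that `present?/2` compares) behaves like this:

    all_balances = [
      %{address_hash: "0xaaa...", token_contract_address_hash: "0xttt...", block_number: 1},
      %{address_hash: "0xbbb...", token_contract_address_hash: "0xttt...", block_number: 1}
    ]

    fetched = [%{address_hash: "0xaaa...", token_contract_address_hash: "0xttt...", block_number: 1}]

    Indexer.TokenBalances.unfetched_token_balances(all_balances, fetched)
    #=> [%{address_hash: "0xbbb...", token_contract_address_hash: "0xttt...", block_number: 1}]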

@@ -23,7 +23,7 @@ defmodule Indexer.TokenBalance.FetcherTest do
insert(:token_balance, value_fetched_at: DateTime.utc_now())
assert TokenBalance.Fetcher.init([], &[&1 | &2], nil) == [
{address_hash_bytes, token_contract_address_hash_bytes, block_number}
{address_hash_bytes, token_contract_address_hash_bytes, block_number, 0}
]
end
end
@@ -57,14 +57,60 @@ defmodule Indexer.TokenBalance.FetcherTest do
end
)
assert TokenBalance.Fetcher.run([{address_hash_bytes, token_contract_address_hash_bytes, block_number}], nil) ==
:ok
assert TokenBalance.Fetcher.run(
[{address_hash_bytes, token_contract_address_hash_bytes, block_number, 0}],
nil
) == :ok
token_balance_updated = Explorer.Repo.get_by(Address.TokenBalance, address_hash: address_hash)
assert token_balance_updated.value == Decimal.new(1_000_000_000_000_000_000_000_000)
assert token_balance_updated.value_fetched_at != nil
end
test "does not try to fetch the token balance again if the retry is over" do
max_retries = 3
Application.put_env(:indexer, :token_balance_max_retries, max_retries)
token_balance_a = insert(:token_balance, value_fetched_at: nil, value: nil)
token_balance_b = insert(:token_balance, value_fetched_at: nil, value: nil)
expect(
EthereumJSONRPC.Mox,
:json_rpc,
1,
fn [%{id: _, method: _, params: [%{data: _, to: _}, _]}], _options ->
{:ok,
[
%{
error: %{code: -32015, data: "Reverted 0x", message: "VM execution error."},
id: "balanceOf",
jsonrpc: "2.0"
}
]}
end
)
token_balances = [
{
token_balance_a.address_hash.bytes,
token_balance_a.token_contract_address_hash.bytes,
token_balance_a.block_number,
# this token balance must be ignored
max_retries
},
{
token_balance_b.address_hash.bytes,
token_balance_b.token_contract_address_hash.bytes,
token_balance_b.block_number,
# this token balance still has to be retried
max_retries - 2
}
]
assert TokenBalance.Fetcher.run(token_balances, nil) == :ok
end
end
describe "import_token_balances/1" do

@@ -53,7 +53,8 @@ defmodule Indexer.TokenBalancesTest do
%{
address_hash: to_string(address.hash),
block_number: 1_000,
token_contract_address_hash: to_string(token.contract_address_hash)
token_contract_address_hash: to_string(token.contract_address_hash),
retries_count: 1
}
]
@@ -71,12 +72,14 @@ defmodule Indexer.TokenBalancesTest do
%{
token_contract_address_hash: Hash.to_string(token.contract_address_hash),
address_hash: address_hash_string,
block_number: 1_000
block_number: 1_000,
retries_count: 1
},
%{
token_contract_address_hash: Hash.to_string(token.contract_address_hash),
address_hash: address_hash_string,
block_number: 1_001
block_number: 1_001,
retries_count: 1
}
]
@@ -93,31 +96,28 @@ defmodule Indexer.TokenBalancesTest do
test "logs the given from argument in final message" do
token_balance_params_with_error = Map.put(build(:token_balance), :error, "Error")
params = [token_balance_params_with_error]
from = "Tests"
log_message_response =
capture_log(fn ->
TokenBalances.log_fetching_errors(from, params)
TokenBalances.log_fetching_errors(params)
end)
assert log_message_response =~ "<Tests"
assert log_message_response =~ "Error"
end
test "log when there is a token_balance param with errors" do
from = "Tests"
token_balance_params_with_error = Map.put(build(:token_balance), :error, "Error")
token_balance_params_with_error = Map.merge(build(:token_balance), %{error: "Error", retries_count: 1})
params = [token_balance_params_with_error]
log_message_response =
capture_log(fn ->
TokenBalances.log_fetching_errors(from, params)
TokenBalances.log_fetching_errors(params)
end)
assert log_message_response =~ "Error"
end
test "log multiple token balances params with errors" do
from = "Tests"
error_1 = "Error"
error_2 = "BadGateway"
@@ -128,7 +128,7 @@ defmodule Indexer.TokenBalancesTest do
log_message_response =
capture_log(fn ->
TokenBalances.log_fetching_errors(from, params)
TokenBalances.log_fetching_errors(params)
end)
assert log_message_response =~ error_1
@@ -136,13 +136,12 @@ defmodule Indexer.TokenBalancesTest do
end
test "doesn't log when there aren't errors after fetching token balances" do
from = "Tests"
token_balance_params = Map.put(build(:token_balance), :error, nil)
params = [token_balance_params]
log_message_response =
capture_log(fn ->
TokenBalances.log_fetching_errors(from, params)
TokenBalances.log_fetching_errors(params)
end)
assert log_message_response == ""
@@ -169,9 +168,8 @@ defmodule Indexer.TokenBalancesTest do
token_balances = MapSet.new([token_balance_a, token_balance_b])
fetched_token_balances = MapSet.new([token_balance_a])
unfetched_token_balances = MapSet.new([token_balance_b])
assert TokenBalances.unfetched_token_balances(token_balances, fetched_token_balances) == unfetched_token_balances
assert TokenBalances.unfetched_token_balances(token_balances, fetched_token_balances) == [token_balance_b]
end
end
