Merge pull request #960 from poanetwork/frg-improve-token-balances-fetcher

Improve token balances fetcher
pull/987/head
Luke Imhoff 6 years ago committed by GitHub
commit 6a4d69d383
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      apps/indexer/lib/indexer/token_balance/fetcher.ex
  2. 43
      apps/indexer/lib/indexer/token_balances.ex
  3. 4
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs
  4. 6
      apps/indexer/test/indexer/token_balance/fetcher_test.exs
  5. 33
      apps/indexer/test/indexer/token_balances_test.exs

@ -7,18 +7,18 @@ defmodule Indexer.TokenBalance.Fetcher do
alias Indexer.{BufferedTask, TokenBalances}
alias Explorer.Chain
alias Explorer.Chain.{Hash, Address.TokenBalance}
alias Explorer.Chain.Hash
@behaviour BufferedTask
@defaults [
flush_interval: 300,
max_batch_size: 1,
max_batch_size: 100,
max_concurrency: 10,
task_supervisor: Indexer.TokenBalance.TaskSupervisor
]
@spec async_fetch([%TokenBalance{}]) :: :ok
@spec async_fetch([]) :: :ok
def async_fetch(token_balances) do
formatted_params = Enum.map(token_balances, &entry/1)
BufferedTask.buffer(__MODULE__, formatted_params, :infinity)
@ -91,7 +91,7 @@ defmodule Indexer.TokenBalance.Fetcher do
end
end
defp entry(%TokenBalance{
defp entry(%{
token_contract_address_hash: token_contract_address_hash,
address_hash: address_hash,
block_number: block_number

@ -6,12 +6,15 @@ defmodule Indexer.TokenBalances do
require Logger
alias Explorer.Token.BalanceReader
alias Indexer.TokenBalance
alias Explorer.Chain
@doc """
Fetches TokenBalances from specific Addresses and Blocks in the Blockchain
Every `TokenBalance` is fetched asynchronously, but in case an exception is raised (such as a
timeout) during the RPC call the particular TokenBalance request is ignored.
timeout) during the RPC call the particular TokenBalance request is ignored and sent to
`TokenBalance.Fetcher` to be fetched again.
## token_balances
@ -22,13 +25,18 @@ defmodule Indexer.TokenBalances do
* `block_number` - The block number that the address_hash has the balance.
"""
def fetch_token_balances_from_blockchain(token_balances) do
result =
fetched_token_balances =
token_balances
|> Task.async_stream(&fetch_token_balance/1, on_timeout: :kill_task)
|> Stream.map(&format_task_results/1)
|> Enum.filter(&ignore_request_with_timeouts/1)
{:ok, result}
token_balances
|> MapSet.new()
|> unfetched_token_balances(fetched_token_balances)
|> schedule_token_balances
{:ok, fetched_token_balances}
end
defp fetch_token_balance(
@ -51,6 +59,17 @@ defmodule Indexer.TokenBalances do
Map.merge(token_balance, %{value: nil, value_fetched_at: nil, error: error_message})
end
defp schedule_token_balances(unfetched_token_balances) do
unfetched_token_balances
|> Enum.map(fn token_balance ->
{:ok, address_hash} = Chain.string_to_address_hash(token_balance.address_hash)
{:ok, token_hash} = Chain.string_to_address_hash(token_balance.token_contract_address_hash)
%{address_hash: address_hash, token_contract_address_hash: token_hash, block_number: token_balance.block_number}
end)
|> TokenBalance.Fetcher.async_fetch()
end
def format_task_results({:exit, :timeout}), do: {:error, :timeout}
def format_task_results({:ok, token_balance}), do: token_balance
@ -78,4 +97,22 @@ defmodule Indexer.TokenBalances do
)
end
end
@doc """
Finds the unfetched token balances given all token balances and the ones that were fetched.
This function compares the two given lists using the `MapSet.difference/2` and return the difference.
"""
def unfetched_token_balances(token_balances, fetched_token_balances) do
fetched_token_balances_set =
MapSet.new(fetched_token_balances, fn token_balance ->
%{
address_hash: token_balance.address_hash,
token_contract_address_hash: token_balance.token_contract_address_hash,
block_number: token_balance.block_number
}
end)
MapSet.difference(token_balances, fetched_token_balances_set)
end
end

@ -6,7 +6,7 @@ defmodule Indexer.Block.Realtime.FetcherTest do
alias Explorer.Chain
alias Explorer.Chain.Address
alias Indexer.{Sequence, Token}
alias Indexer.{Sequence, Token, TokenBalance}
alias Indexer.Block.{Realtime, Uncle}
@moduletag capture_log: true
@ -34,6 +34,8 @@ defmodule Indexer.Block.Realtime.FetcherTest do
json_rpc_named_arguments: core_json_rpc_named_arguments
}
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
%{block_fetcher: block_fetcher, json_rpc_named_arguments: core_json_rpc_named_arguments}
end

@ -29,6 +29,12 @@ defmodule Indexer.TokenBalance.FetcherTest do
end
describe "run/3" do
setup %{json_rpc_named_arguments: json_rpc_named_arguments} do
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
:ok
end
test "imports the given token balances" do
%Address.TokenBalance{
address_hash: %Hash{bytes: address_hash_bytes} = address_hash,

@ -5,6 +5,7 @@ defmodule Indexer.TokenBalancesTest do
doctest Indexer.TokenBalances
alias Indexer.TokenBalances
alias Indexer.TokenBalance
alias Explorer.Chain.Hash
import Mox
@ -14,6 +15,12 @@ defmodule Indexer.TokenBalancesTest do
setup :set_mox_global
describe "fetch_token_balances_from_blockchain/2" do
setup %{json_rpc_named_arguments: json_rpc_named_arguments} do
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
:ok
end
test "fetches balances of tokens given the address hash" do
address = insert(:address)
token = insert(:token, contract_address: build(:contract_address))
@ -149,6 +156,32 @@ defmodule Indexer.TokenBalancesTest do
end
end
describe "unfetched_token_balances/2" do
test "finds unfetched token balances given all token balances" do
address = insert(:address)
token = insert(:token, contract_address: build(:contract_address))
address_hash_string = Hash.to_string(address.hash)
token_balance_a = %{
token_contract_address_hash: Hash.to_string(token.contract_address_hash),
address_hash: address_hash_string,
block_number: 1_000
}
token_balance_b = %{
token_contract_address_hash: Hash.to_string(token.contract_address_hash),
address_hash: address_hash_string,
block_number: 1_001
}
token_balances = MapSet.new([token_balance_a, token_balance_b])
fetched_token_balances = MapSet.new([token_balance_a])
unfetched_token_balances = MapSet.new([token_balance_b])
assert TokenBalances.unfetched_token_balances(token_balances, fetched_token_balances) == unfetched_token_balances
end
end
defp get_balance_from_blockchain() do
expect(
EthereumJSONRPC.Mox,

Loading…
Cancel
Save