Indexer.CoinBalance.Fetcher bytes and integer instead of string in

buffers

```elixir
  test "memory" do
    CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: [])
    pid = Process.whereis(CoinBalance.Fetcher)

    Stream.repeatedly(fn -> %{address_hash: address_hash(), block_number: block_number()} end)
    |> Enum.take(100)
    |> CoinBalance.Fetcher.async_fetch_balances()

    Process.info(pid, :memory) |> elem(1) |> IO.inspect(label: "BEFORE GC")

    :erlang.garbage_collect(pid)

    Process.info(pid, :memory) |> elem(1) |> IO.inspect(label: "AFTER GC")
  end
```

| Balances | Garbage Collection | String (bytes) | Bytes and Integer (bytes) | Bytes / String (%) |
|----------|--------------------|----------------|---------------------------|--------------------|
| 100      | Before             | 55,244         | 34,548                    | 62.54              |
| 100      | After              | 21,764         | 13,860                    | 63.68              |
| 1,000    | Before             | 601,972        | 372,444                   | 61.87              |
| 1,000    | After              | 230,596        | 142,924                   | 61.98              |
| 10,000   | Before             | 4,119,668      | 2,546,460                 | 61.81              |
| 10,000   | After              | 1,574,172      | 973,260                   | 61.83              |
| 100,000  | Before             | 16,586,644     | 11,519,724                | 69.45              |
| 100,000  | After              | 16,583,636     | 11,516,716                | 69.45              |
pull/917/head
Luke Imhoff 6 years ago
parent 77d32df7c2
commit f04ea541d0
  1. 4
      apps/indexer/lib/indexer/block/fetcher.ex
  2. 28
      apps/indexer/lib/indexer/coin_balance/fetcher.ex
  3. 19
      apps/indexer/test/indexer/coin_balance/fetcher_test.exs

@ -5,7 +5,7 @@ defmodule Indexer.Block.Fetcher do
require Logger
alias Explorer.Chain.{Block, Import}
alias Explorer.Chain.{Address, Block, Import}
alias Indexer.{CoinBalance, AddressExtraction, Token, TokenTransfers}
alias Indexer.Address.{CoinBalances, TokenBalances}
alias Indexer.Block.Fetcher.Receipts
@ -164,7 +164,7 @@ defmodule Indexer.Block.Fetcher do
address_hash_to_fetched_balance_block_number: address_hash_to_block_number
}) do
addresses
|> Enum.map(fn address_hash ->
|> Enum.map(fn %Address{hash: address_hash} ->
block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash))
%{address_hash: address_hash, block_number: block_number}
end)

@ -29,9 +29,9 @@ defmodule Indexer.CoinBalance.Fetcher do
%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}
]) :: :ok
def async_fetch_balances(balance_fields) when is_list(balance_fields) do
params_list = Enum.map(balance_fields, &balance_fields_to_params/1)
entries = Enum.map(balance_fields, &entry/1)
BufferedTask.buffer(__MODULE__, params_list)
BufferedTask.buffer(__MODULE__, entries)
end
@doc false
@ -57,7 +57,7 @@ defmodule Indexer.CoinBalance.Fetcher do
{:ok, final} =
Chain.stream_unfetched_balances(initial, fn address_fields, acc ->
address_fields
|> balance_fields_to_params()
|> entry()
|> reducer.(acc)
end)
@ -65,14 +65,17 @@ defmodule Indexer.CoinBalance.Fetcher do
end
@impl BufferedTask
def run(params_list, _retries, json_rpc_named_arguments) do
def run(entries, _retries, json_rpc_named_arguments) do
# the same address may be used more than once in the same block, but we only want one `Balance` for a given
# `{address, block}`, so take unique params only
unique_params_list = Enum.uniq(params_list)
unique_entries = Enum.uniq(entries)
Logger.debug(fn -> "fetching #{length(unique_params_list)} balances" end)
Logger.debug(fn -> "fetching #{length(unique_entries)} balances" end)
case EthereumJSONRPC.fetch_balances(unique_params_list, json_rpc_named_arguments) do
unique_entries
|> Enum.map(&entry_to_params/1)
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments)
|> case do
{:ok, balances_params} ->
value_fetched_at = DateTime.utc_now()
@ -89,16 +92,21 @@ defmodule Indexer.CoinBalance.Fetcher do
:ok
{:error, reason} ->
Logger.debug(fn -> "failed to fetch #{length(unique_params_list)} balances, #{inspect(reason)}" end)
{:retry, unique_params_list}
Logger.debug(fn -> "failed to fetch #{length(unique_entries)} balances, #{inspect(reason)}" end)
{:retry, unique_entries}
end
end
defp balance_fields_to_params(%{address_hash: address_hash, block_number: block_number})
defp entry_to_params(%{address_hash_bytes: address_hash_bytes, block_number: block_number})
when is_integer(block_number) do
{:ok, address_hash} = Hash.Address.cast(address_hash_bytes)
%{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address_hash)}
end
defp entry(%{address_hash: %Hash{bytes: address_hash_bytes}, block_number: block_number}) do
%{address_hash_bytes: address_hash_bytes, block_number: block_number}
end
# We want to record all historical balances for an address, but have the address itself have balance from the
# `Balance` with the greatest block_number for that address.
def balances_params_to_address_params(balances_params) do

@ -251,9 +251,10 @@ defmodule Indexer.CoinBalance.FetcherTest do
end)
end
params_list = Enum.map(block_quantities, &%{block_quantity: &1, hash_data: hash_data})
{:ok, %Hash{bytes: address_hash_bytes}} = Hash.Address.cast(hash_data)
entries = Enum.map(block_quantities, &%{address_hash_bytes: address_hash_bytes, block_number: quantity_to_integer(&1)})
case CoinBalance.Fetcher.run(params_list, 0, json_rpc_named_arguments) do
case CoinBalance.Fetcher.run(entries, 0, json_rpc_named_arguments) do
:ok ->
balances = Repo.all(from(balance in Address.CoinBalance, where: balance.address_hash == ^hash_data))
@ -283,12 +284,14 @@ defmodule Indexer.CoinBalance.FetcherTest do
other ->
# not all nodes behind the `https://mainnet.infura.io` pool are fully-synced. Node that aren't fully-synced
# won't have historical address balances.
assert {:retry, ^params_list} = other
assert {:retry, ^entries} = other
end
end
test "duplicate params retry unique params", %{json_rpc_named_arguments: json_rpc_named_arguments} do
hash_data = "0x000000000000000000000000000000000"
test "duplicate entries retry unique entries", %{json_rpc_named_arguments: json_rpc_named_arguments} do
hash_data = "0x0000000000000000000000000000000000000000"
{:ok, %Hash{bytes: bytes}} = Hash.Address.cast(hash_data)
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
EthereumJSONRPC.Mox
@ -298,15 +301,15 @@ defmodule Indexer.CoinBalance.FetcherTest do
end
assert CoinBalance.Fetcher.run(
[%{block_quantity: "0x1", hash_data: hash_data}, %{block_quantity: "0x1", hash_data: hash_data}],
[%{address_hash_bytes: bytes, block_number: 1}, %{address_hash_bytes: bytes, block_number: 1}],
0,
json_rpc_named_arguments
) ==
{:retry,
[
%{
block_quantity: "0x1",
hash_data: "0x000000000000000000000000000000000"
address_hash_bytes: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>,
block_number: 1
}
]}
end

Loading…
Cancel
Save