Buffer formatted params instead of %TokenBalance{}

```elixir
  test "memory" do
    TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: [])
    pid = Process.whereis(TokenBalance.Fetcher)

    10_000
    |> insert_list(:token_balance)
    |> TokenBalance.Fetcher.async_fetch()

    Process.info(pid, :memory) |> IO.inspect(label: "BEFORE GC")

    :erlang.garbage_collect(pid)

    Process.info(pid, :memory) |> IO.inspect(label: "AFTER GC")
  end
```

| Token Balances | Garbage Collection | `%TokenBalance{}`s (bytes) | `%{token_contract_address_hash: Hash.to_string(token_contract_address_hash),    address_hash: Hash.to_string(address_hash),    block_number: block_number}` (bytes) | Map / Struct (%) |
|----------------|--------------------|----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------|
| 1,000          | Before             | 2,549,476                  | 233,500                                                                                                                                                             | 9.16             |
| 1,000          | After              | 2,546,468                  | 233,500                                                                                                                                                             | 9.17             |
| 10,000         | Before             | 19,903,172                 | 2,549,476                                                                                                                                                           | 12.81            |
| 10,000         | After              | 19,900,260                 | 2,546,468                                                                                                                                                           | 12.80            |
pull/917/head
Luke Imhoff 6 years ago
parent 753319de4a
commit 48b1de4617
  1. 30
      apps/indexer/lib/indexer/token_balance/fetcher.ex

@ -20,8 +20,9 @@ defmodule Indexer.TokenBalance.Fetcher do
]
@spec async_fetch([%TokenBalance{}]) :: :ok
def async_fetch(token_balances_params) do
BufferedTask.buffer(__MODULE__, token_balances_params, :infinity)
def async_fetch(token_balances) do
formatted_params = Enum.map(token_balances, &format_params/1)
BufferedTask.buffer(__MODULE__, formatted_params, :infinity)
end
@doc false
@ -45,34 +46,33 @@ defmodule Indexer.TokenBalance.Fetcher do
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
Chain.stream_unfetched_token_balances(initial, fn token_balances_params, acc ->
reducer.(token_balances_params, acc)
Chain.stream_unfetched_token_balances(initial, fn token_balance, acc ->
token_balance
|> format_params()
|> reducer.(acc)
end)
final
end
@impl BufferedTask
def run(token_balances, _retries, _json_rpc_named_arguments) do
Logger.debug(fn -> "fetching #{length(token_balances)} token balances" end)
def run(params_list, _retries, _json_rpc_named_arguments) do
Logger.debug(fn -> "fetching #{length(params_list)} token balances" end)
result =
token_balances
|> fetch_from_blockchain
|> import_token_balances
params_list
|> fetch_from_blockchain()
|> import_token_balances()
if result == :ok do
:ok
else
{:retry, token_balances}
{:retry, params_list}
end
end
def fetch_from_blockchain(token_balances) do
{:ok, token_balances} =
token_balances
|> Stream.map(&format_params/1)
|> TokenBalances.fetch_token_balances_from_blockchain()
def fetch_from_blockchain(params_list) do
{:ok, token_balances} = TokenBalances.fetch_token_balances_from_blockchain(params_list)
TokenBalances.log_fetching_errors(__MODULE__, token_balances)

Loading…
Cancel
Save