From d907bd3f75c20ccb17fe15e19d6686beaa137595 Mon Sep 17 00:00:00 2001 From: Chris McCord Date: Thu, 3 May 2018 10:31:42 -0400 Subject: [PATCH] Bump docs --- .../lib/explorer/indexer/address_fetcher.ex | 19 ++++++++++++------- .../lib/explorer/indexer/block_fetcher.ex | 2 +- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/apps/explorer/lib/explorer/indexer/address_fetcher.ex b/apps/explorer/lib/explorer/indexer/address_fetcher.ex index da22f4ac96..41e711780e 100644 --- a/apps/explorer/lib/explorer/indexer/address_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/address_fetcher.ex @@ -1,21 +1,23 @@ defmodule Explorer.Indexer.AddressFetcher do @moduledoc """ - TODO + Fetches and indexes `t:Explorer.Chain.Address.t/0` balances. """ use GenServer require Logger alias Explorer.Chain + alias Explorer.Chain.{ Address, - Hash, + Hash } + alias Explorer.JSONRPC @fetch_interval :timer.seconds(3) @max_batch_size 500 - def async_fetch_addresses(address_hashes) do + def async_fetch_balances(address_hashes) do GenServer.cast(__MODULE__, {:buffer_addresses, address_hashes}) end @@ -47,6 +49,7 @@ defmodule Explorer.Indexer.AddressFetcher do def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do {:noreply, state} end + def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do batch = Map.fetch!(state.tasks, ref) @@ -76,8 +79,9 @@ defmodule Explorer.Indexer.AddressFetcher do {state.tasks, state.buffer} |> Chain.stream_unfetched_addresses(fn %Address{hash: hash}, {tasks, batch} -> batch = MapSet.put(batch, Hash.to_string(hash)) + if MapSet.size(batch) >= @max_batch_size do - task = async_fetch_balances(batch) + task = do_async_fetch_balances(batch) {Map.put(tasks, task.ref, batch), MapSet.new()} else {tasks, batch} @@ -87,9 +91,10 @@ defmodule Explorer.Indexer.AddressFetcher do %{state | tasks: tasks} end + defp fetch_remaining({:ok, {tasks, batch}}) do if MapSet.size(batch) > 0 do - task = async_fetch_balances(batch) + task = do_async_fetch_balances(batch) Map.put(tasks, task.ref, batch) else tasks @@ -98,7 +103,7 @@ defmodule Explorer.Indexer.AddressFetcher do defp flush_buffer(state) do if MapSet.size(state.buffer) > 0 do - task = async_fetch_balances(state.buffer) + task = do_async_fetch_balances(state.buffer) new_tasks = Map.put(state.tasks, task.ref, state.buffer) %{state | tasks: new_tasks, buffer: MapSet.new()} @@ -115,7 +120,7 @@ defmodule Explorer.Indexer.AddressFetcher do JSONRPC.fetch_balances_by_hash(address_hashes) end - defp async_fetch_balances(hashes_mapset) do + defp do_async_fetch_balances(hashes_mapset) do Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn -> Logger.debug(fn -> "fetching #{MapSet.size(hashes_mapset)} balances" end) {:ok, balances} = do_fetch_addresses(Enum.to_list(hashes_mapset)) diff --git a/apps/explorer/lib/explorer/indexer/block_fetcher.ex b/apps/explorer/lib/explorer/indexer/block_fetcher.ex index 3317eb37d2..945b7f9b57 100644 --- a/apps/explorer/lib/explorer/indexer/block_fetcher.ex +++ b/apps/explorer/lib/explorer/indexer/block_fetcher.ex @@ -178,7 +178,7 @@ defmodule Explorer.Indexer.BlockFetcher do transactions_params: transactions_params }) do {:ok, %{addresses: address_hashes}} -> - :ok = AddressFetcher.async_fetch_addresses(address_hashes) + :ok = AddressFetcher.async_fetch_balances(address_hashes) :ok {:error, step, reason, _changes} ->