|
|
@ -1,21 +1,23 @@ |
|
|
|
defmodule Explorer.Indexer.AddressFetcher do |
|
|
|
defmodule Explorer.Indexer.AddressFetcher do |
|
|
|
@moduledoc """ |
|
|
|
@moduledoc """ |
|
|
|
TODO |
|
|
|
Fetches and indexes `t:Explorer.Chain.Address.t/0` balances. |
|
|
|
""" |
|
|
|
""" |
|
|
|
use GenServer |
|
|
|
use GenServer |
|
|
|
require Logger |
|
|
|
require Logger |
|
|
|
|
|
|
|
|
|
|
|
alias Explorer.Chain |
|
|
|
alias Explorer.Chain |
|
|
|
|
|
|
|
|
|
|
|
alias Explorer.Chain.{ |
|
|
|
alias Explorer.Chain.{ |
|
|
|
Address, |
|
|
|
Address, |
|
|
|
Hash, |
|
|
|
Hash |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
alias Explorer.JSONRPC |
|
|
|
alias Explorer.JSONRPC |
|
|
|
|
|
|
|
|
|
|
|
@fetch_interval :timer.seconds(3) |
|
|
|
@fetch_interval :timer.seconds(3) |
|
|
|
@max_batch_size 500 |
|
|
|
@max_batch_size 500 |
|
|
|
|
|
|
|
|
|
|
|
def async_fetch_addresses(address_hashes) do |
|
|
|
def async_fetch_balances(address_hashes) do |
|
|
|
GenServer.cast(__MODULE__, {:buffer_addresses, address_hashes}) |
|
|
|
GenServer.cast(__MODULE__, {:buffer_addresses, address_hashes}) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
@ -47,6 +49,7 @@ defmodule Explorer.Indexer.AddressFetcher do |
|
|
|
def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do |
|
|
|
def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do |
|
|
|
{:noreply, state} |
|
|
|
{:noreply, state} |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do |
|
|
|
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do |
|
|
|
batch = Map.fetch!(state.tasks, ref) |
|
|
|
batch = Map.fetch!(state.tasks, ref) |
|
|
|
|
|
|
|
|
|
|
@ -76,8 +79,9 @@ defmodule Explorer.Indexer.AddressFetcher do |
|
|
|
{state.tasks, state.buffer} |
|
|
|
{state.tasks, state.buffer} |
|
|
|
|> Chain.stream_unfetched_addresses(fn %Address{hash: hash}, {tasks, batch} -> |
|
|
|
|> Chain.stream_unfetched_addresses(fn %Address{hash: hash}, {tasks, batch} -> |
|
|
|
batch = MapSet.put(batch, Hash.to_string(hash)) |
|
|
|
batch = MapSet.put(batch, Hash.to_string(hash)) |
|
|
|
|
|
|
|
|
|
|
|
if MapSet.size(batch) >= @max_batch_size do |
|
|
|
if MapSet.size(batch) >= @max_batch_size do |
|
|
|
task = async_fetch_balances(batch) |
|
|
|
task = do_async_fetch_balances(batch) |
|
|
|
{Map.put(tasks, task.ref, batch), MapSet.new()} |
|
|
|
{Map.put(tasks, task.ref, batch), MapSet.new()} |
|
|
|
else |
|
|
|
else |
|
|
|
{tasks, batch} |
|
|
|
{tasks, batch} |
|
|
@ -87,9 +91,10 @@ defmodule Explorer.Indexer.AddressFetcher do |
|
|
|
|
|
|
|
|
|
|
|
%{state | tasks: tasks} |
|
|
|
%{state | tasks: tasks} |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp fetch_remaining({:ok, {tasks, batch}}) do |
|
|
|
defp fetch_remaining({:ok, {tasks, batch}}) do |
|
|
|
if MapSet.size(batch) > 0 do |
|
|
|
if MapSet.size(batch) > 0 do |
|
|
|
task = async_fetch_balances(batch) |
|
|
|
task = do_async_fetch_balances(batch) |
|
|
|
Map.put(tasks, task.ref, batch) |
|
|
|
Map.put(tasks, task.ref, batch) |
|
|
|
else |
|
|
|
else |
|
|
|
tasks |
|
|
|
tasks |
|
|
@ -98,7 +103,7 @@ defmodule Explorer.Indexer.AddressFetcher do |
|
|
|
|
|
|
|
|
|
|
|
defp flush_buffer(state) do |
|
|
|
defp flush_buffer(state) do |
|
|
|
if MapSet.size(state.buffer) > 0 do |
|
|
|
if MapSet.size(state.buffer) > 0 do |
|
|
|
task = async_fetch_balances(state.buffer) |
|
|
|
task = do_async_fetch_balances(state.buffer) |
|
|
|
new_tasks = Map.put(state.tasks, task.ref, state.buffer) |
|
|
|
new_tasks = Map.put(state.tasks, task.ref, state.buffer) |
|
|
|
|
|
|
|
|
|
|
|
%{state | tasks: new_tasks, buffer: MapSet.new()} |
|
|
|
%{state | tasks: new_tasks, buffer: MapSet.new()} |
|
|
@ -115,7 +120,7 @@ defmodule Explorer.Indexer.AddressFetcher do |
|
|
|
JSONRPC.fetch_balances_by_hash(address_hashes) |
|
|
|
JSONRPC.fetch_balances_by_hash(address_hashes) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp async_fetch_balances(hashes_mapset) do |
|
|
|
defp do_async_fetch_balances(hashes_mapset) do |
|
|
|
Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn -> |
|
|
|
Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn -> |
|
|
|
Logger.debug(fn -> "fetching #{MapSet.size(hashes_mapset)} balances" end) |
|
|
|
Logger.debug(fn -> "fetching #{MapSet.size(hashes_mapset)} balances" end) |
|
|
|
{:ok, balances} = do_fetch_addresses(Enum.to_list(hashes_mapset)) |
|
|
|
{:ok, balances} = do_fetch_addresses(Enum.to_list(hashes_mapset)) |
|
|
|