Transition to realtime index and limit address task spawning

pull/162/head
Chris McCord 7 years ago
parent 740f4a6200
commit 195b50f14a
  1. 10
      apps/explorer/lib/explorer/chain.ex
  2. 118
      apps/explorer/lib/explorer/indexer/address_fetcher.ex
  3. 176
      apps/explorer/lib/explorer/indexer/block_fetcher.ex
  4. 36
      apps/explorer/lib/explorer/indexer/block_importer.ex
  5. 10
      apps/explorer/lib/explorer/indexer/supervisor.ex
  6. 24
      apps/explorer/lib/explorer/jsonrpc.ex
  7. 32
      apps/explorer/lib/explorer/jsonrpc/receipts.ex
  8. 11
      apps/explorer/mix.exs
  9. 7
      mix.exs

@ -411,11 +411,11 @@ defmodule Explorer.Chain do
"""
def import_blocks(%{
blocks_params: blocks_params,
logs_params: logs_params,
internal_transactions_params: internal_transactions_params,
receipts_params: receipts_params,
transactions_params: transactions_params
blocks: blocks_params,
logs: logs_params,
internal_transactions: internal_transactions_params,
receipts: receipts_params,
transactions: transactions_params
})
when is_list(blocks_params) and is_list(internal_transactions_params) and is_list(logs_params) and
is_list(receipts_params) and is_list(transactions_params) do

@ -13,7 +13,8 @@ defmodule Explorer.Indexer.AddressFetcher do
}
@fetch_interval :timer.seconds(3)
@max_batch_size 500
@max_batch_size 100
@max_concurrency 2
def async_fetch_balances(address_hashes) do
GenServer.cast(__MODULE__, {:buffer_addresses, address_hashes})
@ -26,17 +27,25 @@ defmodule Explorer.Indexer.AddressFetcher do
def init(_opts) do
send(self(), :fetch_unfetched_addresses)
{:ok, %{buffer: MapSet.new(), tasks: %{}}}
state = %{
flush_timer: nil,
fetch_interval: @fetch_interval,
buffer: :queue.new(),
tasks: %{}
}
{:ok, state}
end
def handle_info(:fetch_unfetched_addresses, state) do
schedule_next_buffer_fetch(0)
{:noreply, stream_unfetched_addresses(state)}
end
def handle_info(:buffer_fetch, state) do
schedule_next_buffer_fetch()
{:noreply, flush_buffer(state)}
def handle_info(:flush, state) do
{:noreply, state |> fetch_next_batch([]) |> schedule_next_buffer_flush()}
end
def handle_info({:async_fetch, hashes}, state) do
{:noreply, fetch_next_batch(state, hashes)}
end
def handle_info({ref, {:fetched_balances, results}}, state) do
@ -60,69 +69,82 @@ defmodule Explorer.Indexer.AddressFetcher do
end
def handle_cast({:buffer_addresses, address_hashes}, state) do
{:noreply, buffer_addresses(state, address_hashes)}
string_hashes = for hash <- address_hashes, do: Hash.to_string(hash)
{:noreply, buffer_addresses(state, string_hashes)}
end
defp drop_task(state, ref) do
schedule_async_fetch([])
%{state | tasks: Map.delete(state.tasks, ref)}
end
defp buffer_addresses(state, address_hashes) do
string_hashes = for hash <- address_hashes, do: Hash.to_string(hash)
%{state | buffer: MapSet.union(state.buffer, MapSet.new(string_hashes))}
defp buffer_addresses(state, string_hashes) do
%{state | buffer: :queue.join(state.buffer, :queue.from_list(string_hashes))}
end
defp stream_unfetched_addresses(state) do
tasks =
{state.tasks, state.buffer}
|> Chain.stream_unfetched_addresses(fn %Address{hash: hash}, {tasks, batch} ->
batch = MapSet.put(batch, Hash.to_string(hash))
if MapSet.size(batch) >= @max_batch_size do
task = do_async_fetch_balances(batch)
{Map.put(tasks, task.ref, batch), MapSet.new()}
else
{tasks, batch}
end
end)
|> fetch_remaining()
state.buffer
|> Chain.stream_unfetched_addresses(fn %Address{hash: hash}, batch ->
batch = :queue.in(Hash.to_string(hash), batch)
if :queue.len(batch) >= @max_batch_size do
schedule_async_fetch(:queue.to_list(batch))
:queue.new()
else
batch
end
end)
|> fetch_remaining()
%{state | tasks: tasks}
schedule_next_buffer_flush(state)
end
defp fetch_remaining({:ok, {tasks, batch}}) do
if MapSet.size(batch) > 0 do
task = do_async_fetch_balances(batch)
Map.put(tasks, task.ref, batch)
else
tasks
defp fetch_remaining({:ok, batch}) do
if :queue.len(batch) > 0 do
schedule_async_fetch(:queue.to_list(batch))
end
:ok
end
defp flush_buffer(state) do
if MapSet.size(state.buffer) > 0 do
task = do_async_fetch_balances(state.buffer)
new_tasks = Map.put(state.tasks, task.ref, state.buffer)
defp do_fetch_addresses(address_hashes) do
JSONRPC.fetch_balances_by_hash(address_hashes)
end
%{state | tasks: new_tasks, buffer: MapSet.new()}
else
state
end
defp take_batch(queue) do
{hashes, remaining_queue} =
Enum.reduce_while(1..@max_batch_size, {[], queue}, fn _, {hashes, queue_acc} ->
case :queue.out(queue_acc) do
{{:value, hash}, new_queue} -> {:cont, {[hash | hashes], new_queue}}
{:empty, new_queue} -> {:halt, {hashes, new_queue}}
end
end)
{Enum.reverse(hashes), remaining_queue}
end
defp schedule_next_buffer_fetch(after_ms \\ @fetch_interval) do
Process.send_after(self(), :buffer_fetch, after_ms)
defp schedule_async_fetch(hashes, after_ms \\ 0) do
Process.send_after(self(), {:async_fetch, hashes}, after_ms)
end
defp do_fetch_addresses(address_hashes) do
JSONRPC.fetch_balances_by_hash(address_hashes)
defp schedule_next_buffer_flush(state) do
timer = Process.send_after(self(), :flush, state.fetch_interval)
%{state | flush_timer: timer}
end
defp do_async_fetch_balances(hashes_mapset) do
Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn ->
Logger.debug(fn -> "fetching #{MapSet.size(hashes_mapset)} balances" end)
{:ok, balances} = do_fetch_addresses(Enum.to_list(hashes_mapset))
{:fetched_balances, balances}
end)
defp fetch_next_batch(state, hashes) do
state = buffer_addresses(state, hashes)
if Enum.count(state.tasks) < @max_concurrency and :queue.len(state.buffer) > 0 do
{batch, new_queue} = take_batch(state.buffer)
task = Task.Supervisor.async_nolink(Explorer.Indexer.TaskSupervisor, fn ->
Logger.debug(fn -> "fetching #{Enum.count(batch)} balances" end)
{:ok, balances} = do_fetch_addresses(batch)
{:fetched_balances, balances}
end)
%{state | tasks: Map.put(state.tasks, task.ref, batch), buffer: new_queue}
else
buffer_addresses(state, hashes)
end
end
end

@ -1,10 +1,6 @@
defmodule Explorer.Indexer.BlockFetcher do
@moduledoc """
TODO
## Next steps
- after gensis index transition to RT index
Fetches and indexes block ranges from gensis to realtime.
"""
use GenServer
@ -15,39 +11,50 @@ defmodule Explorer.Indexer.BlockFetcher do
alias Explorer.Indexer.{
Sequence,
AddressFetcher
BlockImporter,
}
alias Explorer.JSONRPC.Transactions
# Struct
defstruct ~w(current_block genesis_task subscription_id)a
# Constants
@batch_size 50
@blocks_concurrency 20
@blocks_concurrency 10
@internal_batch_size 50
@internal_concurrency 8
@polling_interval 20_000
@block_rate 5_000
@receipts_batch_size 250
@receipts_concurrency 20
# Functions
@doc """
Starts the server.
## Options
* `:block_rate` - The millisecond rate new blocks are published at.
Defaults to `#{@block_rate}`.
"""
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
## GenServer callbacks
@impl GenServer
def init(opts) do
send(self(), :catchup_index)
:timer.send_interval(15_000, self(), :debug_count)
def handle_info(:index, state) do
state = %{
genesis_task: nil,
realtime_interval: (opts[:block_rate] || @block_rate) * 2,
}
{:ok, state}
end
@impl GenServer
def handle_info(:catchup_index, state) do
{count, missing_ranges} = missing_block_numbers()
current_block = Indexer.next_block_number()
@ -55,35 +62,34 @@ defmodule Explorer.Indexer.BlockFetcher do
{:ok, genesis_task} =
Task.start_link(fn ->
stream_import(missing_ranges, current_block)
{:ok, seq} = Sequence.start_link(missing_ranges, current_block, @batch_size)
stream_import(seq, max_concurrency: @blocks_concurrency)
end)
Process.monitor(genesis_task)
{:noreply, %__MODULE__{state | genesis_task: genesis_task}}
{:noreply, %{state | genesis_task: genesis_task}}
end
def handle_info(:poll, %__MODULE__{subscription_id: subscription_id} = state) do
Process.send_after(self(), :poll, @polling_interval)
with {:ok, blocks} when length(blocks) > 0 <- JSONRPC.check_for_updates(subscription_id) do
Logger.debug(fn -> "Processing #{length(blocks)} new block(s)" end)
def handle_info(:realtime_index, state) do
{:ok, realtime_task} =
Task.start_link(fn ->
{:ok, seq} = Sequence.start_link([], Indexer.next_block_number(), 2)
stream_import(seq, max_concurrency: 1)
end)
# TODO do something with the new blocks
JSONRPC.fetch_blocks_by_hash(blocks)
end
Process.monitor(realtime_task)
{:noreply, state}
{:noreply, %{state | realtime_task: realtime_task}}
end
def handle_info({:DOWN, _ref, :process, pid, :normal}, %__MODULE__{genesis_task: pid} = state) do
Logger.info(fn -> "Finished index from genesis" end)
{:ok, subscription_id} = JSONRPC.listen_for_new_blocks()
send(self(), :poll)
def handle_info({:DOWN, _ref, :process, pid, :normal}, %{realtime_task: pid} = state) do
{:noreply, schedule_next_realtime_fetch(%{state | realtime_task: nil})}
end
{:noreply, %__MODULE__{state | genesis_task: nil, subscription_id: subscription_id}}
def handle_info({:DOWN, _ref, :process, pid, :normal}, %{genesis_task: pid} = state) do
Logger.info(fn -> "Finished index from genesis. Transitioning to realtime index." end)
{:noreply, schedule_next_realtime_fetch(%{state | genesis_task: nil})}
end
def handle_info(:debug_count, state) do
@ -104,18 +110,7 @@ defmodule Explorer.Indexer.BlockFetcher do
{:noreply, state}
end
@impl GenServer
def init(_opts) do
send(self(), :index)
:timer.send_interval(15_000, self(), :debug_count)
{:ok, %__MODULE__{current_block: 0, genesis_task: nil, subscription_id: nil}}
end
## Private Functions
defp cap_seq(seq, :end_of_chain, {_block_start, block_end}) do
Logger.info("Reached end of blockchain #{inspect(block_end)}")
defp cap_seq(seq, :end_of_chain, {_block_start, _block_end}) do
:ok = Sequence.cap(seq)
end
@ -134,13 +129,13 @@ defmodule Explorer.Indexer.BlockFetcher do
|> Enum.chunk_every(@internal_batch_size)
|> Task.async_stream(&JSONRPC.fetch_internal_transactions(&1), stream_opts)
|> Enum.reduce_while({:ok, []}, fn
{:ok, {:ok, internal_transactions_params}}, {:ok, acc} -> {:cont, {:ok, acc ++ internal_transactions_params}}
{:ok, {:ok, internal_transactions}}, {:ok, acc} -> {:cont, {:ok, acc ++ internal_transactions}}
{:ok, {:error, reason}}, {:ok, _acc} -> {:halt, {:error, reason}}
{:error, reason}, {:ok, _acc} -> {:halt, {:error, reason}}
end)
end
defp fetch_transaction_receipts([]), do: {:ok, %{logs_params: [], receipts_params: []}}
defp fetch_transaction_receipts([]), do: {:ok, %{logs: [], receipts: []}}
defp fetch_transaction_receipts(hashes) do
Logger.debug(fn -> "fetching #{length(hashes)} transaction receipts" end)
@ -149,11 +144,10 @@ defmodule Explorer.Indexer.BlockFetcher do
hashes
|> Enum.chunk_every(@receipts_batch_size)
|> Task.async_stream(&JSONRPC.fetch_transaction_receipts(&1), stream_opts)
|> Enum.reduce_while({:ok, %{logs_params: [], receipts_params: []}}, fn
{:ok, {:ok, %{logs_params: logs_params, receipts_params: receipts_params}}},
{:ok, %{logs_params: acc_log_params, receipts_params: acc_receipts_params}} ->
{:cont,
{:ok, %{logs_params: acc_log_params ++ logs_params, receipts_params: acc_receipts_params ++ receipts_params}}}
|> Enum.reduce_while({:ok, %{logs: [], receipts: []}}, fn
{:ok, {:ok, %{logs: logs, receipts: receipts}}},
{:ok, %{logs: acc_logs, receipts: acc_receipts}} ->
{:cont, {:ok, %{logs: acc_logs ++ logs, receipts: acc_receipts ++ receipts}}}
{:ok, {:error, reason}}, {:ok, _acc} ->
{:halt, {:error, reason}}
@ -163,27 +157,12 @@ defmodule Explorer.Indexer.BlockFetcher do
end)
end
defp insert(%{
blocks_params: blocks_params,
internal_transactions_params: internal_transactions_params,
logs_params: log_params,
range: range,
receipts_params: receipt_params,
seq: seq,
transactions_params: transactions_params
}) do
case Chain.import_blocks(%{
blocks_params: blocks_params,
internal_transactions_params: internal_transactions_params,
logs_params: log_params,
receipts_params: receipt_params,
transactions_params: transactions_params
}) do
{:ok, %{addresses: address_hashes}} ->
:ok = AddressFetcher.async_fetch_balances(address_hashes)
defp insert(seq, range, params) do
case BlockImporter.import_blocks(params) do
:ok ->
:ok
{:error, step, reason, _changes} ->
{:error, step, reason} ->
Logger.debug(fn ->
"failed to insert blocks during #{step} #{inspect(range)}: #{inspect(reason)}. Retrying"
end)
@ -216,50 +195,43 @@ defmodule Explorer.Indexer.BlockFetcher do
{count, chunked_ranges}
end
defp stream_import(missing_ranges, current_block) do
{:ok, seq} = Sequence.start_link(missing_ranges, current_block, @batch_size)
defp stream_import(seq, task_opts) do
seq
|> Sequence.build_stream()
|> Task.async_stream(
fn {block_start, block_end} = range ->
with {:ok, value} <- JSONRPC.fetch_blocks_by_range(block_start, block_end),
# `mix format` bug made the line too long when pattern combined into above line
%{next: next, blocks_params: blocks_params, range: range, transactions_params: transactions_params} =
value,
with {:ok, next, result} <- JSONRPC.fetch_blocks_by_range(block_start, block_end),
%{blocks: blocks, transactions: transactions} <- result,
:ok <- cap_seq(seq, next, range),
transaction_hashes <- Transactions.params_to_hashes(transactions_params),
{:ok, %{logs_params: logs_params, receipts_params: receipts_params}} <-
fetch_transaction_receipts(transaction_hashes),
{:ok, internal_transactions_params} <- fetch_internal_transactions(transaction_hashes) do
insert(%{
blocks_params: blocks_params,
internal_transactions_params: internal_transactions_params,
logs_params: logs_params,
range: range,
receipts_params: receipts_params,
seq: seq,
transactions_params: transactions_params
transaction_hashes <- Transactions.params_to_hashes(transactions),
{:ok, receipt_params} <- fetch_transaction_receipts(transaction_hashes),
%{logs: logs, receipts: receipts} <- receipt_params,
{:ok, internal_transactions} <- fetch_internal_transactions(transaction_hashes) do
insert(seq, range, %{
blocks: blocks,
internal_transactions: internal_transactions,
logs: logs,
receipts: receipts,
transactions: transactions
})
else
{:error, reason} ->
Logger.debug(fn ->
"failed to fetch blocks #{inspect(range)}: #{inspect(reason)}. Retrying"
end)
:ok = Sequence.inject_range(seq, range)
{:error, reason, range} ->
Logger.debug(fn ->
"failed to fetch blocks #{inspect(range)}: #{inspect(reason)}. Retrying"
end)
:ok = Sequence.inject_range(seq, range)
end
end,
max_concurrency: @blocks_concurrency,
timeout: :infinity
Keyword.merge(task_opts, timeout: :infinity)
)
|> Enum.each(fn {:ok, :ok} -> :ok end)
end
defp schedule_next_realtime_fetch(state) do
timer = Process.send_after(self(), :realtime_index, state.realtime_interval)
%{state | poll_timer: timer}
end
end

@ -0,0 +1,36 @@
defmodule Explorer.Indexer.BlockImporter do
@moduledoc """
Imports blocks to the chain.
Batched block ranges are serialized through the importer to avoid
races and lock contention against conurrent address upserts.
"""
use GenServer
alias Explorer.Chain
alias Explorer.Indexer.AddressFetcher
def import_blocks(blocks) do
GenServer.call(__MODULE__, {:import, blocks})
end
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
{:ok, %{}}
end
def handle_call({:import, blocks}, _from, state) do
case Chain.import_blocks(blocks) do
{:ok, %{addresses: address_hashes}} ->
:ok = AddressFetcher.async_fetch_balances(address_hashes)
{:reply, :ok, state}
{:error, step, reason, _changes} ->
{:reply, {:error, step, reason}, state}
end
end
end

@ -7,23 +7,21 @@ defmodule Explorer.Indexer.Supervisor do
alias Explorer.Indexer.{
BlockFetcher,
AddressFetcher
BlockImporter,
AddressFetcher,
}
# Functions
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts)
end
## Supervisor callbacks
@impl Supervisor
def init(_opts) do
children = [
{Task.Supervisor, name: Explorer.Indexer.TaskSupervisor},
{BlockFetcher, []},
{AddressFetcher, []}
{BlockImporter, []},
{AddressFetcher, []},
]
Supervisor.init(children, strategy: :rest_for_one)

@ -147,7 +147,13 @@ defmodule Explorer.JSONRPC do
}
end
json_rpc(batched_requests, config(:url))
batched_requests
|> json_rpc(config(:url))
|> handle_get_block_by_number()
|> case do
{:ok, _next, results} -> {:ok, results}
{:error, reason} -> {:error, reason}
end
end
@doc """
@ -157,7 +163,7 @@ defmodule Explorer.JSONRPC do
block_start
|> build_batch_get_block_by_number(block_end)
|> json_rpc(config(:url))
|> handle_get_block_by_number(block_start, block_end)
|> handle_get_block_by_number()
end
@doc """
@ -266,7 +272,7 @@ defmodule Explorer.JSONRPC do
raise("bad jason")
end
defp handle_get_block_by_number({:ok, results}, block_start, block_end) do
defp handle_get_block_by_number({:ok, results}) do
{blocks, next} =
Enum.reduce(results, {[], :more}, fn
%{"result" => nil}, {blocks, _} -> {blocks, :end_of_chain}
@ -278,17 +284,15 @@ defmodule Explorer.JSONRPC do
blocks_params = Blocks.elixir_to_params(elixir_blocks)
transactions_params = Transactions.elixir_to_params(elixir_transactions)
{:ok,
{:ok, next,
%{
next: next,
blocks_params: blocks_params,
range: {block_start, block_end},
transactions_params: transactions_params
blocks: blocks_params,
transactions: transactions_params
}}
end
defp handle_get_block_by_number({:error, reason}, block_start, block_end) do
{:error, reason, {block_start, block_end}}
defp handle_get_block_by_number({:error, reason}) do
{:error, reason}
end
defp handle_response(resp, 200) do

@ -27,20 +27,24 @@ defmodule Explorer.JSONRPC.Receipts do
end
def fetch(hashes) when is_list(hashes) do
with {:ok, responses} <-
hashes
|> Enum.map(&hash_to_json/1)
|> json_rpc(config(:url)) do
elixir_receipts =
responses
|> responses_to_receipts()
|> to_elixir()
elixir_logs = elixir_to_logs(elixir_receipts)
receipts_params = elixir_to_params(elixir_receipts)
logs_params = Logs.elixir_to_params(elixir_logs)
{:ok, %{logs_params: logs_params, receipts_params: receipts_params}}
hashes
|> Enum.map(&hash_to_json/1)
|> json_rpc(config(:url))
|> case do
{:ok, responses} ->
elixir_receipts =
responses
|> responses_to_receipts()
|> to_elixir()
elixir_logs = elixir_to_logs(elixir_receipts)
receipts = elixir_to_params(elixir_receipts)
logs = Logs.elixir_to_params(elixir_logs)
{:ok, %{logs: logs, receipts: receipts}}
{:error, _reason} = err ->
err
end
end

@ -3,7 +3,7 @@ defmodule Explorer.Mixfile do
def project do
[
aliases: aliases(),
aliases: aliases(Mix.env()),
app: :explorer,
build_path: "../../_build",
config_path: "../../config/config.exs",
@ -98,13 +98,16 @@ defmodule Explorer.Mixfile do
# $ mix ecto.setup
#
# See the documentation for `Mix` for more info on aliases.
defp aliases do
defp aliases(env) do
[
compile: "compile --warnings-as-errors",
"ecto.setup": ["ecto.create", "ecto.migrate", "run priv/repo/seeds.exs"],
"ecto.reset": ["ecto.drop", "ecto.setup"],
test: ["ecto.drop", "ecto.create --quiet", "ecto.migrate", "test"]
]
] ++ env_aliases(env)
end
defp env_aliases(:dev), do: []
defp env_aliases(_env) do
[compile: "compile --warnings-as-errors"]
end
defp package do

@ -5,7 +5,7 @@ defmodule ExplorerUmbrella.Mixfile do
def project do
[
aliases: aliases(),
aliases: aliases(Mix.env()),
apps_path: "apps",
deps: deps(),
dialyzer: [
@ -27,7 +27,10 @@ defmodule ExplorerUmbrella.Mixfile do
## Private Functions
defp aliases do
defp aliases(:dev) do
[]
end
defp aliases(_env) do
[
compile: "compile --warnings-as-errors"
]

Loading…
Cancel
Save