Catch up index based on whether previous catch up found missing blocks

When a catchup_index task completes without error, double the interval
until the next check if no blocks were missing and halve it if blocks
were missing.  The minimum interval is the same as the realtime interval,
while the maximum interval is arbitrarily chosen to be 10x the realtime
interval.
pull/439/head
Luke Imhoff 6 years ago
parent 8f63f2a87c
commit 6abe66f7b1
  1. 133
      apps/indexer/lib/indexer/block_fetcher.ex
  2. 31
      apps/indexer/lib/indexer/bound_interval.ex
  3. 131
      apps/indexer/test/indexer/block_fetcher_test.exs

@ -11,7 +11,7 @@ defmodule Indexer.BlockFetcher do
alias EthereumJSONRPC
alias Explorer.Chain
alias Indexer.{BalanceFetcher, AddressExtraction, InternalTransactionFetcher, Sequence}
alias Indexer.{BalanceFetcher, AddressExtraction, BoundInterval, InternalTransactionFetcher, Sequence}
# dialyzer thinks that Logger.debug functions always have no_local_return
@dialyzer {:nowarn_function, import_range: 3}
@ -60,6 +60,17 @@ defmodule Indexer.BlockFetcher do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
defstruct json_rpc_named_arguments: [],
catchup_task: nil,
catchup_block_number: nil,
catchup_bound_interval: nil,
realtime_tasks: [],
realtime_interval: nil,
blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
receipts_batch_size: @receipts_batch_size,
receipts_concurrency: @receipts_concurrency
@impl GenServer
def init(opts) do
opts =
@ -67,48 +78,81 @@ defmodule Indexer.BlockFetcher do
|> Application.get_all_env()
|> Keyword.merge(opts)
state = %{
interval = div(opts[:block_interval] || @block_interval, 2)
state = %__MODULE__{
json_rpc_named_arguments: Keyword.fetch!(opts, :json_rpc_named_arguments),
catchup_task: nil,
realtime_tasks: [],
realtime_interval: div(opts[:block_interval] || @block_interval, 2),
catchup_bound_interval: BoundInterval.within(interval..(interval * 10)),
realtime_interval: interval,
blocks_batch_size: Keyword.get(opts, :blocks_batch_size, @blocks_batch_size),
blocks_concurrency: Keyword.get(opts, :blocks_concurrency, @blocks_concurrency),
receipts_batch_size: Keyword.get(opts, :receipts_batch_size, @receipts_batch_size),
receipts_concurrency: Keyword.get(opts, :receipts_concurrency, @receipts_concurrency)
}
send(self(), :catchup_index)
{:ok, _} = :timer.send_interval(state.realtime_interval, :realtime_index)
{:ok, schedule_next_catchup_index(state)}
{:ok, state}
end
@impl GenServer
def handle_info(:catchup_index, %{} = state) do
def handle_info(:catchup_index, %__MODULE__{} = state) do
catchup_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> catchup_task(state) end)
{:noreply, %{state | catchup_task: catchup_task}}
end
def handle_info(:realtime_index, %{realtime_tasks: realtime_tasks} = state) when is_list(realtime_tasks) do
def handle_info(:realtime_index, %__MODULE__{realtime_tasks: realtime_tasks} = state) when is_list(realtime_tasks) do
realtime_task = Task.Supervisor.async_nolink(Indexer.TaskSupervisor, fn -> realtime_task(state) end)
{:noreply, %{state | realtime_tasks: [realtime_task | realtime_tasks]}}
end
def handle_info({:DOWN, ref, :process, pid, :normal}, %{catchup_task: %Task{pid: pid, ref: ref}} = state) do
Logger.info(fn -> "Finished index down to genesis. Transitioning to only realtime index." end)
def handle_info(
{ref, missing_block_count},
%__MODULE__{
catchup_block_number: catchup_block_number,
catchup_bound_interval: catchup_bound_interval,
catchup_task: %Task{ref: ref}
} = state
)
when is_integer(missing_block_count) do
new_catchup_bound_interval =
case missing_block_count do
0 ->
Logger.info("Index already caught up in #{catchup_block_number}-0")
BoundInterval.increase(catchup_bound_interval)
_ ->
Logger.info("Index had to catch up #{missing_block_count} blocks in #{catchup_block_number}-0")
BoundInterval.decrease(catchup_bound_interval)
end
Process.demonitor(ref, [:flush])
interval = new_catchup_bound_interval.current
Logger.info(fn ->
"Checking if index needs to catch up in #{interval}ms"
end)
Process.send_after(self(), :catchup_index, interval)
{:noreply, %{state | catchup_task: nil}}
{:noreply, %{state | catchup_bound_interval: new_catchup_bound_interval, catchup_task: nil}}
end
def handle_info({:DOWN, ref, :process, pid, reason}, %{catchup_task: %Task{pid: pid, ref: ref}} = state) do
Logger.error(fn -> "catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)
def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{catchup_task: %Task{pid: pid, ref: ref}} = state) do
Logger.error(fn -> "Catchup index stream exited with reason (#{inspect(reason)}). Restarting" end)
{:noreply, schedule_next_catchup_index(%{state | catchup_task: nil})}
send(self(), :catchup_index)
{:noreply, %__MODULE__{state | catchup_task: nil}}
end
def handle_info({:DOWN, ref, :process, pid, reason}, %{realtime_tasks: realtime_tasks} = state)
def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{realtime_tasks: realtime_tasks} = state)
when is_list(realtime_tasks) do
{down_realtime_tasks, running_realtime_tasks} =
Enum.split_with(realtime_tasks, fn
@ -127,7 +171,7 @@ defmodule Indexer.BlockFetcher do
Logger.error(fn -> "Unexpected pid (#{inspect(pid)}) exited with reason (#{inspect(reason)})." end)
end
{:noreply, %{state | realtime_tasks: running_realtime_tasks}}
{:noreply, %__MODULE__{state | realtime_tasks: running_realtime_tasks}}
end
defp cap_seq(seq, next, range) do
@ -145,9 +189,12 @@ defmodule Indexer.BlockFetcher do
:ok
end
defp fetch_transaction_receipts(_state, []), do: {:ok, %{logs: [], receipts: []}}
defp fetch_transaction_receipts(%__MODULE__{} = _state, []), do: {:ok, %{logs: [], receipts: []}}
defp fetch_transaction_receipts(%{json_rpc_named_arguments: json_rpc_named_arguments} = state, transaction_params) do
defp fetch_transaction_receipts(
%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state,
transaction_params
) do
debug(fn -> "fetching #{length(transaction_params)} transaction receipts" end)
stream_opts = [max_concurrency: state.receipts_concurrency, timeout: :infinity]
@ -166,17 +213,42 @@ defmodule Indexer.BlockFetcher do
end)
end
defp catchup_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do
# Returns number of missing blocks that had to be caught up
defp catchup_task(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
missing_ranges = Chain.missing_block_number_ranges(latest_block_number..0)
count = Enum.count(missing_ranges)
debug(fn -> "#{count} missed block ranges between #{latest_block_number} and genesis" end)
case latest_block_number do
# let realtime indexer get the genesis block
0 ->
0
_ ->
# realtime indexer gets the current latest block
first = latest_block_number - 1
last = 0
missing_ranges = Chain.missing_block_number_ranges(first..last)
range_count = Enum.count(missing_ranges)
missing_block_count =
missing_ranges
|> Stream.map(&Enum.count/1)
|> Enum.sum()
debug(fn -> "#{missing_block_count} missed blocks in #{range_count} ranges between #{first} and #{last}" end)
{:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size)
Sequence.cap(seq)
case missing_block_count do
0 ->
:ok
stream_import(state, seq, max_concurrency: state.blocks_concurrency)
_ ->
{:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size)
Sequence.cap(seq)
stream_import(state, seq, max_concurrency: state.blocks_concurrency)
end
missing_block_count
end
end
defp insert(seq, range, options) when is_list(options) do
@ -253,13 +325,13 @@ defmodule Indexer.BlockFetcher do
|> InternalTransactionFetcher.async_fetch(10_000)
end
defp realtime_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do
defp realtime_task(%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
{:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2)
stream_import(state, seq, max_concurrency: 1)
end
defp stream_import(state, seq, task_opts) do
defp stream_import(%__MODULE__{} = state, seq, task_opts) do
seq
|> Sequence.build_stream()
|> Task.async_stream(
@ -272,7 +344,7 @@ defmodule Indexer.BlockFetcher do
# Run at state.blocks_concurrency max_concurrency when called by `stream_import/3`
# Only public for testing
@doc false
def import_range(range, %{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq) do
def import_range(range, %__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state, seq) do
with {:blocks, {:ok, next, result}} <-
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{blocks: blocks, transactions: transactions_without_receipts} = result,
@ -321,9 +393,4 @@ defmodule Indexer.BlockFetcher do
Map.merge(transaction_params, Map.fetch!(transaction_hash_to_receipt_params, transaction_hash))
end)
end
defp schedule_next_catchup_index(state) do
send(self(), :catchup_index)
state
end
end

@ -0,0 +1,31 @@
defmodule Indexer.BoundInterval do
  @moduledoc """
  An interval for `Process.send_after` that is restricted to being between a `minimum` and `maximum` value.

  The interval starts at `minimum`.  `increase/1` doubles the `current` value (capped at `maximum`) and
  `decrease/1` halves it (floored at `minimum`).
  """

  @enforce_keys ~w(maximum)a
  defstruct minimum: 1,
            current: 1,
            maximum: nil

  @type t :: %__MODULE__{minimum: integer(), current: integer(), maximum: integer()}

  @doc """
  Builds an interval bounded by the given range, with `current` starting at the range's first value.

  Only matches when both ends of the range are integers and the first is less than or equal to the last.
  """
  @spec within(Range.t()) :: t()
  def within(minimum..maximum) when is_integer(minimum) and is_integer(maximum) and minimum <= maximum do
    %__MODULE__{minimum: minimum, current: minimum, maximum: maximum}
  end

  @doc """
  Halves `current` (integer division), never going below `minimum`.
  """
  @spec decrease(t()) :: t()
  def decrease(%__MODULE__{minimum: minimum, current: current} = bound_interval)
      when is_integer(minimum) and is_integer(current) do
    new_current =
      current
      |> div(2)
      |> max(minimum)

    %__MODULE__{bound_interval | current: new_current}
  end

  @doc """
  Doubles `current`, never going above `maximum`.

  A `current` of `0` is bumped to `1` (still capped at `maximum`) so that an interval that starts at `0` is
  able to grow.
  """
  @spec increase(t()) :: t()
  def increase(%__MODULE__{current: current, maximum: maximum} = bound_interval)
      when is_integer(current) and is_integer(maximum) do
    # Doubling 0 yields 0, so without a floor of 1 an interval whose current value is 0 would be stuck there
    # forever.  The floor is applied before the cap, so `maximum` always wins.
    new_current =
      (current * 2)
      |> max(1)
      |> min(maximum)

    %__MODULE__{bound_interval | current: new_current}
  end
end

@ -13,6 +13,7 @@ defmodule Indexer.BlockFetcherTest do
BalanceFetcher,
AddressBalanceFetcherCase,
BlockFetcher,
BoundInterval,
BufferedTask,
InternalTransactionFetcher,
InternalTransactionFetcherCase,
@ -224,13 +225,15 @@ defmodule Indexer.BlockFetcherTest do
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
start_supervised!({BlockFetcher, json_rpc_named_arguments: json_rpc_named_arguments})
first_catchup_block_number = latest_block_number - 1
wait_for_results(fn ->
Repo.one!(from(block in Block, where: block.number == ^latest_block_number))
Repo.one!(from(block in Block, where: block.number == ^first_catchup_block_number))
end)
assert Repo.aggregate(Block, :count, :hash) >= 1
previous_batch_block_number = latest_block_number - default_blocks_batch_size
previous_batch_block_number = first_catchup_block_number - default_blocks_batch_size
wait_for_results(fn ->
Repo.one!(from(block in Block, where: block.number == ^previous_batch_block_number))
@ -240,6 +243,130 @@ defmodule Indexer.BlockFetcherTest do
end
end
describe "handle_info(:catchup_index, state)" do
setup context do
# force to use `Mox`, so we can manipulate `latest_block_number`
put_in(context.json_rpc_named_arguments[:transport], EthereumJSONRPC.Mox)
end
setup :state
test "increases catchup_bound_interval if no blocks missing", %{
json_rpc_named_arguments: json_rpc_named_arguments,
state: state
} do
insert(:block, number: 0)
insert(:block, number: 1)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => "0x1"}}
end)
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
# from `setup :state`
assert_received :catchup_index
assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
BlockFetcher.handle_info(:catchup_index, state)
assert_receive {^ref, 0} = message
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
assert message_state.catchup_bound_interval.current > catchup_index_state.catchup_bound_interval.current
end
test "decreases catchup_bound_interval if blocks missing", %{
json_rpc_named_arguments: json_rpc_named_arguments,
state: state
} do
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => "0x1"}}
end)
|> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByNumber", params: ["0x0", true]}], _options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: %{
"difficulty" => "0x0",
"gasLimit" => "0x0",
"gasUsed" => "0x0",
"hash" =>
Explorer.Factory.block_hash()
|> to_string(),
"miner" => "0xb2930b35844a230f00e51431acae96fe543a0347",
"number" => "0x0",
"parentHash" =>
Explorer.Factory.block_hash()
|> to_string(),
"size" => "0x0",
"timestamp" => "0x0",
"totalDifficulty" => "0x0",
"transactions" => []
}
}
]}
end)
|> stub(:json_rpc, fn [
%{
id: id,
method: "eth_getBalance",
params: ["0xb2930b35844a230f00e51431acae96fe543a0347", "0x0"]
}
],
_options ->
{:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]}
end)
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
AddressBalanceFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransactionFetcherCase.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
# from `setup :state`
assert_received :catchup_index
assert {:noreply, %BlockFetcher{catchup_task: %Task{pid: pid, ref: ref}} = catchup_index_state} =
BlockFetcher.handle_info(:catchup_index, state)
# 2 blocks are missing, but latest is assumed to be handled by realtime_index, so only 1 is missing for
# catchup_index
assert_receive {^ref, 1} = message
# DOWN is not flushed
assert {:messages, [{:DOWN, ^ref, :process, ^pid, :normal}]} = Process.info(self(), :messages)
assert {:noreply, message_state} = BlockFetcher.handle_info(message, catchup_index_state)
# DOWN is flushed
assert {:messages, []} = Process.info(self(), :messages)
assert message_state.catchup_bound_interval.current == message_state.catchup_bound_interval.minimum
# When not at minimum it is decreased
above_minimum_state = update_in(catchup_index_state.catchup_bound_interval, &BoundInterval.increase/1)
assert above_minimum_state.catchup_bound_interval.current > message_state.catchup_bound_interval.minimum
assert {:noreply, above_minimum_message_state} = BlockFetcher.handle_info(message, above_minimum_state)
assert above_minimum_message_state.catchup_bound_interval.current <
above_minimum_state.catchup_bound_interval.current
end
end
describe "import_range/3" do
setup :state

Loading…
Cancel
Save