Land #411: Fix chunking issues with BlockFetcher and Sequence

pull/419/head
Luke Imhoff 6 years ago committed by GitHub
commit e59d3c5c72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      apps/indexer/lib/indexer/block_fetcher.ex
  2. 215
      apps/indexer/lib/indexer/sequence.ex
  3. 217
      apps/indexer/test/indexer/sequence_test.exs

@ -183,13 +183,13 @@ defmodule Indexer.BlockFetcher do
defp genesis_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
missing_ranges = missing_block_number_ranges(state, latest_block_number..0)
missing_ranges = Chain.missing_block_number_ranges(latest_block_number..0)
count = Enum.count(missing_ranges)
debug(fn -> "#{count} missed block ranges between #{latest_block_number} and genesis" end)
{:ok, seq} =
Sequence.start_link(prefix: missing_ranges, first: latest_block_number, step: -1 * state.blocks_batch_size)
{:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size)
Sequence.cap(seq)
stream_import(state, seq, max_concurrency: state.blocks_concurrency)
end
@ -214,7 +214,7 @@ defmodule Indexer.BlockFetcher do
"failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying"
end)
:ok = Sequence.inject_range(seq, range)
:ok = Sequence.queue(seq, range)
error
end
@ -268,32 +268,6 @@ defmodule Indexer.BlockFetcher do
|> InternalTransactionFetcher.async_fetch(10_000)
end
# Passes `range` to `Chain.missing_block_number_ranges/1` and splits the ranges it returns with
# `chunk_ranges/2`, using the `blocks_batch_size` taken from the state map as the chunk size.
defp missing_block_number_ranges(%{blocks_batch_size: blocks_batch_size}, range) do
range
|> Chain.missing_block_number_ranges()
|> chunk_ranges(blocks_batch_size)
end
# Splits every range in `ranges` into consecutive sub-ranges derived from `size` and returns them as one
# flat list, in the same order as the input.
#
# NOTE(review): chunks are built by adding `size` to `first`, so the splitting logic only walks upwards;
# for a positive `size`, a descending range has a negative `last - first` and is therefore always returned
# whole by the first clause.
defp chunk_ranges(ranges, size) do
Enum.flat_map(ranges, fn
# Returned untouched when `last - first <= size`. Ranges include both ends, so this admits up to
# `size + 1` numbers in a single chunk.
first..last = range when last - first <= size ->
[range]
first..last ->
first
# Candidate chunk starts: first, first + size, first + 2 * size, ...
|> Stream.iterate(&(&1 + size))
|> Enum.reduce_while([], fn
# Final chunk: stops as soon as `chunk_first + size` reaches `last`, so the last chunk may also hold
# up to `size + 1` numbers.
chunk_first, acc when chunk_first + size >= last ->
{:halt, [chunk_first..last | acc]}
# Full chunk of exactly `size` numbers.
chunk_first, acc ->
chunk_last = chunk_first + size - 1
{:cont, [chunk_first..chunk_last | acc]}
end)
# Chunks were prepended to the accumulator; restore ascending order.
|> Enum.reverse()
end)
end
defp realtime_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do
{:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
{:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2)
@ -345,7 +319,7 @@ defmodule Indexer.BlockFetcher do
"failed to fetch #{step} for blocks #{first} - #{last}: #{inspect(reason)}. Retrying block range."
end)
:ok = Sequence.inject_range(seq, range)
:ok = Sequence.queue(seq, range)
{:error, step, reason}
end

@ -6,25 +6,27 @@ defmodule Indexer.Sequence do
@enforce_keys ~w(current queue step)a
defstruct current: nil,
queue: nil,
step: nil,
mode: :infinite
step: nil
@typedoc """
The initial ranges to stream from the `t:Stream.t/` returned from `build_stream/1`
The ranges to stream from the `t:Stream.t/` returned from `build_stream/1`
"""
@type prefix :: [Range.t()]
@type ranges :: [Range.t()]
@typep prefix_option :: {:prefix, prefix}
@typep ranges_option :: {:ranges, ranges}
@typedoc """
The first number in the sequence to start at once the `t:prefix/0` ranges and any `t:Range.t/0`s injected with
`inject_range/2` are all consumed.
The number at which an infinite sequence starts.
"""
@type first :: pos_integer()
@type first :: integer()
@typep first_named_argument :: {:first, pos_integer()}
@typep first_option :: {:first, first}
@type mode :: :infinite | :finite
@typedoc """
* `:finite` - only popping ranges from `queue`.
* `:infinite` - generating new ranges from `current` and `step` when `queue` is empty.
"""
@type mode :: :finite | :infinite
@typedoc """
The size of `t:Range.t/0` to construct based on the `t:first_named_argument/0` or its current value when all
@ -34,17 +36,25 @@ defmodule Indexer.Sequence do
@typep step_named_argument :: {:step, step}
@type options :: [prefix_option | first_named_argument | step_named_argument]
@type options :: [ranges_option | first_option | step_named_argument]
@typep t :: %__MODULE__{
current: pos_integer(),
queue: :queue.queue(Range.t()),
step: step(),
mode: mode()
current: nil | integer(),
step: step()
}
@doc """
Starts a process for managing a block sequence.
Infinite sequence
Indexer.Sequence.start_link(first: 100, step: 10)
Finite sequence
Indexer.Sequence.start_link(ranges: [100..0])
"""
@spec start_link(options) :: GenServer.on_start()
def start_link(options) when is_list(options) do
@ -69,9 +79,7 @@ defmodule Indexer.Sequence do
end
@doc """
Changes the mode for the sequencer to signal continuous streaming mode.
Returns the previous `t:mode/0`.
Changes the mode for the sequence to finite.
"""
@spec cap(pid()) :: mode
def cap(sequence) when is_pid(sequence) do
@ -79,11 +87,11 @@ defmodule Indexer.Sequence do
end
@doc """
Adds a range of block numbers to the sequence.
Adds a range of block numbers to the end of the sequence.
"""
@spec inject_range(pid(), Range.t()) :: :ok
def inject_range(sequence, _first.._last = range) when is_pid(sequence) do
GenServer.call(sequence, {:inject_range, range})
@spec queue(pid(), Range.t()) :: :ok
def queue(sequence, _first.._last = range) when is_pid(sequence) do
GenServer.call(sequence, {:queue, range})
end
@doc """
@ -96,59 +104,164 @@ defmodule Indexer.Sequence do
@impl GenServer
@spec init(options) :: {:ok, t}
def init(named_arguments) when is_list(named_arguments) do
def init(options) when is_list(options) do
Process.flag(:trap_exit, true)
{:ok,
%__MODULE__{
queue:
named_arguments
|> Keyword.get(:prefix, [])
|> :queue.from_list(),
current: Keyword.fetch!(named_arguments, :first),
step: Keyword.fetch!(named_arguments, :step)
}}
with {:ok, %{ranges: ranges, first: first, step: step}} <- validate_options(options),
initial_queue = :queue.new(),
{:ok, queue} <- queue_chunked_ranges(initial_queue, step, ranges) do
{:ok, %__MODULE__{queue: queue, current: first, step: step}}
else
{:error, reason} ->
{:stop, reason}
end
end
@impl GenServer
@spec handle_call(:cap, GenServer.from(), t()) :: {:reply, mode(), %__MODULE__{mode: :infinite}}
def handle_call(:cap, _from, %__MODULE__{mode: mode} = state) do
{:reply, mode, %__MODULE__{state | mode: :finite}}
@spec handle_call(:cap, GenServer.from(), %__MODULE__{current: nil}) :: {:reply, :finite, %__MODULE__{current: nil}}
@spec handle_call(:cap, GenServer.from(), %__MODULE__{current: integer()}) ::
{:reply, :infinite, %__MODULE__{current: nil}}
def handle_call(:cap, _from, %__MODULE__{current: current} = state) do
mode =
case current do
nil -> :finite
_ -> :infinite
end
{:reply, mode, %__MODULE__{state | current: nil}}
end
@spec handle_call({:inject_range, Range.t()}, GenServer.from(), t()) :: {:reply, mode(), t()}
def handle_call({:inject_range, _first.._last = range}, _from, %__MODULE__{queue: queue} = state) do
{:reply, :ok, %__MODULE__{state | queue: :queue.in(range, queue)}}
@spec handle_call({:queue, Range.t()}, GenServer.from(), t()) :: {:reply, :ok | {:error, String.t()}, t()}
def handle_call({:queue, _first.._last = range}, _from, %__MODULE__{queue: queue, step: step} = state) do
case queue_chunked_range(queue, step, range) do
{:ok, updated_queue} ->
{:reply, :ok, %__MODULE__{state | queue: updated_queue}}
{:error, _} = error ->
{:reply, error, state}
end
end
@spec handle_call(:pop, GenServer.from(), t()) :: {:reply, Range.t() | :halt, t()}
def handle_call(:pop, _from, %__MODULE__{mode: mode, queue: queue, current: current, step: step} = state) do
def handle_call(:pop, _from, %__MODULE__{queue: queue, current: current, step: step} = state) do
{reply, new_state} =
case {mode, :queue.out(queue)} do
case {current, :queue.out(queue)} do
{_, {{:value, range}, new_queue}} ->
{range, %__MODULE__{state | queue: new_queue}}
{:infinite, {:empty, new_queue}} ->
case current + step do
negative when negative < 0 ->
{current..0, %__MODULE__{state | current: 0, mode: :finite, queue: new_queue}}
{nil, {:empty, new_queue}} ->
{:halt, %__MODULE__{state | queue: new_queue}}
{_, {:empty, new_queue}} ->
case current + step do
new_current ->
last = new_current - sign(step)
last = new_current - 1
{current..last, %__MODULE__{state | current: new_current, queue: new_queue}}
end
{:finite, {:empty, new_queue}} ->
{:halt, %__MODULE__{state | queue: new_queue}}
end
{:reply, reply, new_state}
end
@spec sign(neg_integer()) :: -1
defp sign(integer) when integer < 0, do: -1
# Single-range wrapper around `queue_chunked_ranges/3`.
#
# Wraps `range` in a one-element list, delegates, and unwraps a one-element error list so the caller
# gets a single `reason` instead of `[reason]`. Any other result is returned untouched.
@spec queue_chunked_range(:queue.queue(Range.t()), step, Range.t()) ::
        {:ok, :queue.queue(Range.t())} | {:error, reason :: String.t()}
defp queue_chunked_range(queue, step, _.._ = range) when is_integer(step) do
  case queue_chunked_ranges(queue, step, [range]) do
    {:error, [reason]} -> {:error, reason}
    other -> other
  end
end
# Chunks every range in `ranges` by `step` and appends the chunks to `queue` with `:queue.in/2`.
#
# Returns `{:ok, queue}` with the updated queue, or `{:error, reasons}` holding one reason per range
# that `reduce_chunked_ranges/4` rejected.
#
# The spec below is attached to `queue_chunked_ranges/3` (plural). It was previously written for
# `queue_chunked_range/3`, which gave that function a second, list-typed spec and left this one without
# any.
@spec queue_chunked_ranges(:queue.queue(Range.t()), step, [Range.t()]) ::
        {:ok, :queue.queue(Range.t())} | {:error, reasons :: [String.t()]}
defp queue_chunked_ranges(queue, step, ranges) when is_integer(step) and is_list(ranges) do
  reduce_chunked_ranges(ranges, step, queue, &:queue.in/2)
end
# Folds every range in `ranges` through `reduce_chunked_range/4`, threading the accumulator through
# `reducer`.
#
# * While no range has failed, the result is `{:ok, acc}`.
# * After the first failure the result switches to `{:error, reasons}`. The remaining ranges are still
#   checked (against `initial`, their reduced value is discarded) so that every failing range
#   contributes a reason. Reasons are prepended, so they come out in reverse input order.
defp reduce_chunked_ranges(ranges, step, initial, reducer)
     when is_list(ranges) and is_integer(step) and step != 0 and is_function(reducer, 2) do
  Enum.reduce(ranges, {:ok, initial}, fn range, outcome ->
    case outcome do
      {:ok, acc} ->
        # Success passes through as the new accumulator; a failure starts the list of reasons.
        with {:error, reason} <- reduce_chunked_range(range, step, acc, reducer) do
          {:error, [reason]}
        end

      {:error, reasons} ->
        case reduce_chunked_range(range, step, initial, reducer) do
          {:ok, _} -> outcome
          {:error, reason} -> {:error, [reason | reasons]}
        end
    end
  end)
end
# Arity-4 entry point: counts the elements of `range` once and hands off to the arity-5 clauses,
# which use that count to decide whether the range needs splitting.
defp reduce_chunked_range(_.._ = range, step, initial, reducer) do
  reduce_chunked_range(range, Enum.count(range), step, initial, reducer)
end
# Rejects a range that runs against `step`: an ascending range with a negative step, or a descending
# range with a positive step. The reducer is never invoked for such a range.
defp reduce_chunked_range(first..last = range, _count, step, _initial, _reducer)
     when (step > 0 and first > last) or (step < 0 and last > first) do
  {:error, "Range (#{inspect(range)}) direction is opposite step (#{step}) direction"}
end
@spec sign(non_neg_integer()) :: 1
defp sign(_), do: 1
# The range holds no more than `abs(step)` numbers: it is already a single chunk, so it goes to the
# reducer as is.
defp reduce_chunked_range(_.._ = range, count, step, initial, reducer) when count <= abs(step) do
  reduced = reducer.(range, initial)
  {:ok, reduced}
end

# The range is longer than one chunk: walk from `first` in increments of `step`, feeding each chunk to
# `reducer`. Every chunk spans `abs(step)` numbers except the final one, which is cut off at `last`.
defp reduce_chunked_range(first..last, _count, step, initial, reducer) do
  # +1 when walking upwards, -1 when walking downwards.
  direction = if step > 0, do: 1, else: -1

  # True once the end of a full chunk would reach or pass `last` in the walking direction.
  reaches_last? =
    if step > 0 do
      fn chunk_last -> chunk_last >= last end
    else
      fn chunk_last -> chunk_last <= last end
    end

  reduced =
    first
    |> Stream.iterate(fn chunk_first -> chunk_first + step end)
    |> Enum.reduce_while(initial, fn chunk_first, acc ->
      # Last number of a full chunk: one short of where the next chunk starts.
      full_chunk_last = chunk_first + step - direction

      if reaches_last?.(full_chunk_last) do
        {:halt, reducer.(chunk_first..last, acc)}
      else
        {:cont, reducer.(chunk_first..full_chunk_last, acc)}
      end
    end)

  {:ok, reduced}
end
# Validates the keyword options given to the sequence and normalises them into a map.
#
# `:step` is mandatory (`Keyword.fetch!/2` raises when it is missing). Exactly one of `:ranges` and
# `:first` must be present:
#
# * only `:first`  - infinite sequence; `:step` must be a positive integer.
# * only `:ranges` - finite sequence; `:step` must be a non-zero integer. Without this check a zero or
#   non-integer step fails a guard further down the chunking code instead of producing a descriptive
#   error.
#
# Returns `{:ok, %{ranges: ranges, first: first, step: step}}` or `{:error, reason}` with a
# human-readable `reason`.
defp validate_options(options) do
  step = Keyword.fetch!(options, :step)

  case {Keyword.fetch(options, :ranges), Keyword.fetch(options, :first)} do
    {:error, {:ok, first}} when is_integer(step) and step > 0 ->
      {:ok, %{ranges: [], first: first, step: step}}

    {:error, {:ok, _first}} ->
      {:error, ":step must be a positive integer for infinite sequences"}

    {{:ok, ranges}, :error} when is_integer(step) and step != 0 ->
      {:ok, %{ranges: ranges, first: nil, step: step}}

    {{:ok, _ranges}, :error} ->
      {:error, ":step must be a non-zero integer for finite sequences"}

    {{:ok, _}, {:ok, _}} ->
      {:error,
       ":ranges and :first cannot be set at the same time as :ranges is for :finite mode while :first is for :infinite mode"}

    {:error, :error} ->
      {:error, "either :ranges or :first must be set"}
  end
end
end

@ -4,15 +4,62 @@ defmodule Indexer.SequenceTest do
alias Indexer.Sequence
describe "start_link/1" do
test "sets state" do
{:ok, pid} = Sequence.start_link(prefix: [1..4], first: 5, step: 1)
test "without :ranges with :first with positive step pops infinitely" do
{:ok, ascending} = Sequence.start_link(first: 5, step: 1)
assert state(pid) == %Sequence{
current: 5,
mode: :infinite,
queue: {[1..4], []},
step: 1
}
assert Sequence.pop(ascending) == 5..5
assert Sequence.pop(ascending) == 6..6
end
test "without :ranges with :first with negative :step is error" do
{child_pid, child_ref} =
spawn_monitor(fn ->
Sequence.start_link(first: 1, step: -1)
Process.sleep(5_000)
end)
assert_receive {:DOWN, ^child_ref, :process, ^child_pid,
":step must be a positive integer for infinite sequences"}
end
test "without :ranges without :first returns error" do
{child_pid, child_ref} =
spawn_monitor(fn ->
Sequence.start_link(step: -1)
Process.sleep(5_000)
end)
assert_receive {:DOWN, ^child_ref, :process, ^child_pid, "either :ranges or :first must be set"}
end
test "with ranges without :first" do
{:ok, pid} = Sequence.start_link(ranges: [1..4], step: 1)
assert Sequence.pop(pid) == 1..1
assert Sequence.pop(pid) == 2..2
assert Sequence.pop(pid) == 3..3
assert Sequence.pop(pid) == 4..4
assert Sequence.pop(pid) == :halt
end
test "with :ranges with :first returns error" do
{child_pid, child_ref} =
spawn_monitor(fn ->
Sequence.start_link(ranges: [1..0], first: 1, step: -1)
Process.sleep(5_000)
end)
assert_receive {:DOWN, ^child_ref, :process, ^child_pid,
":ranges and :first cannot be set at the same time" <>
" as :ranges is for :finite mode while :first is for :infinite mode"}
end
test "with 0 first with negative step does not return 0 twice" do
{:ok, pid} = Sequence.start_link(ranges: [1..0], step: -1)
assert Sequence.pop(pid) == 1..1
assert Sequence.pop(pid) == 0..0
assert Sequence.pop(pid) == :halt
end
# Regression test for https://github.com/poanetwork/poa-explorer/issues/387
@ -29,96 +76,122 @@ defmodule Indexer.SequenceTest do
# noproc when the sequence has already died by the time monitor is called
assert_receive {:DOWN, ^sequence_ref, :process, ^sequence_pid, status} when status in [:normal, :noproc]
end
test "with :ranges in direction opposite of :step returns errors for all ranges in wrong direction" do
parent = self()
{child_pid, child_ref} =
spawn_monitor(fn ->
send(
parent,
Sequence.start_link(
ranges: [
# ok, ok
7..6,
# ok, error
4..5,
# error, ok
3..2,
# error, error
0..1
],
step: -1
)
)
end)
assert_receive {:DOWN, ^child_ref, :process, ^child_pid,
[
"Range (0..1) direction is opposite step (-1) direction",
"Range (4..5) direction is opposite step (-1) direction"
]}
end
end
describe "queue/2" do
test "with finite mode range is chunked" do
{:ok, pid} = Sequence.start_link(ranges: [1..0], step: -1)
test "inject_range" do
{:ok, pid} = Sequence.start_link(prefix: [1..2], first: 5, step: 1)
assert Sequence.pop(pid) == 1..1
assert Sequence.pop(pid) == 0..0
assert :ok = Sequence.inject_range(pid, 3..4)
assert Sequence.queue(pid, 1..0) == :ok
assert state(pid) == %Sequence{
current: 5,
mode: :infinite,
queue: {[3..4], [1..2]},
step: 1
}
assert Sequence.pop(pid) == 1..1
assert Sequence.pop(pid) == 0..0
assert Sequence.pop(pid) == :halt
assert Sequence.pop(pid) == :halt
end
test "cap" do
{:ok, pid} = Sequence.start_link(prefix: [1..2], first: 5, step: 1)
test "with finite mode with range in wrong direction returns error" do
{:ok, ascending} = Sequence.start_link(first: 0, step: 1)
assert :infinite = Sequence.cap(pid)
assert state(pid).mode == :finite
assert :finite = Sequence.cap(pid)
end
assert Sequence.queue(ascending, 1..0) == {:error, "Range (1..0) direction is opposite step (1) direction"}
describe "pop" do
test "with a non-empty queue in finite and infinite modes" do
{:ok, pid} = Sequence.start_link(prefix: [1..4, 6..9], first: 99, step: 5)
{:ok, descending} = Sequence.start_link(ranges: [1..0], step: -1)
assert 1..4 == Sequence.pop(pid)
assert Sequence.queue(descending, 0..1) == {:error, "Range (0..1) direction is opposite step (-1) direction"}
end
assert state(pid) == %Sequence{
current: 99,
mode: :infinite,
queue: {[], [6..9]},
step: 5
}
test "with infinite mode range is chunked and is returned prior to calculated ranges" do
{:ok, pid} = Sequence.start_link(first: 5, step: 1)
:infinite = Sequence.cap(pid)
assert :ok = Sequence.queue(pid, 3..4)
assert Sequence.pop(pid) == 3..3
assert Sequence.pop(pid) == 4..4
# infinite sequence takes over
assert Sequence.pop(pid) == 5..5
assert Sequence.pop(pid) == 6..6
end
end
assert 6..9 == Sequence.pop(pid)
describe "cap/1" do
test "returns previous mode" do
{:ok, pid} = Sequence.start_link(first: 5, step: 1)
assert state(pid) == %Sequence{
current: 99,
mode: :finite,
queue: {[], []},
step: 5
}
assert Sequence.cap(pid) == :infinite
assert Sequence.cap(pid) == :finite
end
test "with an empty queue in infinite mode" do
{:ok, pid} = Sequence.start_link(first: 5, step: 5)
test "disables infinite mode that uses first and step" do
{:ok, late_capped} = Sequence.start_link(first: 5, step: 1)
assert 5..9 == Sequence.pop(pid)
assert Sequence.pop(late_capped) == 5..5
assert Sequence.pop(late_capped) == 6..6
assert Sequence.queue(late_capped, 5..5) == :ok
assert Sequence.cap(late_capped) == :infinite
assert Sequence.pop(late_capped) == 5..5
assert Sequence.pop(late_capped) == :halt
{:ok, immediately_capped} = Sequence.start_link(first: 5, step: 1)
assert state(pid) == %Sequence{
current: 10,
mode: :infinite,
queue: {[], []},
step: 5
}
assert Sequence.cap(immediately_capped) == :infinite
assert Sequence.pop(immediately_capped) == :halt
end
end
test "with an empty queue in infinit mode with negative step" do
{:ok, pid} = Sequence.start_link(first: 4, step: -5)
describe "pop" do
test "with a non-empty queue in finite mode" do
{:ok, pid} = Sequence.start_link(ranges: [1..4, 6..9], step: 5)
assert 4..0 == Sequence.pop(pid)
assert Sequence.pop(pid) == 1..4
assert Sequence.pop(pid) == 6..9
assert Sequence.pop(pid) == :halt
assert Sequence.pop(pid) == :halt
end
test "with an empty queue in infinite mode returns range from next step from current" do
{:ok, pid} = Sequence.start_link(first: 5, step: 5)
assert state(pid) == %Sequence{
current: 0,
mode: :finite,
queue: {[], []},
step: -5
}
assert 5..9 == Sequence.pop(pid)
end
test "with an empty queue in finite mode" do
test "with an empty queue in finite mode halts immediately" do
{:ok, pid} = Sequence.start_link(first: 5, step: 5)
:infinite = Sequence.cap(pid)
assert :halt == Sequence.pop(pid)
assert state(pid) == %Sequence{
current: 5,
mode: :finite,
queue: {[], []},
step: 5
}
assert Sequence.pop(pid) == :halt
end
end
defp state(sequence) do
:sys.get_state(sequence)
end
end

Loading…
Cancel
Save