Raise error if `:first` would cause retraversal

pull/411/head
Luke Imhoff 6 years ago
parent 15be505219
commit a3d51f5e1a
  1. 2
      apps/indexer/lib/indexer/block_fetcher.ex
  2. 46
      apps/indexer/lib/indexer/sequence.ex
  3. 5
      apps/indexer/test/indexer/sequence_test.exs

@ -189,7 +189,7 @@ defmodule Indexer.BlockFetcher do
debug(fn -> "#{count} missed block ranges between #{latest_block_number} and genesis" end) debug(fn -> "#{count} missed block ranges between #{latest_block_number} and genesis" end)
{:ok, seq} = {:ok, seq} =
Sequence.start_link(prefix: missing_ranges, first: latest_block_number, step: -1 * state.blocks_batch_size) Sequence.start_link(prefix: missing_ranges, first: 0, step: -1 * state.blocks_batch_size)
stream_import(state, seq, max_concurrency: state.blocks_concurrency) stream_import(state, seq, max_concurrency: state.blocks_concurrency)
end end

@ -96,20 +96,19 @@ defmodule Indexer.Sequence do
@impl GenServer @impl GenServer
@spec init(options) :: {:ok, t} @spec init(options) :: {:ok, t}
def init(named_arguments) when is_list(named_arguments) do def init(options) when is_list(options) do
Process.flag(:trap_exit, true) Process.flag(:trap_exit, true)
step = Keyword.fetch!(named_arguments, :step)
initial_queue = :queue.new() case validate_options(options) do
prefix = Keyword.get(named_arguments, :prefix, []) {:ok, %{prefix: prefix, first: first, step: step}} ->
queue = queue_chunked_ranges(initial_queue, step, prefix) initial_queue = :queue.new()
queue = queue_chunked_ranges(initial_queue, step, prefix)
{:ok, {:ok, %__MODULE__{queue: queue, current: first, step: step}}
%__MODULE__{
queue: queue, {:error, reason} ->
current: Keyword.fetch!(named_arguments, :first), {:stop, reason}
step: step end
}}
end end
@impl GenServer @impl GenServer
@ -202,4 +201,29 @@ defmodule Indexer.Sequence do
@spec sign(non_neg_integer()) :: 1 @spec sign(non_neg_integer()) :: 1
defp sign(_), do: 1 defp sign(_), do: 1
defp validate_options(options) do
step = Keyword.fetch!(options, :step)
prefix = Keyword.get(options, :prefix, [])
first = Keyword.fetch!(options, :first)
validated_options = %{prefix: prefix, first: first, step: step}
if step < 0 do
minimum_in_prefix =
prefix
|> Stream.map(&Enum.min/1)
|> Enum.min(fn -> first end)
if first > minimum_in_prefix do
{:error,
":first (#{first}) is greater than the minimum in `:prefix` (#{minimum_in_prefix}), " <>
"which will cause retraversal."}
else
{:ok, validated_options}
end
else
{:ok, validated_options}
end
end
end end

@ -29,6 +29,11 @@ defmodule Indexer.SequenceTest do
# noproc when the sequence has already died by the time monitor is called # noproc when the sequence has already died by the time monitor is called
assert_receive {:DOWN, ^sequence_ref, :process, ^sequence_pid, status} when status in [:normal, :noproc] assert_receive {:DOWN, ^sequence_ref, :process, ^sequence_pid, status} when status in [:normal, :noproc]
end end
test "with negative :step with :first greater than minimum in :prefix returns error" do
assert {:error, ":first (1) is greater than the minimum in `:prefix` (0), which will cause retraversal."} =
Sequence.start_link(prefix: [1..0], first: 1, step: -1)
end
end end
test "inject_range" do test "inject_range" do

Loading…
Cancel
Save