From ff11541bdee4f546a9e27a6379b124b1ef5402f7 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 12 Jul 2018 14:01:52 -0500 Subject: [PATCH 01/14] Failing regression tests for #340 --- apps/indexer/lib/indexer/block_fetcher.ex | 3 +- .../test/indexer/block_fetcher_test.exs | 62 +++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index eee26587f7..726e2826dc 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -274,7 +274,8 @@ defmodule Indexer.BlockFetcher do |> chunk_ranges(blocks_batch_size) end - defp chunk_ranges(ranges, size) do + @doc false + def chunk_ranges(ranges, size) do Enum.flat_map(ranges, fn first..last = range when last - first <= size -> [range] diff --git a/apps/indexer/test/indexer/block_fetcher_test.exs b/apps/indexer/test/indexer/block_fetcher_test.exs index d0d26f27d4..36eafce0d3 100644 --- a/apps/indexer/test/indexer/block_fetcher_test.exs +++ b/apps/indexer/test/indexer/block_fetcher_test.exs @@ -829,6 +829,68 @@ defmodule Indexer.BlockFetcherTest do end end + describe "chunk_ranges/2" do + test "with first < last in one chunk" do + range = 0..1 + size = 2 + + assert Enum.count(range) <= size + assert BlockFetcher.chunk_ranges([range], size) == [range] + end + + test "with first < last with filled last chunk" do + range = 0..1 + size = 1 + + assert Enum.count(range) > size + assert rem(Enum.count(range), size) == 0 + assert BlockFetcher.chunk_ranges([range], size) == [0..0, 1..1] + end + + test "with first < last with unfilled last chunk" do + range = 0..2 + size = 2 + + assert Enum.count(range) > size + refute rem(Enum.count(range), size) == 0 + assert BlockFetcher.chunk_ranges([range], size) == [0..1, 2..2] + end + + test "with first == last" do + range = 0..0 + size = 1 + + assert Enum.count(range) == size + assert BlockFetcher.chunk_ranges([range], size) == [range] + end + + test "with first > last in one chunk" do + range = 1..0 + size = 2 + + assert Enum.count(range) <= size + assert BlockFetcher.chunk_ranges([range], size) == [range] + end + + test "with first > last with filled last chunk" do + range = 1..0 + size = 1 + + assert Enum.count(range) > size + assert rem(Enum.count(range), size) == 0 + assert BlockFetcher.chunk_ranges([range], size) == [1..1, 0..0] + end + + test "with first > last with unfilled last chunk" do + range = 2..0 + size = 2 + + assert Enum.count(range) > size + refute rem(Enum.count(range), size) == 0 + assert BlockFetcher.chunk_ranges([range], size) == [2..1, 0..0] + end + end + defp capture_log_at_level(level, block) do logger_level_transaction(fn -> Logger.configure(level: level) From 7a530f2b9eee47b49041c9533e9c386e29bff22b Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 12 Jul 2018 14:19:36 -0500 Subject: [PATCH 02/14] Fix off-by-1 errors in chunk_ranges --- apps/indexer/lib/indexer/block_fetcher.ex | 26 +++++++++++++++-------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 726e2826dc..800cd8cc40 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -276,22 +276,30 @@ defmodule Indexer.BlockFetcher do @doc false def chunk_ranges(ranges, size) do - Enum.flat_map(ranges, fn - first..last = range when last - first <= size -> + Enum.flat_map(ranges, fn range -> + count = Enum.count(range) + + if count <= size do [range] + else + first..last = range - first..last -> first |> Stream.iterate(&(&1 + size)) - |> Enum.reduce_while([], fn - chunk_first, acc when chunk_first + size >= last -> - {:halt, [chunk_first..last | acc]} + |> Enum.reduce_while([], fn chunk_first, acc -> + next_chunk_first = chunk_first + size + full_chunk_last = next_chunk_first - 1 + + {action, chunk_last} = if full_chunk_last >= last do + {:halt, last} + else + {:cont, full_chunk_last} + end - chunk_first, acc -> - chunk_last = chunk_first + size - 1 - {:cont, [chunk_first..chunk_last | acc]} + {action, [chunk_first..chunk_last | acc]} end) |> Enum.reverse() + end end) end From 308879d2ffd517a55deccc0ca27dde12f1054474 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 12 Jul 2018 14:33:11 -0500 Subject: [PATCH 03/14] chunk_range --- apps/indexer/lib/indexer/block_fetcher.ex | 52 ++++++++++++----------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 800cd8cc40..ea1c1a31e4 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -276,31 +276,33 @@ defmodule Indexer.BlockFetcher do @doc false def chunk_ranges(ranges, size) do - Enum.flat_map(ranges, fn range -> - count = Enum.count(range) - - if count <= size do - [range] - else - first..last = range - - first - |> Stream.iterate(&(&1 + size)) - |> Enum.reduce_while([], fn chunk_first, acc -> - next_chunk_first = chunk_first + size - full_chunk_last = next_chunk_first - 1 - - {action, chunk_last} = if full_chunk_last >= last do - {:halt, last} - else - {:cont, full_chunk_last} - end - - {action, [chunk_first..chunk_last | acc]} - end) - |> Enum.reverse() - end - end) + Enum.flat_map(ranges, &chunk_range(&1, size)) + end + + defp chunk_range(range, size) do + count = Enum.count(range) + + if count <= size do + [range] + else + first..last = range + + first + |> Stream.iterate(&(&1 + size)) + |> Enum.reduce_while([], fn chunk_first, acc -> + next_chunk_first = chunk_first + size + full_chunk_last = next_chunk_first - 1 + + {action, chunk_last} = if full_chunk_last >= last do + {:halt, last} + else + {:cont, full_chunk_last} + end + + {action, [chunk_first..chunk_last | acc]} + end) + |> Enum.reverse() + end end defp realtime_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do From 3a9dac846d3e299522526c0f06360caff3e166cd Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 12 Jul 2018 14:35:15 -0500 Subject: [PATCH 04/14] chunk_range/3 --- apps/indexer/lib/indexer/block_fetcher.ex | 39 ++++++++++++----------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index ea1c1a31e4..663a05c35d 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -281,28 +281,31 @@ defmodule Indexer.BlockFetcher do defp chunk_range(range, size) do count = Enum.count(range) + chunk_range(range, count, size) + end - if count <= size do - [range] - else - first..last = range + defp chunk_range(range, count, size) when count <= size do + [range] + end - first - |> Stream.iterate(&(&1 + size)) - |> Enum.reduce_while([], fn chunk_first, acc -> - next_chunk_first = chunk_first + size - full_chunk_last = next_chunk_first - 1 + defp chunk_range(range, _, size) do + first..last = range - {action, chunk_last} = if full_chunk_last >= last do - {:halt, last} - else - {:cont, full_chunk_last} - end + first + |> Stream.iterate(&(&1 + size)) + |> Enum.reduce_while([], fn chunk_first, acc -> + next_chunk_first = chunk_first + size + full_chunk_last = next_chunk_first - 1 - {action, [chunk_first..chunk_last | acc]} - end) - |> Enum.reverse() - end + {action, chunk_last} = if full_chunk_last >= last do + {:halt, last} + else + {:cont, full_chunk_last} + end + + {action, [chunk_first..chunk_last | acc]} + end) + |> Enum.reverse() end defp realtime_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do From 9ee7ab64f58e6bcd4c751986983a3022efd9466f Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 12 Jul 2018 14:48:15 -0500 Subject: [PATCH 05/14] Fix chunk_range/3 for descending ranges --- apps/indexer/lib/indexer/block_fetcher.ex | 34 +++++++++++++++++------ 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 663a05c35d..6b8c14a5e4 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -288,20 +288,38 @@ defmodule Indexer.BlockFetcher do [range] end - defp chunk_range(range, _, size) do - first..last = range - + defp chunk_range(first..last, _, size) when first < last do first |> Stream.iterate(&(&1 + size)) |> Enum.reduce_while([], fn chunk_first, acc -> next_chunk_first = chunk_first + size full_chunk_last = next_chunk_first - 1 - {action, chunk_last} = if full_chunk_last >= last do - {:halt, last} - else - {:cont, full_chunk_last} - end + {action, chunk_last} = + if full_chunk_last >= last do + {:halt, last} + else + {:cont, full_chunk_last} + end + + {action, [chunk_first..chunk_last | acc]} + end) + |> Enum.reverse() + end + + defp chunk_range(first..last, _, size) when last < first do + first + |> Stream.iterate(&(&1 - size)) + |> Enum.reduce_while([], fn chunk_first, acc -> + next_chunk_first = chunk_first - size + full_chunk_last = next_chunk_first + 1 + + {action, chunk_last} = + if full_chunk_last <= last do + {:halt, last} + else + {:cont, full_chunk_last} + end {action, [chunk_first..chunk_last | acc]} end) From 2ca6ac9fa3412ca3999f1aca3ca51e01a6f4895e Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 12 Jul 2018 14:59:35 -0500 Subject: [PATCH 06/14] Unify chunk_range/3 when count > size --- apps/indexer/lib/indexer/block_fetcher.ex | 33 +++++++---------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 6b8c14a5e4..9eef297365 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -288,34 +288,18 @@ defmodule Indexer.BlockFetcher do [range] end - defp chunk_range(first..last, _, size) when first < last do - first - |> Stream.iterate(&(&1 + size)) - |> Enum.reduce_while([], fn chunk_first, acc -> - next_chunk_first = chunk_first + size - full_chunk_last = next_chunk_first - 1 - - {action, chunk_last} = - if full_chunk_last >= last do - {:halt, last} - else - {:cont, full_chunk_last} - end + defp chunk_range(first..last = range, _, size) do + sign = sign(range) + step = sign * size - {action, [chunk_first..chunk_last | acc]} - end) - |> Enum.reverse() - end - - defp chunk_range(first..last, _, size) when last < first do first - |> Stream.iterate(&(&1 - size)) + |> Stream.iterate(&(&1 + step)) |> Enum.reduce_while([], fn chunk_first, acc -> - next_chunk_first = chunk_first - size - full_chunk_last = next_chunk_first + 1 + next_chunk_first = chunk_first + step + full_chunk_last = next_chunk_first - sign {action, chunk_last} = - if full_chunk_last <= last do + if (sign > 0 and full_chunk_last >= last) or (sign < 0 and full_chunk_last <= last) do {:halt, last} else {:cont, full_chunk_last} @@ -326,6 +310,9 @@ defmodule Indexer.BlockFetcher do |> Enum.reverse() end + defp sign(first..last) when first <= last, do: 1 + defp sign(_first.._last), do: -1 + defp realtime_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) {:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) From d7e6403bb7ba50aad86e6dcddc366cdd443fa10e Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 12 Jul 2018 15:09:07 -0500 Subject: [PATCH 07/14] Calculate comparator outside of loop --- apps/indexer/lib/indexer/block_fetcher.ex | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 9eef297365..60ff859492 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -289,7 +289,12 @@ defmodule Indexer.BlockFetcher do end defp chunk_range(first..last = range, _, size) do - sign = sign(range) + {sign, comparator} = if first < last do + {1, &Kernel.>=/2} + else + {-1, &Kernel.<=/2} + end + step = sign * size first @@ -299,7 +304,7 @@ defmodule Indexer.BlockFetcher do full_chunk_last = next_chunk_first - sign {action, chunk_last} = - if (sign > 0 and full_chunk_last >= last) or (sign < 0 and full_chunk_last <= last) do + if comparator.(full_chunk_last, last) do {:halt, last} else {:cont, full_chunk_last} @@ -310,9 +315,6 @@ defmodule Indexer.BlockFetcher do |> Enum.reverse() end - defp sign(first..last) when first <= last, do: 1 - defp sign(_first.._last), do: -1 - defp realtime_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) {:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) From 15be5052191b04fefb0c9071850aa7291f85130f Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 16 Jul 2018 09:28:39 -0500 Subject: [PATCH 08/14] Move range chunking from BlockFetcher to Sequence Ensures that all callers benefit from chunking bug fixes, not just BlockFetcher. --- apps/indexer/lib/indexer/block_fetcher.ex | 49 +-------- apps/indexer/lib/indexer/sequence.ex | 65 +++++++++-- .../test/indexer/block_fetcher_test.exs | 62 ----------- apps/indexer/test/indexer/sequence_test.exs | 104 +++++++----------- 4 files changed, 101 insertions(+), 179 deletions(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 60ff859492..ebe105bec7 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -183,7 +183,7 @@ defmodule Indexer.BlockFetcher do defp genesis_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) - missing_ranges = missing_block_number_ranges(state, latest_block_number..0) + missing_ranges = Chain.missing_block_number_ranges(latest_block_number..0) count = Enum.count(missing_ranges) debug(fn -> "#{count} missed block ranges between #{latest_block_number} and genesis" end) @@ -268,53 +268,6 @@ defmodule Indexer.BlockFetcher do |> InternalTransactionFetcher.async_fetch(10_000) end - defp missing_block_number_ranges(%{blocks_batch_size: blocks_batch_size}, range) do - range - |> Chain.missing_block_number_ranges() - |> chunk_ranges(blocks_batch_size) - end - - @doc false - def chunk_ranges(ranges, size) do - Enum.flat_map(ranges, &chunk_range(&1, size)) - end - - defp chunk_range(range, size) do - count = Enum.count(range) - chunk_range(range, count, size) - end - - defp chunk_range(range, count, size) when count <= size do - [range] - end - - defp chunk_range(first..last = range, _, size) do - {sign, comparator} = if first < last do - {1, &Kernel.>=/2} - else - {-1, &Kernel.<=/2} - end - - step = sign * size - - first - |> Stream.iterate(&(&1 + step)) - |> Enum.reduce_while([], fn chunk_first, acc -> - next_chunk_first = chunk_first + step - full_chunk_last = next_chunk_first - sign - - {action, chunk_last} = - if comparator.(full_chunk_last, last) do - {:halt, last} - else - {:cont, full_chunk_last} - end - - {action, [chunk_first..chunk_last | acc]} - end) - |> Enum.reverse() - end - defp realtime_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) {:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) diff --git a/apps/indexer/lib/indexer/sequence.ex b/apps/indexer/lib/indexer/sequence.ex index 60d3cfe5c7..898384701e 100644 --- a/apps/indexer/lib/indexer/sequence.ex +++ b/apps/indexer/lib/indexer/sequence.ex @@ -98,15 +98,17 @@ defmodule Indexer.Sequence do @spec init(options) :: {:ok, t} def init(named_arguments) when is_list(named_arguments) do Process.flag(:trap_exit, true) + step = Keyword.fetch!(named_arguments, :step) + + initial_queue = :queue.new() + prefix = Keyword.get(named_arguments, :prefix, []) + queue = queue_chunked_ranges(initial_queue, step, prefix) {:ok, %__MODULE__{ - queue: - named_arguments - |> Keyword.get(:prefix, []) - |> :queue.from_list(), + queue: queue, current: Keyword.fetch!(named_arguments, :first), - step: Keyword.fetch!(named_arguments, :step) + step: step }} end @@ -118,8 +120,8 @@ defmodule Indexer.Sequence do end @spec handle_call({:inject_range, Range.t()}, GenServer.from(), t()) :: {:reply, mode(), t()} - def handle_call({:inject_range, _first.._last = range}, _from, %__MODULE__{queue: queue} = state) do - {:reply, :ok, %__MODULE__{state | queue: :queue.in(range, queue)}} + def handle_call({:inject_range, _first.._last = range}, _from, %__MODULE__{queue: queue, step: step} = state) do + {:reply, :ok, %__MODULE__{state | queue: queue_chunked_range(queue, step, range)}} end @spec handle_call(:pop, GenServer.from(), t()) :: {:reply, Range.t() | :halt, t()} @@ -146,6 +148,55 @@ defmodule Indexer.Sequence do {:reply, reply, new_state} end + defp queue_chunked_range(queue, step, _.._ = range) when is_integer(step) do + queue_chunked_ranges(queue, step, [range]) + end + + defp queue_chunked_ranges(queue, step, ranges) when is_integer(step) and is_list(ranges) do + reduce_chunked_ranges(ranges, abs(step), queue, &:queue.in/2) + end + + defp reduce_chunked_ranges(ranges, size, initial, reducer) + when is_list(ranges) and is_integer(size) and size > 0 and is_function(reducer, 2) do + Enum.reduce(ranges, initial, &reduce_chunked_range(&1, size, &2, reducer)) + end + + defp reduce_chunked_range(_.._ = range, size, initial, reducer) do + count = Enum.count(range) + reduce_chunked_range(range, count, size, initial, reducer) + end + + defp reduce_chunked_range(_.._ = range, count, size, initial, reducer) when count <= size do + reducer.(range, initial) + end + + defp reduce_chunked_range(first..last = range, _, size, initial, reducer) do + {sign, comparator} = + if first < last do + {1, &Kernel.>=/2} + else + {-1, &Kernel.<=/2} + end + + step = sign * size + + first + |> Stream.iterate(&(&1 + step)) + |> Enum.reduce_while(initial, fn chunk_first, acc -> + next_chunk_first = chunk_first + step + full_chunk_last = next_chunk_first - sign + + {action, chunk_last} = + if comparator.(full_chunk_last, last) do + {:halt, last} + else + {:cont, full_chunk_last} + end + + {action, reducer.(chunk_first..chunk_last, acc)} + end) + end + @spec sign(neg_integer()) :: -1 defp sign(integer) when integer < 0, do: -1 diff --git a/apps/indexer/test/indexer/block_fetcher_test.exs b/apps/indexer/test/indexer/block_fetcher_test.exs index 36eafce0d3..d0d26f27d4 100644 --- a/apps/indexer/test/indexer/block_fetcher_test.exs +++ b/apps/indexer/test/indexer/block_fetcher_test.exs @@ -829,68 +829,6 @@ defmodule Indexer.BlockFetcherTest do end end - describe "chunk_ranges/2" do - test "with first < last in one chunk" do - range = 0..1 - size = 2 - - assert Enum.count(range) <= size - assert BlockFetcher.chunk_ranges([range], size) == [range] - end - - test "with first < last with filled last chunk" do - range = 0..1 - size = 1 - - assert Enum.count(range) > size - assert rem(Enum.count(range), size) == 0 - assert BlockFetcher.chunk_ranges([range], size) == [0..0, 1..1] - end - - test "with first < last with unfilled last chunk" do - range = 0..2 - size = 2 - - assert Enum.count(range) > size - refute rem(Enum.count(range), size) == 0 - assert BlockFetcher.chunk_ranges([range], size) == [0..1, 2..2] - end - - test "with first == last" do - range = 0..0 - size = 1 - - assert Enum.count(range) == size - assert BlockFetcher.chunk_ranges([range], size) == [range] - end - - test "with first > last in one chunk" do - range = 1..0 - size = 2 - - assert Enum.count(range) <= size - assert BlockFetcher.chunk_ranges([range], size) == [range] - end - - test "with first > last with filled last chunk" do - range = 1..0 - size = 1 - - assert Enum.count(range) > size - assert rem(Enum.count(range), size) == 0 - assert BlockFetcher.chunk_ranges([range], size) == [1..1, 0..0] - end - - test "with first > last with unfilled last chunk" do - range = 2..0 - size = 2 - - assert Enum.count(range) > size - refute rem(Enum.count(range), size) == 0 - assert BlockFetcher.chunk_ranges([range], size) == [2..1, 0..0] - end - end - defp capture_log_at_level(level, block) do logger_level_transaction(fn -> Logger.configure(level: level) diff --git a/apps/indexer/test/indexer/sequence_test.exs b/apps/indexer/test/indexer/sequence_test.exs index 27ca393c5c..35061d5777 100644 --- a/apps/indexer/test/indexer/sequence_test.exs +++ b/apps/indexer/test/indexer/sequence_test.exs @@ -7,12 +7,12 @@ defmodule Indexer.SequenceTest do test "sets state" do {:ok, pid} = Sequence.start_link(prefix: [1..4], first: 5, step: 1) - assert state(pid) == %Sequence{ - current: 5, - mode: :infinite, - queue: {[1..4], []}, - step: 1 - } + assert Sequence.pop(pid) == 1..1 + assert Sequence.pop(pid) == 2..2 + assert Sequence.pop(pid) == 3..3 + assert Sequence.pop(pid) == 4..4 + # infinite sequence takes over + assert Sequence.pop(pid) == 5..5 end # Regression test for https://github.com/poanetwork/poa-explorer/issues/387 @@ -36,20 +36,38 @@ defmodule Indexer.SequenceTest do assert :ok = Sequence.inject_range(pid, 3..4) - assert state(pid) == %Sequence{ - current: 5, - mode: :infinite, - queue: {[3..4], [1..2]}, - step: 1 - } + assert Sequence.pop(pid) == 1..1 + assert Sequence.pop(pid) == 2..2 + assert Sequence.pop(pid) == 3..3 + assert Sequence.pop(pid) == 4..4 + # infinite sequence takes over + assert Sequence.pop(pid) == 5..5 end - test "cap" do - {:ok, pid} = Sequence.start_link(prefix: [1..2], first: 5, step: 1) + describe "cap/1" do + test "returns previous mode" do + {:ok, pid} = Sequence.start_link(prefix: [1..2], first: 5, step: 1) + + assert Sequence.cap(pid) == :infinite + assert Sequence.cap(pid) == :finite + end + + test "disables infinite mode that uses first and step" do + {:ok, late_capped} = Sequence.start_link(prefix: [1..2], first: 5, step: 1) - assert :infinite = Sequence.cap(pid) - assert state(pid).mode == :finite - assert :finite = Sequence.cap(pid) + assert Sequence.pop(late_capped) == 1..1 + assert Sequence.pop(late_capped) == 2..2 + assert Sequence.pop(late_capped) == 5..5 + assert Sequence.cap(late_capped) == :infinite + assert Sequence.pop(late_capped) == :halt + + {:ok, immediately_capped} = Sequence.start_link(prefix: [1..2], first: 5, step: 1) + + assert Sequence.cap(immediately_capped) == :infinite + assert Sequence.pop(immediately_capped) == 1..1 + assert Sequence.pop(immediately_capped) == 2..2 + assert Sequence.pop(immediately_capped) == :halt + end end describe "pop" do @@ -58,67 +76,29 @@ defmodule Indexer.SequenceTest do assert 1..4 == Sequence.pop(pid) - assert state(pid) == %Sequence{ - current: 99, - mode: :infinite, - queue: {[], [6..9]}, - step: 5 - } - - :infinite = Sequence.cap(pid) + assert :infinite = Sequence.cap(pid) assert 6..9 == Sequence.pop(pid) - - assert state(pid) == %Sequence{ - current: 99, - mode: :finite, - queue: {[], []}, - step: 5 - } end - test "with an empty queue in infinite mode" do + test "with an empty queue in infinite mode returns range from next step from current" do {:ok, pid} = Sequence.start_link(first: 5, step: 5) assert 5..9 == Sequence.pop(pid) - - assert state(pid) == %Sequence{ - current: 10, - mode: :infinite, - queue: {[], []}, - step: 5 - } end - test "with an empty queue in infinit mode with negative step" do + test "with an empty queue in infinite mode with negative step does not go past 0" do {:ok, pid} = Sequence.start_link(first: 4, step: -5) - assert 4..0 == Sequence.pop(pid) - - assert state(pid) == %Sequence{ - current: 0, - mode: :finite, - queue: {[], []}, - step: -5 - } + assert Sequence.pop(pid) == 4..0 + assert Sequence.pop(pid) == :halt end - test "with an empty queue in finite mode" do + test "with an empty queue in finite mode halts immediately" do {:ok, pid} = Sequence.start_link(first: 5, step: 5) :infinite = Sequence.cap(pid) - assert :halt == Sequence.pop(pid) - - assert state(pid) == %Sequence{ - current: 5, - mode: :finite, - queue: {[], []}, - step: 5 - } + assert Sequence.pop(pid) == :halt end end - - defp state(sequence) do - :sys.get_state(sequence) - end end From a3d51f5e1aa0a1416f4e51c2560f6dff40bfb41d Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 16 Jul 2018 10:21:16 -0500 Subject: [PATCH 09/14] Raise error if `:first` would cause retraversal --- apps/indexer/lib/indexer/block_fetcher.ex | 2 +- apps/indexer/lib/indexer/sequence.ex | 46 ++++++++++++++++----- apps/indexer/test/indexer/sequence_test.exs | 5 +++ 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index ebe105bec7..db5dfaa188 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -189,7 +189,7 @@ defmodule Indexer.BlockFetcher do debug(fn -> "#{count} missed block ranges between #{latest_block_number} and genesis" end) {:ok, seq} = - Sequence.start_link(prefix: missing_ranges, first: latest_block_number, step: -1 * state.blocks_batch_size) + Sequence.start_link(prefix: missing_ranges, first: 0, step: -1 * state.blocks_batch_size) stream_import(state, seq, max_concurrency: state.blocks_concurrency) end diff --git a/apps/indexer/lib/indexer/sequence.ex b/apps/indexer/lib/indexer/sequence.ex index 898384701e..8148562f8b 100644 --- a/apps/indexer/lib/indexer/sequence.ex +++ b/apps/indexer/lib/indexer/sequence.ex @@ -96,20 +96,19 @@ defmodule Indexer.Sequence do @impl GenServer @spec init(options) :: {:ok, t} - def init(named_arguments) when is_list(named_arguments) do + def init(options) when is_list(options) do Process.flag(:trap_exit, true) - step = Keyword.fetch!(named_arguments, :step) - initial_queue = :queue.new() - prefix = Keyword.get(named_arguments, :prefix, []) - queue = queue_chunked_ranges(initial_queue, step, prefix) + case validate_options(options) do + {:ok, %{prefix: prefix, first: first, step: step}} -> + initial_queue = :queue.new() + queue = queue_chunked_ranges(initial_queue, step, prefix) - {:ok, - %__MODULE__{ - queue: queue, - current: Keyword.fetch!(named_arguments, :first), - step: step - }} + {:ok, %__MODULE__{queue: queue, current: first, step: step}} + + {:error, reason} -> + {:stop, reason} + end end @impl GenServer @@ -202,4 +201,29 @@ defmodule Indexer.Sequence do @spec sign(non_neg_integer()) :: 1 defp sign(_), do: 1 + + defp validate_options(options) do + step = Keyword.fetch!(options, :step) + prefix = Keyword.get(options, :prefix, []) + first = Keyword.fetch!(options, :first) + + validated_options = %{prefix: prefix, first: first, step: step} + + if step < 0 do + minimum_in_prefix = + prefix + |> Stream.map(&Enum.min/1) + |> Enum.min(fn -> first end) + + if first > minimum_in_prefix do + {:error, + ":first (#{first}) is greater than the minimum in `:prefix` (#{minimum_in_prefix}), " <> + "which will cause retraversal."} + else + {:ok, validated_options} + end + else + {:ok, validated_options} + end + end end diff --git a/apps/indexer/test/indexer/sequence_test.exs b/apps/indexer/test/indexer/sequence_test.exs index 35061d5777..c8fdf8b8ab 100644 --- a/apps/indexer/test/indexer/sequence_test.exs +++ b/apps/indexer/test/indexer/sequence_test.exs @@ -29,6 +29,11 @@ defmodule Indexer.SequenceTest do # noproc when the sequence has already died by the time monitor is called assert_receive {:DOWN, ^sequence_ref, :process, ^sequence_pid, status} when status in [:normal, :noproc] end + + test "with negative :step with :first greater than minimum in :prefix returns error" do + assert {:error, ":first (1) is greater than the minimum in `:prefix` (0), which will cause retraversal."} = + Sequence.start_link(prefix: [1..0], first: 1, step: -1) + end end test "inject_range" do From ded84c963387bd42810bcf37ab139759fd04d9ab Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 16 Jul 2018 10:26:46 -0500 Subject: [PATCH 10/14] Cap genesis sequence immediately Genesis sequence should only fetch the missing ranges --- apps/indexer/lib/indexer/block_fetcher.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index db5dfaa188..1073236389 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -188,8 +188,8 @@ defmodule Indexer.BlockFetcher do debug(fn -> "#{count} missed block ranges between #{latest_block_number} and genesis" end) - {:ok, seq} = - Sequence.start_link(prefix: missing_ranges, first: 0, step: -1 * state.blocks_batch_size) + {:ok, seq} = Sequence.start_link(prefix: missing_ranges, first: 0, step: -1 * state.blocks_batch_size) + Sequence.cap(seq) stream_import(state, seq, max_concurrency: state.blocks_concurrency) end From c409d637208dbf8c0e7b274e9ab3f8bd4f57bfc0 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 16 Jul 2018 11:22:29 -0500 Subject: [PATCH 11/14] Allow first to be 0 It can be `0` when in finite mode, so allow it to be set to `0` to indicate that ahead of time. --- apps/indexer/lib/indexer/sequence.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/indexer/lib/indexer/sequence.ex b/apps/indexer/lib/indexer/sequence.ex index 8148562f8b..3f20a1102d 100644 --- a/apps/indexer/lib/indexer/sequence.ex +++ b/apps/indexer/lib/indexer/sequence.ex @@ -20,9 +20,9 @@ defmodule Indexer.Sequence do The first number in the sequence to start at once the `t:prefix/0` ranges and any `t:Range.t/0`s injected with `inject_range/2` are all consumed. """ - @type first :: pos_integer() + @type first :: non_neg_integer() - @typep first_named_argument :: {:first, pos_integer()} + @typep first_named_argument :: {:first, first} @type mode :: :infinite | :finite From 94d8267799958109b49764d561dcb3c4f1619e13 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 16 Jul 2018 14:47:09 -0500 Subject: [PATCH 12/14] Don't allow Sequence to be started with a mix of ranges and infinite start We either use Sequence with a fixed set of ranges in finite mode when getting the missing blocks down to genesis or in infinite mode when using it for the realtime index. We can't completely separate the support for queuing ranges for the realtime index as that handles retries, but we can simplify the validation considerably by disallowing ranges and first to `start_link`. --- apps/indexer/lib/indexer/block_fetcher.ex | 6 +- apps/indexer/lib/indexer/sequence.ex | 113 ++++++++++--------- apps/indexer/test/indexer/sequence_test.exs | 118 ++++++++++++++------ 3 files changed, 148 insertions(+), 89 deletions(-) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 1073236389..e782062daa 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -188,7 +188,7 @@ defmodule Indexer.BlockFetcher do debug(fn -> "#{count} missed block ranges between #{latest_block_number} and genesis" end) - {:ok, seq} = Sequence.start_link(prefix: missing_ranges, first: 0, step: -1 * state.blocks_batch_size) + {:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size) Sequence.cap(seq) stream_import(state, seq, max_concurrency: state.blocks_concurrency) @@ -214,7 +214,7 @@ defmodule Indexer.BlockFetcher do "failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying" end) - :ok = Sequence.inject_range(seq, range) + :ok = Sequence.queue(seq, range) error end @@ -319,7 +319,7 @@ defmodule Indexer.BlockFetcher do "failed to fetch #{step} for blocks #{first} - #{last}: #{inspect(reason)}. Retrying block range." end) - :ok = Sequence.inject_range(seq, range) + :ok = Sequence.queue(seq, range) {:error, step, reason} end diff --git a/apps/indexer/lib/indexer/sequence.ex b/apps/indexer/lib/indexer/sequence.ex index 3f20a1102d..29adaaf4f9 100644 --- a/apps/indexer/lib/indexer/sequence.ex +++ b/apps/indexer/lib/indexer/sequence.ex @@ -6,25 +6,27 @@ defmodule Indexer.Sequence do @enforce_keys ~w(current queue step)a defstruct current: nil, queue: nil, - step: nil, - mode: :infinite + step: nil @typedoc """ - The initial ranges to stream from the `t:Stream.t/` returned from `build_stream/1` + The ranges to stream from the `t:Stream.t/` returned from `build_stream/1` """ - @type prefix :: [Range.t()] + @type ranges :: [Range.t()] - @typep prefix_option :: {:prefix, prefix} + @typep ranges_option :: {:ranges, ranges} @typedoc """ - The first number in the sequence to start at once the `t:prefix/0` ranges and any `t:Range.t/0`s injected with - `inject_range/2` are all consumed. + The first number in the sequence to start for infinite sequences. """ - @type first :: non_neg_integer() + @type first :: integer() - @typep first_named_argument :: {:first, first} + @typep first_option :: {:first, first} - @type mode :: :infinite | :finite + @typedoc """ + * `:finite` - only popping ranges from `queue`. + * `:infinite` - generating new ranges from `current` and `step` when `queue` is empty. + """ + @type mode :: :finite | :infinite @typedoc """ The size of `t:Range.t/0` to construct based on the `t:first_named_argument/0` or its current value when all @@ -34,17 +36,25 @@ defmodule Indexer.Sequence do @typep step_named_argument :: {:step, step} - @type options :: [prefix_option | first_named_argument | step_named_argument] + @type options :: [ranges_option | first_option | step_named_argument] @typep t :: %__MODULE__{ - current: pos_integer(), queue: :queue.queue(Range.t()), - step: step(), - mode: mode() + current: nil | integer(), + step: step() } @doc """ Starts a process for managing a block sequence. + + Infinite sequence + + Indexer.Sequence.start_link(first: 100, step: 10) + + Finite sequence + + Indexer.Sequence.start_link(ranges: [100..0]) + """ @spec start_link(options) :: GenServer.on_start() def start_link(options) when is_list(options) do @@ -69,9 +79,7 @@ defmodule Indexer.Sequence do end @doc """ - Changes the mode for the sequencer to signal continuous streaming mode. - - Returns the previous `t:mode/0`. + Changes the mode for the sequence to finite. """ @spec cap(pid()) :: mode def cap(sequence) when is_pid(sequence) do @@ -79,11 +87,11 @@ defmodule Indexer.Sequence do end @doc """ - Adds a range of block numbers to the sequence. + Adds a range of block numbers to the end of sequence. """ - @spec inject_range(pid(), Range.t()) :: :ok - def inject_range(sequence, _first.._last = range) when is_pid(sequence) do - GenServer.call(sequence, {:inject_range, range}) + @spec queue(pid(), Range.t()) :: :ok + def queue(sequence, _first.._last = range) when is_pid(sequence) do + GenServer.call(sequence, {:queue, range}) end @doc """ @@ -100,9 +108,9 @@ defmodule Indexer.Sequence do Process.flag(:trap_exit, true) case validate_options(options) do - {:ok, %{prefix: prefix, first: first, step: step}} -> + {:ok, %{ranges: ranges, first: first, step: step}} -> initial_queue = :queue.new() - queue = queue_chunked_ranges(initial_queue, step, prefix) + queue = queue_chunked_ranges(initial_queue, step, ranges) {:ok, %__MODULE__{queue: queue, current: first, step: step}} @@ -113,35 +121,40 @@ defmodule Indexer.Sequence do @impl GenServer - @spec handle_call(:cap, GenServer.from(), t()) :: {:reply, mode(), %__MODULE__{mode: :infinite}} - def handle_call(:cap, _from, %__MODULE__{mode: mode} = state) do - {:reply, mode, %__MODULE__{state | mode: :finite}} + @spec handle_call(:cap, GenServer.from(), %__MODULE__{current: nil}) :: {:reply, :finite, %__MODULE__{current: nil}} + @spec handle_call(:cap, GenServer.from(), %__MODULE__{current: integer()}) :: + {:reply, :infinite, %__MODULE__{current: nil}} + def handle_call(:cap, _from, %__MODULE__{current: current} = state) do + mode = + case current do + nil -> :finite + _ -> :infinite + end + + {:reply, mode, %__MODULE__{state | current: nil}} end - @spec handle_call({:inject_range, Range.t()}, GenServer.from(), t()) :: {:reply, mode(), t()} - def handle_call({:inject_range, _first.._last = range}, _from, %__MODULE__{queue: queue, step: step} = state) do + @spec handle_call({:queue, Range.t()}, GenServer.from(), t()) :: {:reply, :ok, t()} + def handle_call({:queue, _first.._last = range}, _from, %__MODULE__{queue: queue, step: step} = state) do {:reply, :ok, %__MODULE__{state | queue: queue_chunked_range(queue, step, range)}} end @spec handle_call(:pop, GenServer.from(), t()) :: {:reply, Range.t() | :halt, t()} - def handle_call(:pop, _from, %__MODULE__{mode: mode, queue: queue, current: current, step: step} = state) do + def handle_call(:pop, _from, %__MODULE__{queue: queue, current: current, step: step} = state) do {reply, new_state} = - case {mode, :queue.out(queue)} do + case {current, :queue.out(queue)} do {_, {{:value, range}, new_queue}} -> {range, %__MODULE__{state | queue: new_queue}} - {:infinite, {:empty, new_queue}} -> - case current + step do - negative when negative < 0 -> - {current..0, %__MODULE__{state | current: 0, mode: :finite, queue: new_queue}} + {nil, {:empty, new_queue}} -> + {:halt, %__MODULE__{state | queue: new_queue}} + {_, {:empty, new_queue}} -> + case current + step do new_current -> last = new_current - sign(step) {current..last, %__MODULE__{state | current: new_current, queue: new_queue}} end - - {:finite, {:empty, new_queue}} -> - {:halt, %__MODULE__{state | queue: new_queue}} end {:reply, reply, new_state} @@ -204,26 +217,20 @@ defmodule Indexer.Sequence do defp validate_options(options) do step = Keyword.fetch!(options, :step) - prefix = Keyword.get(options, :prefix, []) - first = Keyword.fetch!(options, :first) - validated_options = %{prefix: prefix, first: first, step: step} + case {Keyword.fetch(options, :ranges), Keyword.fetch(options, :first)} do + {:error, {:ok, first}} -> + {:ok, %{ranges: [], first: first, step: step}} - if step < 0 do - minimum_in_prefix = - prefix - |> Stream.map(&Enum.min/1) - |> Enum.min(fn -> first end) + {{:ok, ranges}, :error} -> + {:ok, %{ranges: ranges, first: nil, step: step}} - if first > minimum_in_prefix do + {{:ok, _}, {:ok, _}} -> {:error, - ":first (#{first}) is greater than the minimum in `:prefix` (#{minimum_in_prefix}), " <> - "which will cause retraversal."} - else - {:ok, validated_options} - end - else - {:ok, validated_options} + ":ranges and :first cannot be set at the same time as :ranges is for :finite mode while :first is for :infinite mode"} + + {:error, :error} -> + {:error, "either :ranges or :first must be set"} end end end diff --git a/apps/indexer/test/indexer/sequence_test.exs b/apps/indexer/test/indexer/sequence_test.exs index c8fdf8b8ab..1c261a880c 100644 --- a/apps/indexer/test/indexer/sequence_test.exs +++ b/apps/indexer/test/indexer/sequence_test.exs @@ -4,15 +4,59 @@ defmodule Indexer.SequenceTest do alias Indexer.Sequence describe "start_link/1" do - test "sets state" do - {:ok, pid} = Sequence.start_link(prefix: [1..4], first: 5, step: 1) + test "without :ranges with :first with positive step pops infinitely" do + {:ok, ascending} = Sequence.start_link(first: 5, step: 1) + + assert Sequence.pop(ascending) == 5..5 + assert Sequence.pop(ascending) == 6..6 + end + + test "without :ranges with :first pop with negative step pops past 0" do + {:ok, descending} = Sequence.start_link(first: 1, step: -1) + + assert Sequence.pop(descending) == 1..1 + assert Sequence.pop(descending) == 0..0 + assert Sequence.pop(descending) == -1..-1 + end + + test "without :ranges without :first returns error" do + {child_pid, child_ref} = + spawn_monitor(fn -> + Sequence.start_link(step: -1) + Process.sleep(5_000) + end) + + assert_receive {:DOWN, ^child_ref, :process, ^child_pid, "either :ranges or :first must be set"} + end + + test "with ranges without :first" do + {:ok, pid} = Sequence.start_link(ranges: [1..4], step: 1) assert Sequence.pop(pid) == 1..1 assert Sequence.pop(pid) == 2..2 assert Sequence.pop(pid) == 3..3 assert Sequence.pop(pid) == 4..4 - # infinite sequence takes over - assert Sequence.pop(pid) == 5..5 + assert Sequence.pop(pid) == :halt + end + + test "with :ranges with :first returns error" do + {child_pid, child_ref} = + spawn_monitor(fn -> + Sequence.start_link(ranges: [1..0], first: 1, step: -1) + Process.sleep(5_000) + end) + + assert_receive {:DOWN, ^child_ref, :process, ^child_pid, + ":ranges and :first cannot be set at the same time" <> + " as :ranges is for :finite mode while :first is for :infinite mode"} + end + + test "with 0 first with negative step does not return 0 twice" do + {:ok, pid} = Sequence.start_link(ranges: [1..0], step: -1) + + assert Sequence.pop(pid) == 1..1 + assert Sequence.pop(pid) == 0..0 + assert Sequence.pop(pid) == :halt end # Regression test for https://github.com/poanetwork/poa-explorer/issues/387 @@ -29,61 +73,69 @@ defmodule Indexer.SequenceTest do # noproc when the sequence has already died by the time monitor is called assert_receive {:DOWN, ^sequence_ref, :process, ^sequence_pid, status} when status in [:normal, :noproc] end + end + + describe "queue/2" do + test "with finite mode range is chunked" do + {:ok, pid} = Sequence.start_link(ranges: [1..0], step: -1) + + assert Sequence.pop(pid) == 1..1 + assert Sequence.pop(pid) == 0..0 + + assert Sequence.queue(pid, 1..0) == :ok - test "with negative :step with :first greater than minimum in :prefix returns error" do - assert {:error, ":first (1) is greater than the minimum in `:prefix` (0), which will cause retraversal."} = - Sequence.start_link(prefix: [1..0], first: 1, step: -1) + assert Sequence.pop(pid) == 1..1 + assert Sequence.pop(pid) == 0..0 + assert Sequence.pop(pid) == :halt + assert Sequence.pop(pid) == :halt end - end - test "inject_range" do - {:ok, pid} = Sequence.start_link(prefix: [1..2], first: 5, step: 1) + test "with infinite mode range is chunked and is returned prior to calculated ranges" do + {:ok, pid} = Sequence.start_link(first: 5, step: 1) - assert :ok = Sequence.inject_range(pid, 3..4) + assert :ok = Sequence.queue(pid, 3..4) - assert Sequence.pop(pid) == 1..1 - assert Sequence.pop(pid) == 2..2 - assert Sequence.pop(pid) == 3..3 - assert Sequence.pop(pid) == 4..4 - # infinite sequence takes over - assert Sequence.pop(pid) == 5..5 + assert Sequence.pop(pid) == 3..3 + assert Sequence.pop(pid) == 4..4 + # infinite sequence takes over + assert Sequence.pop(pid) == 5..5 + assert Sequence.pop(pid) == 6..6 + end end describe "cap/1" do test "returns previous mode" do - {:ok, pid} = Sequence.start_link(prefix: [1..2], first: 5, step: 1) + {:ok, pid} = Sequence.start_link(first: 5, step: 1) assert Sequence.cap(pid) == :infinite assert Sequence.cap(pid) == :finite end test "disables infinite mode that uses first and step" do - {:ok, late_capped} = Sequence.start_link(prefix: [1..2], first: 5, step: 1) + {:ok, late_capped} = Sequence.start_link(first: 5, step: 1) - assert Sequence.pop(late_capped) == 1..1 - assert Sequence.pop(late_capped) == 2..2 assert Sequence.pop(late_capped) == 5..5 + assert Sequence.pop(late_capped) == 6..6 + assert Sequence.queue(late_capped, 5..5) == :ok assert Sequence.cap(late_capped) == :infinite + assert Sequence.pop(late_capped) == 5..5 assert Sequence.pop(late_capped) == :halt - {:ok, immediately_capped} = Sequence.start_link(prefix: [1..2], first: 5, step: 1) + {:ok, immediately_capped} = Sequence.start_link(first: 5, step: 1) assert Sequence.cap(immediately_capped) == :infinite - assert Sequence.pop(immediately_capped) == 1..1 - assert Sequence.pop(immediately_capped) == 2..2 assert Sequence.pop(immediately_capped) == :halt end end describe "pop" do - test "with a non-empty queue in finite and infinite modes" do - {:ok, pid} = Sequence.start_link(prefix: [1..4, 6..9], first: 99, step: 5) + test "with a non-empty queue in finite mode" do + {:ok, pid} = Sequence.start_link(ranges: [1..4, 6..9], step: 5) - assert 1..4 == Sequence.pop(pid) - - assert :infinite = Sequence.cap(pid) - - assert 6..9 == Sequence.pop(pid) + assert Sequence.pop(pid) == 1..4 + assert Sequence.pop(pid) == 6..9 + assert Sequence.pop(pid) == :halt + assert Sequence.pop(pid) == :halt end test "with an empty queue in infinite mode returns range from next step from current" do @@ -92,11 +144,11 @@ defmodule Indexer.SequenceTest do assert 5..9 == Sequence.pop(pid) end - test "with an empty queue in infinite mode with negative step does not go past 0" do + test "with an empty queue in infinite mode with negative step goes past 0" do {:ok, pid} = Sequence.start_link(first: 4, step: -5) assert Sequence.pop(pid) == 4..0 - assert Sequence.pop(pid) == :halt + assert Sequence.pop(pid) == -1..-5 end test "with an empty queue in finite mode halts immediately" do From 105ce9b6139e5fbb971bbba172126dde1d7134da Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 17 Jul 2018 10:53:36 -0500 Subject: [PATCH 13/14] Only allow positive step for infinite sequences --- apps/indexer/lib/indexer/sequence.ex | 16 ++++++++-------- apps/indexer/test/indexer/sequence_test.exs | 19 +++++++------------ 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/apps/indexer/lib/indexer/sequence.ex b/apps/indexer/lib/indexer/sequence.ex index 29adaaf4f9..90e3bb2d78 100644 --- a/apps/indexer/lib/indexer/sequence.ex +++ b/apps/indexer/lib/indexer/sequence.ex @@ -152,7 +152,7 @@ defmodule Indexer.Sequence do {_, {:empty, new_queue}} -> case current + step do new_current -> - last = new_current - sign(step) + last = new_current - 1 {current..last, %__MODULE__{state | current: new_current, queue: new_queue}} end end @@ -209,18 +209,18 @@ defmodule Indexer.Sequence do end) end - @spec sign(neg_integer()) :: -1 - defp sign(integer) when integer < 0, do: -1 - - @spec sign(non_neg_integer()) :: 1 - defp sign(_), do: 1 - defp validate_options(options) do step = Keyword.fetch!(options, :step) case {Keyword.fetch(options, :ranges), Keyword.fetch(options, :first)} do {:error, {:ok, first}} -> - {:ok, %{ranges: [], first: first, step: step}} + case step do + pos_integer when is_integer(pos_integer) and pos_integer > 0 -> + {:ok, %{ranges: [], first: first, step: step}} + + _ -> + {:error, ":step must be a positive integer for infinite sequences"} + end {{:ok, ranges}, :error} -> {:ok, %{ranges: ranges, first: nil, step: step}} diff --git a/apps/indexer/test/indexer/sequence_test.exs b/apps/indexer/test/indexer/sequence_test.exs index 1c261a880c..97205b9c18 100644 --- a/apps/indexer/test/indexer/sequence_test.exs +++ b/apps/indexer/test/indexer/sequence_test.exs @@ -11,12 +11,14 @@ defmodule Indexer.SequenceTest do assert Sequence.pop(ascending) == 6..6 end - test "without :ranges with :first pop with negative step pops past 0" do - {:ok, descending} = Sequence.start_link(first: 1, step: -1) + test "without :ranges with :first with negative :step is error" do + {child_pid, child_ref} = + spawn_monitor(fn -> + Sequence.start_link(first: 1, step: -1) + Process.sleep(5_000) + end) - assert Sequence.pop(descending) == 1..1 - assert Sequence.pop(descending) == 0..0 - assert Sequence.pop(descending) == -1..-1 + assert_receive {:DOWN, ^child_ref, :process, ^child_pid, ":step must be a positive integer for infinite sequences"} end test "without :ranges without :first returns error" do @@ -144,13 +146,6 @@ defmodule Indexer.SequenceTest do assert 5..9 == Sequence.pop(pid) end - test "with an empty queue in infinite mode with negative step goes past 0" do - {:ok, pid} = Sequence.start_link(first: 4, step: -5) - - assert Sequence.pop(pid) == 4..0 - assert Sequence.pop(pid) == -1..-5 - end - test "with an empty queue in finite mode halts immediately" do {:ok, pid} = Sequence.start_link(first: 5, step: 5) :infinite = Sequence.cap(pid) From 5b01cd396b904655c78caee78e42e5d33bf18a4e Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 17 Jul 2018 12:23:12 -0500 Subject: [PATCH 14/14] Check that ranges are in same direction as step --- apps/indexer/lib/indexer/sequence.ex | 101 +++++++++++++------- apps/indexer/test/indexer/sequence_test.exs | 43 ++++++++- 2 files changed, 108 insertions(+), 36 deletions(-) diff --git a/apps/indexer/lib/indexer/sequence.ex b/apps/indexer/lib/indexer/sequence.ex index 90e3bb2d78..3ce3773a78 100644 --- a/apps/indexer/lib/indexer/sequence.ex +++ b/apps/indexer/lib/indexer/sequence.ex @@ -107,13 +107,11 @@ defmodule Indexer.Sequence do def init(options) when is_list(options) do Process.flag(:trap_exit, true) - case validate_options(options) do - {:ok, %{ranges: ranges, first: first, step: step}} -> - initial_queue = :queue.new() - queue = queue_chunked_ranges(initial_queue, step, ranges) - - {:ok, %__MODULE__{queue: queue, current: first, step: step}} - + with {:ok, %{ranges: ranges, first: first, step: step}} <- validate_options(options), + initial_queue = :queue.new(), + {:ok, queue} <- queue_chunked_ranges(initial_queue, step, ranges) do + {:ok, %__MODULE__{queue: queue, current: first, step: step}} + else {:error, reason} -> {:stop, reason} end @@ -134,9 +132,15 @@ defmodule Indexer.Sequence do {:reply, mode, %__MODULE__{state | current: nil}} end - @spec handle_call({:queue, Range.t()}, GenServer.from(), t()) :: {:reply, :ok, t()} + @spec handle_call({:queue, Range.t()}, GenServer.from(), t()) :: {:reply, :ok | {:error, String.t()}, t()} def handle_call({:queue, _first.._last = range}, _from, %__MODULE__{queue: queue, step: step} = state) do - {:reply, :ok, %__MODULE__{state | queue: queue_chunked_range(queue, step, range)}} + case queue_chunked_range(queue, step, range) do + {:ok, updated_queue} -> + {:reply, :ok, %__MODULE__{state | queue: updated_queue}} + + {:error, _} = error -> + {:reply, error, state} + end end @spec handle_call(:pop, GenServer.from(), t()) :: {:reply, Range.t() | :halt, t()} @@ -160,53 +164,80 @@ defmodule Indexer.Sequence do {:reply, reply, new_state} end + @spec queue_chunked_range(:queue.queue(Range.t()), step, Range.t()) :: + {:ok, :queue.queue(Range.t())} | {:error, reason :: String.t()} defp queue_chunked_range(queue, step, _.._ = range) when is_integer(step) do - queue_chunked_ranges(queue, step, [range]) + with {:error, [reason]} <- queue_chunked_ranges(queue, step, [range]) do + {:error, reason} + end end + @spec queue_chunked_range(:queue.queue(Range.t()), step, [Range.t()]) :: + {:ok, :queue.queue(Range.t())} | {:error, reasons :: [String.t()]} defp queue_chunked_ranges(queue, step, ranges) when is_integer(step) and is_list(ranges) do - reduce_chunked_ranges(ranges, abs(step), queue, &:queue.in/2) + reduce_chunked_ranges(ranges, step, queue, &:queue.in/2) end - defp reduce_chunked_ranges(ranges, size, initial, reducer) - when is_list(ranges) and is_integer(size) and size > 0 and is_function(reducer, 2) do - Enum.reduce(ranges, initial, &reduce_chunked_range(&1, size, &2, reducer)) + defp reduce_chunked_ranges(ranges, step, initial, reducer) + when is_list(ranges) and is_integer(step) and step != 0 and is_function(reducer, 2) do + Enum.reduce(ranges, {:ok, initial}, fn + range, {:ok, acc} -> + case reduce_chunked_range(range, step, acc, reducer) do + {:ok, _} = ok -> + ok + + {:error, reason} -> + {:error, [reason]} + end + + range, {:error, acc_reasons} = acc -> + case reduce_chunked_range(range, step, initial, reducer) do + {:ok, _} -> acc + {:error, reason} -> {:error, [reason | acc_reasons]} + end + end) end - defp reduce_chunked_range(_.._ = range, size, initial, reducer) do + defp reduce_chunked_range(_.._ = range, step, initial, reducer) do count = Enum.count(range) - reduce_chunked_range(range, count, size, initial, reducer) + reduce_chunked_range(range, count, step, initial, reducer) + end + + defp reduce_chunked_range(first..last = range, _count, step, _initial, _reducer) + when (step < 0 and first < last) or (0 < step and last < first) do + {:error, "Range (#{inspect(range)}) direction is opposite step (#{step}) direction"} end - defp reduce_chunked_range(_.._ = range, count, size, initial, reducer) when count <= size do - reducer.(range, initial) + defp reduce_chunked_range(_.._ = range, count, step, initial, reducer) when count <= abs(step) do + {:ok, reducer.(range, initial)} end - defp reduce_chunked_range(first..last = range, _, size, initial, reducer) do + defp reduce_chunked_range(first..last = range, _, step, initial, reducer) do {sign, comparator} = - if first < last do + if step > 0 do {1, &Kernel.>=/2} else {-1, &Kernel.<=/2} end - step = sign * size - - first - |> Stream.iterate(&(&1 + step)) - |> Enum.reduce_while(initial, fn chunk_first, acc -> - next_chunk_first = chunk_first + step - full_chunk_last = next_chunk_first - sign + final = + first + |> Stream.iterate(&(&1 + step)) + |> Enum.reduce_while(initial, fn chunk_first, acc -> + next_chunk_first = chunk_first + step + full_chunk_last = next_chunk_first - sign + + {action, chunk_last} = + if comparator.(full_chunk_last, last) do + {:halt, last} + else + {:cont, full_chunk_last} + end - {action, chunk_last} = - if comparator.(full_chunk_last, last) do - {:halt, last} - else - {:cont, full_chunk_last} - end + {action, reducer.(chunk_first..chunk_last, acc)} + end) - {action, reducer.(chunk_first..chunk_last, acc)} - end) + {:ok, final} end defp validate_options(options) do diff --git a/apps/indexer/test/indexer/sequence_test.exs b/apps/indexer/test/indexer/sequence_test.exs index 97205b9c18..ff3d9c3e1f 100644 --- a/apps/indexer/test/indexer/sequence_test.exs +++ b/apps/indexer/test/indexer/sequence_test.exs @@ -18,7 +18,8 @@ defmodule Indexer.SequenceTest do Process.sleep(5_000) end) - assert_receive {:DOWN, ^child_ref, :process, ^child_pid, ":step must be a positive integer for infinite sequences"} + assert_receive {:DOWN, ^child_ref, :process, ^child_pid, + ":step must be a positive integer for infinite sequences"} end test "without :ranges without :first returns error" do @@ -75,6 +76,36 @@ defmodule Indexer.SequenceTest do # noproc when the sequence has already died by the time monitor is called assert_receive {:DOWN, ^sequence_ref, :process, ^sequence_pid, status} when status in [:normal, :noproc] end + + test "with :ranges in direction opposite of :step returns errors for all ranges in wrong direction" do + parent = self() + + {child_pid, child_ref} = + spawn_monitor(fn -> + send( + parent, + Sequence.start_link( + ranges: [ + # ok, ok + 7..6, + # ok, error + 4..5, + # error, ok + 3..2, + # error, error + 0..1 + ], + step: -1 + ) + ) + end) + + assert_receive {:DOWN, ^child_ref, :process, ^child_pid, + [ + "Range (0..1) direction is opposite step (-1) direction", + "Range (4..5) direction is opposite step (-1) direction" + ]} + end end describe "queue/2" do @@ -92,6 +123,16 @@ defmodule Indexer.SequenceTest do assert Sequence.pop(pid) == :halt end + test "with finite mode with range in wrong direction returns error" do + {:ok, ascending} = Sequence.start_link(first: 0, step: 1) + + assert Sequence.queue(ascending, 1..0) == {:error, "Range (1..0) direction is opposite step (1) direction"} + + {:ok, descending} = Sequence.start_link(ranges: [1..0], step: -1) + + assert Sequence.queue(descending, 0..1) == {:error, "Range (0..1) direction is opposite step (-1) direction"} + end + test "with infinite mode range is chunked and is returned prior to calculated ranges" do {:ok, pid} = Sequence.start_link(first: 5, step: 1)