diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index eee26587f7..e782062daa 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -183,13 +183,13 @@ defmodule Indexer.BlockFetcher do defp genesis_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) - missing_ranges = missing_block_number_ranges(state, latest_block_number..0) + missing_ranges = Chain.missing_block_number_ranges(latest_block_number..0) count = Enum.count(missing_ranges) debug(fn -> "#{count} missed block ranges between #{latest_block_number} and genesis" end) - {:ok, seq} = - Sequence.start_link(prefix: missing_ranges, first: latest_block_number, step: -1 * state.blocks_batch_size) + {:ok, seq} = Sequence.start_link(ranges: missing_ranges, step: -1 * state.blocks_batch_size) + Sequence.cap(seq) stream_import(state, seq, max_concurrency: state.blocks_concurrency) end @@ -214,7 +214,7 @@ defmodule Indexer.BlockFetcher do "failed to insert blocks during #{step} #{inspect(range)}: #{inspect(failed_value)}. Retrying" end) - :ok = Sequence.inject_range(seq, range) + :ok = Sequence.queue(seq, range) error end @@ -268,32 +268,6 @@ defmodule Indexer.BlockFetcher do |> InternalTransactionFetcher.async_fetch(10_000) end - defp missing_block_number_ranges(%{blocks_batch_size: blocks_batch_size}, range) do - range - |> Chain.missing_block_number_ranges() - |> chunk_ranges(blocks_batch_size) - end - - defp chunk_ranges(ranges, size) do - Enum.flat_map(ranges, fn - first..last = range when last - first <= size -> - [range] - - first..last -> - first - |> Stream.iterate(&(&1 + size)) - |> Enum.reduce_while([], fn - chunk_first, acc when chunk_first + size >= last -> - {:halt, [chunk_first..last | acc]} - - chunk_first, acc -> - chunk_last = chunk_first + size - 1 - {:cont, [chunk_first..chunk_last | acc]} - end) - |> Enum.reverse() - end) - end - defp realtime_task(%{json_rpc_named_arguments: json_rpc_named_arguments} = state) do {:ok, latest_block_number} = EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) {:ok, seq} = Sequence.start_link(first: latest_block_number, step: 2) @@ -345,7 +319,7 @@ defmodule Indexer.BlockFetcher do "failed to fetch #{step} for blocks #{first} - #{last}: #{inspect(reason)}. Retrying block range." end) - :ok = Sequence.inject_range(seq, range) + :ok = Sequence.queue(seq, range) {:error, step, reason} end diff --git a/apps/indexer/lib/indexer/sequence.ex b/apps/indexer/lib/indexer/sequence.ex index 60d3cfe5c7..3ce3773a78 100644 --- a/apps/indexer/lib/indexer/sequence.ex +++ b/apps/indexer/lib/indexer/sequence.ex @@ -6,25 +6,27 @@ defmodule Indexer.Sequence do @enforce_keys ~w(current queue step)a defstruct current: nil, queue: nil, - step: nil, - mode: :infinite + step: nil @typedoc """ - The initial ranges to stream from the `t:Stream.t/` returned from `build_stream/1` + The ranges to stream from the `t:Stream.t/` returned from `build_stream/1` """ - @type prefix :: [Range.t()] + @type ranges :: [Range.t()] - @typep prefix_option :: {:prefix, prefix} + @typep ranges_option :: {:ranges, ranges} @typedoc """ - The first number in the sequence to start at once the `t:prefix/0` ranges and any `t:Range.t/0`s injected with - `inject_range/2` are all consumed. + The first number in the sequence to start for infinite sequences. """ - @type first :: pos_integer() + @type first :: integer() - @typep first_named_argument :: {:first, pos_integer()} + @typep first_option :: {:first, first} - @type mode :: :infinite | :finite + @typedoc """ + * `:finite` - only popping ranges from `queue`. + * `:infinite` - generating new ranges from `current` and `step` when `queue` is empty. + """ + @type mode :: :finite | :infinite @typedoc """ The size of `t:Range.t/0` to construct based on the `t:first_named_argument/0` or its current value when all @@ -34,17 +36,25 @@ defmodule Indexer.Sequence do @typep step_named_argument :: {:step, step} - @type options :: [prefix_option | first_named_argument | step_named_argument] + @type options :: [ranges_option | first_option | step_named_argument] @typep t :: %__MODULE__{ - current: pos_integer(), queue: :queue.queue(Range.t()), - step: step(), - mode: mode() + current: nil | integer(), + step: step() } @doc """ Starts a process for managing a block sequence. + + Infinite sequence + + Indexer.Sequence.start_link(first: 100, step: 10) + + Finite sequence + + Indexer.Sequence.start_link(ranges: [100..0]) + """ @spec start_link(options) :: GenServer.on_start() def start_link(options) when is_list(options) do @@ -69,9 +79,7 @@ defmodule Indexer.Sequence do end @doc """ - Changes the mode for the sequencer to signal continuous streaming mode. - - Returns the previous `t:mode/0`. + Changes the mode for the sequence to finite. """ @spec cap(pid()) :: mode def cap(sequence) when is_pid(sequence) do @@ -79,11 +87,11 @@ defmodule Indexer.Sequence do end @doc """ - Adds a range of block numbers to the sequence. + Adds a range of block numbers to the end of sequence. """ - @spec inject_range(pid(), Range.t()) :: :ok - def inject_range(sequence, _first.._last = range) when is_pid(sequence) do - GenServer.call(sequence, {:inject_range, range}) + @spec queue(pid(), Range.t()) :: :ok + def queue(sequence, _first.._last = range) when is_pid(sequence) do + GenServer.call(sequence, {:queue, range}) end @doc """ @@ -96,59 +104,164 @@ defmodule Indexer.Sequence do @impl GenServer @spec init(options) :: {:ok, t} - def init(named_arguments) when is_list(named_arguments) do + def init(options) when is_list(options) do Process.flag(:trap_exit, true) - {:ok, - %__MODULE__{ - queue: - named_arguments - |> Keyword.get(:prefix, []) - |> :queue.from_list(), - current: Keyword.fetch!(named_arguments, :first), - step: Keyword.fetch!(named_arguments, :step) - }} + with {:ok, %{ranges: ranges, first: first, step: step}} <- validate_options(options), + initial_queue = :queue.new(), + {:ok, queue} <- queue_chunked_ranges(initial_queue, step, ranges) do + {:ok, %__MODULE__{queue: queue, current: first, step: step}} + else + {:error, reason} -> + {:stop, reason} + end end @impl GenServer - @spec handle_call(:cap, GenServer.from(), t()) :: {:reply, mode(), %__MODULE__{mode: :infinite}} - def handle_call(:cap, _from, %__MODULE__{mode: mode} = state) do - {:reply, mode, %__MODULE__{state | mode: :finite}} + @spec handle_call(:cap, GenServer.from(), %__MODULE__{current: nil}) :: {:reply, :finite, %__MODULE__{current: nil}} + @spec handle_call(:cap, GenServer.from(), %__MODULE__{current: integer()}) :: + {:reply, :infinite, %__MODULE__{current: nil}} + def handle_call(:cap, _from, %__MODULE__{current: current} = state) do + mode = + case current do + nil -> :finite + _ -> :infinite + end + + {:reply, mode, %__MODULE__{state | current: nil}} end - @spec handle_call({:inject_range, Range.t()}, GenServer.from(), t()) :: {:reply, mode(), t()} - def handle_call({:inject_range, _first.._last = range}, _from, %__MODULE__{queue: queue} = state) do - {:reply, :ok, %__MODULE__{state | queue: :queue.in(range, queue)}} + @spec handle_call({:queue, Range.t()}, GenServer.from(), t()) :: {:reply, :ok | {:error, String.t()}, t()} + def handle_call({:queue, _first.._last = range}, _from, %__MODULE__{queue: queue, step: step} = state) do + case queue_chunked_range(queue, step, range) do + {:ok, updated_queue} -> + {:reply, :ok, %__MODULE__{state | queue: updated_queue}} + + {:error, _} = error -> + {:reply, error, state} + end end @spec handle_call(:pop, GenServer.from(), t()) :: {:reply, Range.t() | :halt, t()} - def handle_call(:pop, _from, %__MODULE__{mode: mode, queue: queue, current: current, step: step} = state) do + def handle_call(:pop, _from, %__MODULE__{queue: queue, current: current, step: step} = state) do {reply, new_state} = - case {mode, :queue.out(queue)} do + case {current, :queue.out(queue)} do {_, {{:value, range}, new_queue}} -> {range, %__MODULE__{state | queue: new_queue}} - {:infinite, {:empty, new_queue}} -> - case current + step do - negative when negative < 0 -> - {current..0, %__MODULE__{state | current: 0, mode: :finite, queue: new_queue}} + {nil, {:empty, new_queue}} -> + {:halt, %__MODULE__{state | queue: new_queue}} + {_, {:empty, new_queue}} -> + case current + step do new_current -> - last = new_current - sign(step) + last = new_current - 1 {current..last, %__MODULE__{state | current: new_current, queue: new_queue}} end - - {:finite, {:empty, new_queue}} -> - {:halt, %__MODULE__{state | queue: new_queue}} end {:reply, reply, new_state} end - @spec sign(neg_integer()) :: -1 - defp sign(integer) when integer < 0, do: -1 + @spec queue_chunked_range(:queue.queue(Range.t()), step, Range.t()) :: + {:ok, :queue.queue(Range.t())} | {:error, reason :: String.t()} + defp queue_chunked_range(queue, step, _.._ = range) when is_integer(step) do + with {:error, [reason]} <- queue_chunked_ranges(queue, step, [range]) do + {:error, reason} + end + end + + @spec queue_chunked_range(:queue.queue(Range.t()), step, [Range.t()]) :: + {:ok, :queue.queue(Range.t())} | {:error, reasons :: [String.t()]} + defp queue_chunked_ranges(queue, step, ranges) when is_integer(step) and is_list(ranges) do + reduce_chunked_ranges(ranges, step, queue, &:queue.in/2) + end + + defp reduce_chunked_ranges(ranges, step, initial, reducer) + when is_list(ranges) and is_integer(step) and step != 0 and is_function(reducer, 2) do + Enum.reduce(ranges, {:ok, initial}, fn + range, {:ok, acc} -> + case reduce_chunked_range(range, step, acc, reducer) do + {:ok, _} = ok -> + ok + + {:error, reason} -> + {:error, [reason]} + end + + range, {:error, acc_reasons} = acc -> + case reduce_chunked_range(range, step, initial, reducer) do + {:ok, _} -> acc + {:error, reason} -> {:error, [reason | acc_reasons]} + end + end) + end + + defp reduce_chunked_range(_.._ = range, step, initial, reducer) do + count = Enum.count(range) + reduce_chunked_range(range, count, step, initial, reducer) + end + + defp reduce_chunked_range(first..last = range, _count, step, _initial, _reducer) + when (step < 0 and first < last) or (0 < step and last < first) do + {:error, "Range (#{inspect(range)}) direction is opposite step (#{step}) direction"} + end + + defp reduce_chunked_range(_.._ = range, count, step, initial, reducer) when count <= abs(step) do + {:ok, reducer.(range, initial)} + end + + defp reduce_chunked_range(first..last = range, _, step, initial, reducer) do + {sign, comparator} = + if step > 0 do + {1, &Kernel.>=/2} + else + {-1, &Kernel.<=/2} + end + + final = + first + |> Stream.iterate(&(&1 + step)) + |> Enum.reduce_while(initial, fn chunk_first, acc -> + next_chunk_first = chunk_first + step + full_chunk_last = next_chunk_first - sign - @spec sign(non_neg_integer()) :: 1 - defp sign(_), do: 1 + {action, chunk_last} = + if comparator.(full_chunk_last, last) do + {:halt, last} + else + {:cont, full_chunk_last} + end + + {action, reducer.(chunk_first..chunk_last, acc)} + end) + + {:ok, final} + end + + defp validate_options(options) do + step = Keyword.fetch!(options, :step) + + case {Keyword.fetch(options, :ranges), Keyword.fetch(options, :first)} do + {:error, {:ok, first}} -> + case step do + pos_integer when is_integer(pos_integer) and pos_integer > 0 -> + {:ok, %{ranges: [], first: first, step: step}} + + _ -> + {:error, ":step must be a positive integer for infinite sequences"} + end + + {{:ok, ranges}, :error} -> + {:ok, %{ranges: ranges, first: nil, step: step}} + + {{:ok, _}, {:ok, _}} -> + {:error, + ":ranges and :first cannot be set at the same time as :ranges is for :finite mode while :first is for :infinite mode"} + + {:error, :error} -> + {:error, "either :ranges or :first must be set"} + end + end end diff --git a/apps/indexer/test/indexer/sequence_test.exs b/apps/indexer/test/indexer/sequence_test.exs index 27ca393c5c..ff3d9c3e1f 100644 --- a/apps/indexer/test/indexer/sequence_test.exs +++ b/apps/indexer/test/indexer/sequence_test.exs @@ -4,15 +4,62 @@ defmodule Indexer.SequenceTest do alias Indexer.Sequence describe "start_link/1" do - test "sets state" do - {:ok, pid} = Sequence.start_link(prefix: [1..4], first: 5, step: 1) + test "without :ranges with :first with positive step pops infinitely" do + {:ok, ascending} = Sequence.start_link(first: 5, step: 1) - assert state(pid) == %Sequence{ - current: 5, - mode: :infinite, - queue: {[1..4], []}, - step: 1 - } + assert Sequence.pop(ascending) == 5..5 + assert Sequence.pop(ascending) == 6..6 + end + + test "without :ranges with :first with negative :step is error" do + {child_pid, child_ref} = + spawn_monitor(fn -> + Sequence.start_link(first: 1, step: -1) + Process.sleep(5_000) + end) + + assert_receive {:DOWN, ^child_ref, :process, ^child_pid, + ":step must be a positive integer for infinite sequences"} + end + + test "without :ranges without :first returns error" do + {child_pid, child_ref} = + spawn_monitor(fn -> + Sequence.start_link(step: -1) + Process.sleep(5_000) + end) + + assert_receive {:DOWN, ^child_ref, :process, ^child_pid, "either :ranges or :first must be set"} + end + + test "with ranges without :first" do + {:ok, pid} = Sequence.start_link(ranges: [1..4], step: 1) + + assert Sequence.pop(pid) == 1..1 + assert Sequence.pop(pid) == 2..2 + assert Sequence.pop(pid) == 3..3 + assert Sequence.pop(pid) == 4..4 + assert Sequence.pop(pid) == :halt + end + + test "with :ranges with :first returns error" do + {child_pid, child_ref} = + spawn_monitor(fn -> + Sequence.start_link(ranges: [1..0], first: 1, step: -1) + Process.sleep(5_000) + end) + + assert_receive {:DOWN, ^child_ref, :process, ^child_pid, + ":ranges and :first cannot be set at the same time" <> + " as :ranges is for :finite mode while :first is for :infinite mode"} + end + + test "with 0 first with negative step does not return 0 twice" do + {:ok, pid} = Sequence.start_link(ranges: [1..0], step: -1) + + assert Sequence.pop(pid) == 1..1 + assert Sequence.pop(pid) == 0..0 + assert Sequence.pop(pid) == :halt end # Regression test for https://github.com/poanetwork/poa-explorer/issues/387 @@ -29,96 +76,122 @@ defmodule Indexer.SequenceTest do # noproc when the sequence has already died by the time monitor is called assert_receive {:DOWN, ^sequence_ref, :process, ^sequence_pid, status} when status in [:normal, :noproc] end + + test "with :ranges in direction opposite of :step returns errors for all ranges in wrong direction" do + parent = self() + + {child_pid, child_ref} = + spawn_monitor(fn -> + send( + parent, + Sequence.start_link( + ranges: [ + # ok, ok + 7..6, + # ok, error + 4..5, + # error, ok + 3..2, + # error, error + 0..1 + ], + step: -1 + ) + ) + end) + + assert_receive {:DOWN, ^child_ref, :process, ^child_pid, + [ + "Range (0..1) direction is opposite step (-1) direction", + "Range (4..5) direction is opposite step (-1) direction" + ]} + end end - test "inject_range" do - {:ok, pid} = Sequence.start_link(prefix: [1..2], first: 5, step: 1) + describe "queue/2" do + test "with finite mode range is chunked" do + {:ok, pid} = Sequence.start_link(ranges: [1..0], step: -1) - assert :ok = Sequence.inject_range(pid, 3..4) + assert Sequence.pop(pid) == 1..1 + assert Sequence.pop(pid) == 0..0 - assert state(pid) == %Sequence{ - current: 5, - mode: :infinite, - queue: {[3..4], [1..2]}, - step: 1 - } - end + assert Sequence.queue(pid, 1..0) == :ok - test "cap" do - {:ok, pid} = Sequence.start_link(prefix: [1..2], first: 5, step: 1) + assert Sequence.pop(pid) == 1..1 + assert Sequence.pop(pid) == 0..0 + assert Sequence.pop(pid) == :halt + assert Sequence.pop(pid) == :halt + end - assert :infinite = Sequence.cap(pid) - assert state(pid).mode == :finite - assert :finite = Sequence.cap(pid) - end + test "with finite mode with range in wrong direction returns error" do + {:ok, ascending} = Sequence.start_link(first: 0, step: 1) - describe "pop" do - test "with a non-empty queue in finite and infinite modes" do - {:ok, pid} = Sequence.start_link(prefix: [1..4, 6..9], first: 99, step: 5) + assert Sequence.queue(ascending, 1..0) == {:error, "Range (1..0) direction is opposite step (1) direction"} - assert 1..4 == Sequence.pop(pid) + {:ok, descending} = Sequence.start_link(ranges: [1..0], step: -1) - assert state(pid) == %Sequence{ - current: 99, - mode: :infinite, - queue: {[], [6..9]}, - step: 5 - } + assert Sequence.queue(descending, 0..1) == {:error, "Range (0..1) direction is opposite step (-1) direction"} + end - :infinite = Sequence.cap(pid) + test "with infinite mode range is chunked and is returned prior to calculated ranges" do + {:ok, pid} = Sequence.start_link(first: 5, step: 1) + + assert :ok = Sequence.queue(pid, 3..4) - assert 6..9 == Sequence.pop(pid) + assert Sequence.pop(pid) == 3..3 + assert Sequence.pop(pid) == 4..4 + # infinite sequence takes over + assert Sequence.pop(pid) == 5..5 + assert Sequence.pop(pid) == 6..6 + end + end + + describe "cap/1" do + test "returns previous mode" do + {:ok, pid} = Sequence.start_link(first: 5, step: 1) - assert state(pid) == %Sequence{ - current: 99, - mode: :finite, - queue: {[], []}, - step: 5 - } + assert Sequence.cap(pid) == :infinite + assert Sequence.cap(pid) == :finite end - test "with an empty queue in infinite mode" do - {:ok, pid} = Sequence.start_link(first: 5, step: 5) + test "disables infinite mode that uses first and step" do + {:ok, late_capped} = Sequence.start_link(first: 5, step: 1) - assert 5..9 == Sequence.pop(pid) + assert Sequence.pop(late_capped) == 5..5 + assert Sequence.pop(late_capped) == 6..6 + assert Sequence.queue(late_capped, 5..5) == :ok + assert Sequence.cap(late_capped) == :infinite + assert Sequence.pop(late_capped) == 5..5 + assert Sequence.pop(late_capped) == :halt - assert state(pid) == %Sequence{ - current: 10, - mode: :infinite, - queue: {[], []}, - step: 5 - } + {:ok, immediately_capped} = Sequence.start_link(first: 5, step: 1) + + assert Sequence.cap(immediately_capped) == :infinite + assert Sequence.pop(immediately_capped) == :halt end + end - test "with an empty queue in infinit mode with negative step" do - {:ok, pid} = Sequence.start_link(first: 4, step: -5) + describe "pop" do + test "with a non-empty queue in finite mode" do + {:ok, pid} = Sequence.start_link(ranges: [1..4, 6..9], step: 5) + + assert Sequence.pop(pid) == 1..4 + assert Sequence.pop(pid) == 6..9 + assert Sequence.pop(pid) == :halt + assert Sequence.pop(pid) == :halt + end - assert 4..0 == Sequence.pop(pid) + test "with an empty queue in infinite mode returns range from next step from current" do + {:ok, pid} = Sequence.start_link(first: 5, step: 5) - assert state(pid) == %Sequence{ - current: 0, - mode: :finite, - queue: {[], []}, - step: -5 - } + assert 5..9 == Sequence.pop(pid) end - test "with an empty queue in finite mode" do + test "with an empty queue in finite mode halts immediately" do {:ok, pid} = Sequence.start_link(first: 5, step: 5) :infinite = Sequence.cap(pid) - assert :halt == Sequence.pop(pid) - - assert state(pid) == %Sequence{ - current: 5, - mode: :finite, - queue: {[], []}, - step: 5 - } + assert Sequence.pop(pid) == :halt end end - - defp state(sequence) do - :sys.get_state(sequence) - end end