Missing ranges collector

pull/6583/head
Qwerty5Uiop 2 years ago
parent b1e7258cf0
commit cc67f502da
  1. 1
      CHANGELOG.md
  2. 1
      apps/explorer/lib/explorer/chain.ex
  3. 2
      apps/indexer/config/config.exs
  4. 2
      apps/indexer/config/test.exs
  5. 217
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  6. 39
      apps/indexer/lib/indexer/block/catchup/helper.ex
  7. 198
      apps/indexer/lib/indexer/block/catchup/missing_ranges_collector.ex
  8. 3
      apps/indexer/lib/indexer/block/catchup/supervisor.ex
  9. 27
      apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs
  10. 66
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs
  11. 83
      apps/indexer/test/indexer/block/catchup/missing_ranges_collector_test.exs
  12. 15
      config/runtime.exs

@ -30,6 +30,7 @@
- [#6510](https://github.com/blockscout/blockscout/pull/6510) - Set consensus: false for blocks on int transaction foreign_key_violation - [#6510](https://github.com/blockscout/blockscout/pull/6510) - Set consensus: false for blocks on int transaction foreign_key_violation
- [#6565](https://github.com/blockscout/blockscout/pull/6565) - Set restart: :permanent for permanent fetchers - [#6565](https://github.com/blockscout/blockscout/pull/6565) - Set restart: :permanent for permanent fetchers
- [#6568](https://github.com/blockscout/blockscout/pull/6568) - Drop unfetched_token_balances index - [#6568](https://github.com/blockscout/blockscout/pull/6568) - Drop unfetched_token_balances index
- [#6583](https://github.com/blockscout/blockscout/pull/6583) - Missing ranges collector
### Fixes ### Fixes

@ -3144,7 +3144,6 @@ defmodule Explorer.Chain do
WHERE NOT EXISTS WHERE NOT EXISTS
(SELECT 1 FROM blocks b2 WHERE b2.number=b1.number AND b2.consensus) (SELECT 1 FROM blocks b2 WHERE b2.number=b1.number AND b2.consensus)
ORDER BY b1.number DESC ORDER BY b1.number DESC
LIMIT 500000
) )
""", """,
^range_min, ^range_min,

@ -12,6 +12,8 @@ config :indexer, Indexer.Tracer,
adapter: SpandexDatadog.Adapter, adapter: SpandexDatadog.Adapter,
trace_key: :blockscout trace_key: :blockscout
config :indexer, Indexer.Block.Catchup.MissingRangesCollector, future_check_interval: :timer.minutes(1)
config :logger, :indexer, config :logger, :indexer,
# keep synced with `config/config.exs` # keep synced with `config/config.exs`
format: "$dateT$time $metadata[$level] $message\n", format: "$dateT$time $metadata[$level] $message\n",

@ -2,6 +2,8 @@ import Config
config :indexer, Indexer.Tracer, disabled?: false config :indexer, Indexer.Tracer, disabled?: false
config :indexer, Indexer.Block.Catchup.MissingRangesCollector, future_check_interval: 100
config :logger, :indexer, config :logger, :indexer,
level: :warn, level: :warn,
path: Path.absname("logs/test/indexer.log") path: Path.absname("logs/test/indexer.log")

@ -24,7 +24,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
alias Ecto.Changeset alias Ecto.Changeset
alias Explorer.Chain alias Explorer.Chain
alias Indexer.{Block, Tracer} alias Indexer.{Block, Tracer}
alias Indexer.Block.Catchup.Sequence alias Indexer.Block.Catchup.{MissingRangesCollector, Sequence}
alias Indexer.Memory.Shrinkable alias Indexer.Memory.Shrinkable
alias Indexer.Prometheus alias Indexer.Prometheus
@ -41,70 +41,45 @@ defmodule Indexer.Block.Catchup.Fetcher do
* `:json_rpc_named_arguments` - `t:EthereumJSONRPC.json_rpc_named_arguments/0` passed to * `:json_rpc_named_arguments` - `t:EthereumJSONRPC.json_rpc_named_arguments/0` passed to
`EthereumJSONRPC.json_rpc/2`. `EthereumJSONRPC.json_rpc/2`.
""" """
def task( def task(state) do
%__MODULE__{
block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}
} = state
) do
Logger.metadata(fetcher: :block_catchup) Logger.metadata(fetcher: :block_catchup)
with {:ok, ranges} <- block_ranges(json_rpc_named_arguments) do case MissingRangesCollector.get_latest_batch() do
case ranges do [] ->
# -1 means that latest block is 0, so let realtime indexer get the genesis block %{
[_..-1] -> first_block_number: nil,
%{first_block_number: 0, missing_block_count: 0, last_block_number: 0, shrunk: false} last_block_number: nil,
missing_block_count: 0,
_ -> shrunk: false
# realtime indexer gets the current latest block }
_..first = List.last(ranges)
last.._ = List.first(ranges) missing_ranges ->
first.._ = List.first(missing_ranges)
Logger.metadata(first_block_number: first, last_block_number: last) _..last = List.last(missing_ranges)
missing_ranges = Logger.metadata(first_block_number: first, last_block_number: last)
ranges
# let it fetch from newest to oldest block missing_block_count =
|> Enum.reverse() missing_ranges
|> Enum.flat_map(fn f..l -> Chain.missing_block_number_ranges(l..f) end) |> Stream.map(&Enum.count/1)
|> Enum.sum()
range_count = Enum.count(missing_ranges)
step = step(first, last, blocks_batch_size())
missing_block_count = sequence_opts = put_memory_monitor([ranges: missing_ranges, step: step], state)
missing_ranges gen_server_opts = [name: @sequence_name]
|> Stream.map(&Enum.count/1) {:ok, sequence} = Sequence.start_link(sequence_opts, gen_server_opts)
|> Enum.sum() Sequence.cap(sequence)
Prometheus.Instrumenter.missing_blocks(missing_block_count) stream_fetch_and_import(state, sequence)
Logger.debug(fn -> "Missed blocks in ranges." end, shrunk = Shrinkable.shrunk?(sequence)
missing_block_range_count: range_count,
missing_block_count: missing_block_count %{
) first_block_number: first,
last_block_number: last,
shrunk = missing_block_count: missing_block_count,
case missing_block_count do shrunk: shrunk
0 -> }
false
_ ->
step = step(first, last, blocks_batch_size())
sequence_opts = put_memory_monitor([ranges: missing_ranges, step: step], state)
gen_server_opts = [name: @sequence_name]
{:ok, sequence} = Sequence.start_link(sequence_opts, gen_server_opts)
Sequence.cap(sequence)
stream_fetch_and_import(state, sequence)
Shrinkable.shrunk?(sequence)
end
%{
first_block_number: first,
last_block_number: last,
missing_block_count: missing_block_count,
shrunk: shrunk
}
end
end end
end end
@ -129,16 +104,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
Application.get_env(:indexer, __MODULE__)[:concurrency] Application.get_env(:indexer, __MODULE__)[:concurrency]
end end
defp fetch_last_block(json_rpc_named_arguments) do
case latest_block() do
nil ->
EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments)
number ->
{:ok, number}
end
end
defp step(first, last, blocks_batch_size) do defp step(first, last, blocks_batch_size) do
if first < last, do: blocks_batch_size, else: -1 * blocks_batch_size if first < last, do: blocks_batch_size, else: -1 * blocks_batch_size
end end
@ -356,110 +321,4 @@ defmodule Indexer.Block.Catchup.Fetcher do
{:error, :queue_unavailable} {:error, :queue_unavailable}
end end
end end
@doc false
def block_ranges(json_rpc_named_arguments) do
block_ranges_string = Application.get_env(:indexer, :block_ranges)
ranges =
block_ranges_string
|> String.split(",")
|> Enum.map(fn string_range ->
case String.split(string_range, "..") do
[from_string, "latest"] ->
parse_integer(from_string)
[from_string, to_string] ->
with {from, ""} <- Integer.parse(from_string),
{to, ""} <- Integer.parse(to_string) do
if from <= to, do: from..to, else: nil
else
_ -> nil
end
_ ->
nil
end
end)
|> sanitize_ranges()
case List.last(ranges) do
_from.._to ->
{:ok, ranges}
nil ->
with {:ok, latest_block_number} <- fetch_last_block(json_rpc_named_arguments) do
{:ok, [last_block()..(latest_block_number - 1)]}
end
num ->
with {:ok, latest_block_number} <-
EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do
{:ok, List.update_at(ranges, -1, fn _ -> num..(latest_block_number - 1) end)}
end
end
end
defp sanitize_ranges(ranges) do
ranges
|> Enum.filter(&(not is_nil(&1)))
|> Enum.sort_by(
fn
from.._to -> from
el -> el
end,
:asc
)
|> Enum.chunk_while(
nil,
fn
_from.._to = chunk, nil ->
{:cont, chunk}
_ch_from..ch_to = chunk, acc_from..acc_to = acc ->
if Range.disjoint?(chunk, acc),
do: {:cont, acc, chunk},
else: {:cont, acc_from..max(ch_to, acc_to)}
num, nil ->
{:halt, num}
num, acc_from.._ = acc ->
if Range.disjoint?(num..num, acc), do: {:cont, acc, num}, else: {:halt, acc_from}
_, num ->
{:halt, num}
end,
fn reminder -> {:cont, reminder, nil} end
)
end
defp last_block do
string_value = Application.get_env(:indexer, :first_block)
case Integer.parse(string_value) do
{integer, ""} ->
integer
_ ->
min_missing_block_number =
"min_missing_block_number"
|> Chain.get_last_fetched_counter()
|> Decimal.to_integer()
min_missing_block_number
end
end
defp latest_block do
string_value = Application.get_env(:indexer, :last_block)
parse_integer(string_value)
end
defp parse_integer(integer_string) do
case Integer.parse(integer_string) do
{integer, ""} -> integer
_ -> nil
end
end
end end

@ -0,0 +1,39 @@
defmodule Indexer.Block.Catchup.Helper do
@moduledoc """
Catchup helper functions
"""
def sanitize_ranges(ranges) do
ranges
|> Enum.filter(&(not is_nil(&1)))
|> Enum.sort_by(
fn
from.._to -> from
el -> el
end,
:asc
)
|> Enum.chunk_while(
nil,
fn
_from.._to = chunk, nil ->
{:cont, chunk}
_ch_from..ch_to = chunk, acc_from..acc_to = acc ->
if Range.disjoint?(chunk, acc),
do: {:cont, acc, chunk},
else: {:cont, acc_from..max(ch_to, acc_to)}
num, nil ->
{:halt, num}
num, acc_from.._ = acc ->
if Range.disjoint?(num..num, acc), do: {:cont, acc, num}, else: {:halt, acc_from}
_, num ->
{:halt, num}
end,
fn reminder -> {:cont, reminder, nil} end
)
end
end

@ -0,0 +1,198 @@
defmodule Indexer.Block.Catchup.MissingRangesCollector do
@moduledoc """
Collects missing block ranges.
"""
use GenServer
alias Explorer.Chain
alias Explorer.Chain.Cache.BlockNumber
alias Indexer.Block.Catchup.Helper
@default_missing_ranges_batch_size 100_000
@future_check_interval Application.compile_env(:indexer, __MODULE__)[:future_check_interval]
@past_check_interval 10
@spec start_link(term()) :: GenServer.on_start()
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(_) do
{:ok, define_init()}
end
defp define_init do
case Application.get_env(:indexer, :block_ranges) do
nil ->
default_init()
string_ranges ->
case parse_block_ranges(string_ranges) do
:no_ranges -> default_init()
{:finite_ranges, ranges} -> ranges_init(ranges)
{:infinite_ranges, ranges, max_fetched_block_number} -> ranges_init(ranges, max_fetched_block_number)
end
end
end
defp default_init do
max_number = BlockNumber.get_max()
{min_number, first_batch} = fetch_missing_ranges_batch(max_number, false)
initial_queue = push_batch_to_queue(first_batch, :queue.new())
Process.send_after(self(), :update_future, @future_check_interval)
Process.send_after(self(), :update_past, @past_check_interval)
%{queue: initial_queue, min_fetched_block_number: min_number, max_fetched_block_number: max_number}
end
defp ranges_init(ranges, max_fetched_block_number \\ nil) do
missing_ranges =
ranges
|> Enum.reverse()
|> Enum.flat_map(fn f..l -> Chain.missing_block_number_ranges(l..f) end)
initial_queue = push_batch_to_queue(missing_ranges, :queue.new())
if not is_nil(max_fetched_block_number) do
Process.send_after(self(), :update_future, @future_check_interval)
end
%{queue: initial_queue, max_fetched_block_number: max_fetched_block_number}
end
def get_latest_batch do
GenServer.call(__MODULE__, :get_latest_batch)
end
@impl true
def handle_call(:get_latest_batch, _from, %{queue: queue} = state) do
{latest_batch, new_queue} =
case :queue.out(queue) do
{{:value, batch}, rest} -> {batch, rest}
{:empty, rest} -> {[], rest}
end
{:reply, latest_batch, %{state | queue: new_queue}}
end
@impl true
def handle_info(:update_future, %{queue: queue, max_fetched_block_number: max_number} = state) do
if continue_future_updating?(max_number) do
{new_max_number, batch} = fetch_missing_ranges_batch(max_number, true)
Process.send_after(self(), :update_future, @future_check_interval)
{:noreply, %{state | queue: push_batch_to_queue(batch, queue, true), max_fetched_block_number: new_max_number}}
else
{:noreply, state}
end
end
def handle_info(:update_past, %{queue: queue, min_fetched_block_number: min_number} = state) do
if min_number > first_block() do
{new_min_number, batch} = fetch_missing_ranges_batch(min_number, false)
Process.send_after(self(), :update_past, @past_check_interval)
{:noreply, %{state | queue: push_batch_to_queue(batch, queue), min_fetched_block_number: new_min_number}}
else
{:noreply, state}
end
end
defp fetch_missing_ranges_batch(min_fetched_block_number, false = _to_future?) do
from = min_fetched_block_number - 1
to = max(min_fetched_block_number - missing_ranges_batch_size(), first_block())
if from >= to do
{to, Chain.missing_block_number_ranges(from..to)}
else
{min_fetched_block_number, []}
end
end
defp fetch_missing_ranges_batch(max_fetched_block_number, true) do
to = max_fetched_block_number + 1
from = min(max_fetched_block_number + missing_ranges_batch_size(), BlockNumber.get_max())
if from >= to do
{from, Chain.missing_block_number_ranges(from..to)}
else
{max_fetched_block_number, []}
end
end
defp push_batch_to_queue(batch, queue, r? \\ false)
defp push_batch_to_queue([], queue, _r?), do: queue
defp push_batch_to_queue(batch, queue, false), do: :queue.in(batch, queue)
defp push_batch_to_queue(batch, queue, true), do: :queue.in_r(batch, queue)
defp first_block do
string_value = Application.get_env(:indexer, :first_block)
case Integer.parse(string_value) do
{integer, ""} ->
integer
_ ->
min_missing_block_number =
"min_missing_block_number"
|> Chain.get_last_fetched_counter()
|> Decimal.to_integer()
min_missing_block_number
end
end
defp continue_future_updating?(max_fetched_block_number) do
case Integer.parse(Application.get_env(:indexer, :last_block)) do
{last_block, ""} -> max_fetched_block_number < last_block
_ -> true
end
end
defp missing_ranges_batch_size do
Application.get_env(:indexer, :missing_ranges_batch_size) || @default_missing_ranges_batch_size
end
def parse_block_ranges(block_ranges_string) do
ranges =
block_ranges_string
|> String.split(",")
|> Enum.map(fn string_range ->
case String.split(string_range, "..") do
[from_string, "latest"] ->
parse_integer(from_string)
[from_string, to_string] ->
with {from, ""} <- Integer.parse(from_string),
{to, ""} <- Integer.parse(to_string) do
if from <= to, do: from..to, else: nil
else
_ -> nil
end
_ ->
nil
end
end)
|> Helper.sanitize_ranges()
case List.last(ranges) do
_from.._to ->
{:finite_ranges, ranges}
nil ->
:no_ranges
num ->
{:infinite_ranges, List.delete_at(ranges, -1), num - 1}
end
end
defp parse_integer(integer_string) do
case Integer.parse(integer_string) do
{integer, ""} -> integer
_ -> nil
end
end
end

@ -5,7 +5,7 @@ defmodule Indexer.Block.Catchup.Supervisor do
use Supervisor use Supervisor
alias Indexer.Block.Catchup.BoundIntervalSupervisor alias Indexer.Block.Catchup.{BoundIntervalSupervisor, MissingRangesCollector}
def child_spec([init_arguments]) do def child_spec([init_arguments]) do
child_spec([init_arguments, []]) child_spec([init_arguments, []])
@ -29,6 +29,7 @@ defmodule Indexer.Block.Catchup.Supervisor do
def init(bound_interval_supervisor_arguments) do def init(bound_interval_supervisor_arguments) do
Supervisor.init( Supervisor.init(
[ [
{MissingRangesCollector, []},
{Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor}, {Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor},
{BoundIntervalSupervisor, [bound_interval_supervisor_arguments, [name: BoundIntervalSupervisor]]} {BoundIntervalSupervisor, [bound_interval_supervisor_arguments, [name: BoundIntervalSupervisor]]}
], ],

@ -10,6 +10,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
alias Explorer.Repo alias Explorer.Repo
alias Indexer.BoundInterval alias Indexer.BoundInterval
alias Indexer.Block.Catchup alias Indexer.Block.Catchup
alias Indexer.Block.Catchup.MissingRangesCollector
alias Indexer.Fetcher.{ alias Indexer.Fetcher.{
CoinBalance, CoinBalance,
@ -30,6 +31,11 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
setup :verify_on_exit! setup :verify_on_exit!
describe "start_link/1" do describe "start_link/1" do
setup do
initial_env = Application.get_env(:indexer, :block_ranges)
on_exit(fn -> Application.put_env(:indexer, :block_ranges, initial_env) end)
end
# See https://github.com/poanetwork/blockscout/issues/597 # See https://github.com/poanetwork/blockscout/issues/597
@tag :no_geth @tag :no_geth
test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do
@ -213,6 +219,10 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
assert Repo.aggregate(Block, :count, :hash) == 0 assert Repo.aggregate(Block, :count, :hash) == 0
first_catchup_block_number = latest_block_number - 1
previous_batch_block_number = first_catchup_block_number - default_blocks_batch_size
Application.put_env(:indexer, :block_ranges, "#{previous_batch_block_number}..#{first_catchup_block_number}")
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -236,8 +246,6 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
assert Repo.aggregate(Block, :count, :hash) >= 1 assert Repo.aggregate(Block, :count, :hash) >= 1
previous_batch_block_number = first_catchup_block_number - default_blocks_batch_size
wait_for_results(fn -> wait_for_results(fn ->
Repo.one!(from(block in Block, where: block.number == ^previous_batch_block_number)) Repo.one!(from(block in Block, where: block.number == ^previous_batch_block_number))
end) end)
@ -401,6 +409,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
describe "handle_info(:catchup_index, state)" do describe "handle_info(:catchup_index, state)" do
setup context do setup context do
initial_env = Application.get_env(:indexer, :block_ranges)
on_exit(fn -> Application.put_env(:indexer, :block_ranges, initial_env) end)
# force to use `Mox`, so we can manipulate `latest_block_number` # force to use `Mox`, so we can manipulate `latest_block_number`
put_in(context.json_rpc_named_arguments[:transport], EthereumJSONRPC.Mox) put_in(context.json_rpc_named_arguments[:transport], EthereumJSONRPC.Mox)
end end
@ -414,11 +424,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
insert(:block, number: 0) insert(:block, number: 0)
insert(:block, number: 1) insert(:block, number: 1)
EthereumJSONRPC.Mox MissingRangesCollector.start_link([])
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => "0x1"}}
end)
start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor}) start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -434,7 +440,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
%Catchup.BoundIntervalSupervisor{fetcher: %Catchup.Fetcher{}, task: %Task{pid: pid, ref: ref}} = %Catchup.BoundIntervalSupervisor{fetcher: %Catchup.Fetcher{}, task: %Task{pid: pid, ref: ref}} =
catchup_index_state} = Catchup.BoundIntervalSupervisor.handle_info(:catchup_index, state) catchup_index_state} = Catchup.BoundIntervalSupervisor.handle_info(:catchup_index, state)
assert_receive {^ref, %{first_block_number: 0, missing_block_count: 0}} = message assert_receive {^ref, %{first_block_number: nil, missing_block_count: 0}} = message
Process.sleep(100) Process.sleep(100)
@ -454,9 +460,6 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
state: state state: state
} do } do
EthereumJSONRPC.Mox EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => "0x1"}}
end)
|> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByNumber", params: ["0x0", true]}], _options -> |> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByNumber", params: ["0x0", true]}], _options ->
{:ok, {:ok,
[ [
@ -512,6 +515,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
{:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]} {:ok, [%{id: id, jsonrpc: "2.0", result: "0x0"}]}
end) end)
Application.put_env(:indexer, :block_ranges, "0..0")
MissingRangesCollector.start_link([])
start_supervised({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor}) start_supervised({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)

@ -10,6 +10,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
alias Explorer.Chain.Hash alias Explorer.Chain.Hash
alias Indexer.Block alias Indexer.Block
alias Indexer.Block.Catchup.Fetcher alias Indexer.Block.Catchup.Fetcher
alias Indexer.Block.Catchup.MissingRangesCollector
alias Indexer.Fetcher.{BlockReward, CoinBalance, InternalTransaction, Token, TokenBalance, UncleBlock} alias Indexer.Fetcher.{BlockReward, CoinBalance, InternalTransaction, Token, TokenBalance, UncleBlock}
@moduletag capture_log: true @moduletag capture_log: true
@ -38,6 +39,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
MissingRangesCollector.start_link([])
parent = self() parent = self()
@ -123,14 +125,21 @@ defmodule Indexer.Block.Catchup.FetcherTest do
end end
describe "task/1" do describe "task/1" do
setup do
initial_env = Application.get_env(:indexer, :block_ranges)
on_exit(fn -> Application.put_env(:indexer, :block_ranges, initial_env) end)
end
test "ignores fetched beneficiaries with different hash for same number", %{ test "ignores fetched beneficiaries with different hash for same number", %{
json_rpc_named_arguments: json_rpc_named_arguments json_rpc_named_arguments: json_rpc_named_arguments
} do } do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10) Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
Application.put_env(:indexer, :block_ranges, "0..1")
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
MissingRangesCollector.start_link([])
latest_block_number = 2 latest_block_number = 2
latest_block_quantity = integer_to_quantity(latest_block_number) latest_block_quantity = integer_to_quantity(latest_block_number)
@ -281,10 +290,12 @@ defmodule Indexer.Block.Catchup.FetcherTest do
json_rpc_named_arguments: json_rpc_named_arguments json_rpc_named_arguments: json_rpc_named_arguments
} do } do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10) Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
Application.put_env(:indexer, :block_ranges, "0..1")
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
MissingRangesCollector.start_link([])
latest_block_number = 2 latest_block_number = 2
latest_block_quantity = integer_to_quantity(latest_block_number) latest_block_quantity = integer_to_quantity(latest_block_number)
@ -432,10 +443,12 @@ defmodule Indexer.Block.Catchup.FetcherTest do
json_rpc_named_arguments: json_rpc_named_arguments json_rpc_named_arguments: json_rpc_named_arguments
} do } do
Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10) Application.put_env(:indexer, Indexer.Block.Catchup.Fetcher, batch_size: 1, concurrency: 10)
Application.put_env(:indexer, :block_ranges, "0..1")
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
MissingRangesCollector.start_link([])
latest_block_number = 2 latest_block_number = 2
latest_block_quantity = integer_to_quantity(latest_block_number) latest_block_quantity = integer_to_quantity(latest_block_number)
@ -517,7 +530,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
jsonrpc: "2.0", jsonrpc: "2.0",
result: %{ result: %{
"hash" => to_string(block_hash_0), "hash" => to_string(block_hash_0),
"number" => block_quantity, "number" => "0x0",
"difficulty" => "0x0", "difficulty" => "0x0",
"gasLimit" => "0x0", "gasLimit" => "0x0",
"gasUsed" => "0x0", "gasUsed" => "0x0",
@ -572,57 +585,6 @@ defmodule Indexer.Block.Catchup.FetcherTest do
end end
end end
describe "block_ranges/0" do
setup do
initial_env = Application.get_all_env(:indexer)
on_exit(fn -> Application.put_all_env([{:indexer, initial_env}]) end)
end
test "ignores bad ranges", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => "0x100"}}
end)
# doing such workaround is safe since this module is not async
Application.put_env(:indexer, :block_ranges, "1..5,3..5,2qw1..12,10..11a,,asd..qwe,10..latest")
# latest block is left for realtime_index
assert Fetcher.block_ranges(json_rpc_named_arguments) == {:ok, [1..5, 10..255]}
end
test "ignores FIRST_BLOCK/LAST_BLOCK when BLOCK_RANGES defined", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
Application.put_env(:indexer, :first_block, "1")
Application.put_env(:indexer, :last_block, "10")
Application.put_env(:indexer, :block_ranges, "2..5,10..100")
assert Fetcher.block_ranges(json_rpc_named_arguments) == {:ok, [2..5, 10..100]}
end
test "uses FIRST_BLOCK/LAST_BLOCK when BLOCK_RANGES is undefined or invalid", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
Application.put_env(:indexer, :first_block, "1")
Application.put_env(:indexer, :last_block, "10")
assert Fetcher.block_ranges(json_rpc_named_arguments) == {:ok, [1..9]}
Application.put_env(:indexer, :block_ranges, "latest..123,,fvdskvjglav!@#$%^&,2..1")
assert Fetcher.block_ranges(json_rpc_named_arguments) == {:ok, [1..9]}
end
test "all ranges are disjoint", %{json_rpc_named_arguments: json_rpc_named_arguments} do
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn %{method: "eth_getBlockByNumber", params: ["latest", false]}, _options ->
{:ok, %{"number" => "0x100"}}
end)
Application.put_env(:indexer, :block_ranges, "10..20,5..15,18..25,35..40,30..50,100..latest,150..200")
assert Fetcher.block_ranges(json_rpc_named_arguments) == {:ok, [5..25, 30..50, 100..255]}
end
end
defp count(schema) do defp count(schema) do
Repo.one!(select(schema, fragment("COUNT(*)"))) Repo.one!(select(schema, fragment("COUNT(*)")))
end end

@ -0,0 +1,83 @@
defmodule Indexer.Block.Catchup.MissingRangesCollectorTest do
use Explorer.DataCase
alias Indexer.Block.Catchup.MissingRangesCollector
test "default_init" do
insert(:block, number: 1_000_000)
insert(:block, number: 500_123)
MissingRangesCollector.start_link([])
Process.sleep(500)
assert [999_999..900_000//-1] = MissingRangesCollector.get_latest_batch()
assert [899_999..800_000//-1] = MissingRangesCollector.get_latest_batch()
assert [799_999..700_000//-1] = MissingRangesCollector.get_latest_batch()
insert(:block, number: 1_200_000)
Process.sleep(500)
assert [1_199_999..1_100_001//-1] = MissingRangesCollector.get_latest_batch()
assert [1_100_000..1_000_001//-1] = MissingRangesCollector.get_latest_batch()
assert [699_999..600_000//-1] = MissingRangesCollector.get_latest_batch()
assert [599_999..500_124//-1, 500_122..500_000//-1] = MissingRangesCollector.get_latest_batch()
end
describe "ranges_init" do
setup do
initial_env = Application.get_all_env(:indexer)
on_exit(fn -> Application.put_all_env([{:indexer, initial_env}]) end)
end
test "infinite range" do
Application.put_env(:indexer, :block_ranges, "1..5,3..5,2qw1..12,10..11a,,asd..qwe,10..latest")
insert(:block, number: 200_000)
MissingRangesCollector.start_link([])
Process.sleep(500)
assert [199_999..100_010//-1] = MissingRangesCollector.get_latest_batch()
assert [100_009..10//-1] = MissingRangesCollector.get_latest_batch()
assert [5..1//-1] = MissingRangesCollector.get_latest_batch()
end
test "finite range" do
Application.put_env(:indexer, :block_ranges, "10..20,5..15,18..25,35..40,30..50,150..200")
insert(:block, number: 200_000)
MissingRangesCollector.start_link([])
Process.sleep(500)
assert [200..150//-1, 50..30//-1, 25..5//-1] = MissingRangesCollector.get_latest_batch()
assert [] = MissingRangesCollector.get_latest_batch()
end
test "finite range with existing blocks" do
Application.put_env(:indexer, :block_ranges, "10..20,5..15,18..25,35..40,30..50,150..200")
insert(:block, number: 200_000)
insert(:block, number: 175)
insert(:block, number: 33)
MissingRangesCollector.start_link([])
Process.sleep(500)
assert [200..176//-1, 174..150//-1, 50..34//-1, 32..30//-1, 25..5//-1] = MissingRangesCollector.get_latest_batch()
assert [] = MissingRangesCollector.get_latest_batch()
end
end
test "parse_block_ranges/1" do
assert MissingRangesCollector.parse_block_ranges("1..5,3..5,2qw1..12,10..11a,,asd..qwe,10..latest") ==
{:infinite_ranges, [1..5], 9}
assert MissingRangesCollector.parse_block_ranges("latest..123,,fvdskvjglav!@#$%^&,2..1") == :no_ranges
assert MissingRangesCollector.parse_block_ranges("10..20,5..15,18..25,35..40,30..50,100..latest,150..200") ==
{:infinite_ranges, [5..25, 30..50], 99}
assert MissingRangesCollector.parse_block_ranges("10..20,5..15,18..25,35..40,30..50,150..200") ==
{:finite_ranges, [5..25, 30..50, 150..200]}
end
end

@ -410,7 +410,7 @@ config :indexer,
block_transformer: block_transformer, block_transformer: block_transformer,
metadata_updater_seconds_interval: metadata_updater_seconds_interval:
String.to_integer(System.get_env("TOKEN_METADATA_UPDATE_INTERVAL") || "#{2 * 24 * 60 * 60}"), String.to_integer(System.get_env("TOKEN_METADATA_UPDATE_INTERVAL") || "#{2 * 24 * 60 * 60}"),
block_ranges: System.get_env("BLOCK_RANGES") || "", block_ranges: System.get_env("BLOCK_RANGES"),
first_block: System.get_env("FIRST_BLOCK") || "", first_block: System.get_env("FIRST_BLOCK") || "",
last_block: System.get_env("LAST_BLOCK") || "", last_block: System.get_env("LAST_BLOCK") || "",
trace_first_block: System.get_env("TRACE_FIRST_BLOCK") || "", trace_first_block: System.get_env("TRACE_FIRST_BLOCK") || "",
@ -486,6 +486,19 @@ config :indexer, Indexer.Block.Catchup.Fetcher,
batch_size: blocks_catchup_fetcher_batch_size, batch_size: blocks_catchup_fetcher_batch_size,
concurrency: blocks_catchup_fetcher_concurrency concurrency: blocks_catchup_fetcher_concurrency
blocks_catchup_fetcher_missing_ranges_batch_size_default_str = "100000"
{blocks_catchup_fetcher_missing_ranges_batch_size, _} =
Integer.parse(
System.get_env(
"INDEXER_CATCHUP_MISSING_RANGES_BATCH_SIZE",
blocks_catchup_fetcher_missing_ranges_batch_size_default_str
)
)
config :indexer, Indexer.Block.Catchup.MissingRangesCollector,
missing_ranges_batch_size: blocks_catchup_fetcher_missing_ranges_batch_size
{internal_transaction_fetcher_batch_size, _} = {internal_transaction_fetcher_batch_size, _} =
Integer.parse(System.get_env("INDEXER_INTERNAL_TRANSACTIONS_BATCH_SIZE", "10")) Integer.parse(System.get_env("INDEXER_INTERNAL_TRANSACTIONS_BATCH_SIZE", "10"))

Loading…
Cancel
Save