Merge pull request #7493 from blockscout/fix-missing-ranges-manipulator

Fix MissingRangesManipulator
pull/7506/head
Victor Baranov 1 year ago committed by GitHub
commit ae0847e61e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CHANGELOG.md
  2. 3
      apps/explorer/lib/explorer/application.ex
  3. 4
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  4. 4
      apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex
  5. 4
      apps/explorer/lib/explorer/chain/import/runner/transactions.ex
  6. 113
      apps/explorer/lib/explorer/utility/missing_block_range.ex
  7. 61
      apps/explorer/lib/explorer/utility/missing_ranges_manipulator.ex
  8. 2
      apps/explorer/test/explorer/chain/import/runner/blocks_test.exs
  9. 7
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  10. 12
      apps/indexer/lib/indexer/block/catchup/missing_ranges_collector.ex
  11. 50
      apps/indexer/lib/indexer/block/catchup/missing_ranges_manipulator.ex
  12. 3
      apps/indexer/lib/indexer/block/catchup/supervisor.ex
  13. 4
      apps/indexer/lib/indexer/temporary/blocks_transactions_mismatch.ex
  14. 6
      apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs
  15. 3
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs

@ -11,7 +11,7 @@
- [#7355](https://github.com/blockscout/blockscout/pull/7355) - Add endpoint for token info import
- [#7393](https://github.com/blockscout/blockscout/pull/7393) - Realtime fetcher max gap
- [#7436](https://github.com/blockscout/blockscout/pull/7436) - TokenBalanceOnDemand ERC-1155 support
- [#7469](https://github.com/blockscout/blockscout/pull/7469), [#7485](https://github.com/blockscout/blockscout/pull/7485) - Clear missing block ranges after every success import
- [#7469](https://github.com/blockscout/blockscout/pull/7469), [#7485](https://github.com/blockscout/blockscout/pull/7485), [#7493](https://github.com/blockscout/blockscout/pull/7493) - Clear missing block ranges after every success import
- [#7489](https://github.com/blockscout/blockscout/pull/7489) - INDEXER_CATCHUP_BLOCK_INTERVAL env var
### Fixes

@ -70,7 +70,8 @@ defmodule Explorer.Application do
TransactionsApiV2,
Accounts,
Uncles,
{Redix, redix_opts()}
{Redix, redix_opts()},
{Explorer.Utility.MissingRangesManipulator, []}
]
children = base_children ++ configurable_children()

@ -15,7 +15,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
alias Explorer.Chain.Import.Runner.Tokens
alias Explorer.Prometheus.Instrumenter
alias Explorer.Repo, as: ExplorerRepo
alias Explorer.Utility.MissingBlockRange
alias Explorer.Utility.MissingRangesManipulator
@behaviour Runner
@ -395,7 +395,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
removed_consensus_block_hashes
|> Enum.map(fn {number, _hash} -> number end)
|> MissingBlockRange.add_ranges_by_block_numbers()
|> MissingRangesManipulator.add_ranges_by_block_numbers()
{:ok, removed_consensus_block_hashes}
rescue

@ -13,7 +13,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
alias Explorer.Chain.Import.Runner
alias Explorer.Prometheus.Instrumenter
alias Explorer.Repo, as: ExplorerRepo
alias Explorer.Utility.MissingBlockRange
alias Explorer.Utility.MissingRangesManipulator
import Ecto.Query, only: [from: 2]
@ -682,7 +682,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
try do
{_num, result} = repo.update_all(update_query, [])
MissingBlockRange.add_ranges_by_block_numbers(invalid_block_numbers)
MissingRangesManipulator.add_ranges_by_block_numbers(invalid_block_numbers)
Logger.debug(fn ->
[

@ -11,7 +11,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
alias Explorer.Chain.{Block, Hash, Import, Transaction}
alias Explorer.Chain.Import.Runner.TokenTransfers
alias Explorer.Prometheus.Instrumenter
alias Explorer.Utility.MissingBlockRange
alias Explorer.Utility.MissingRangesManipulator
@behaviour Import.Runner
@ -226,7 +226,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
timeout: timeout
)
MissingBlockRange.add_ranges_by_block_numbers(result)
MissingRangesManipulator.add_ranges_by_block_numbers(result)
{:ok, result}
rescue

@ -33,10 +33,38 @@ defmodule Explorer.Utility.MissingBlockRange do
def add_ranges_by_block_numbers(numbers) do
numbers
|> Enum.map(fn number -> number..number end)
|> numbers_to_ranges()
|> save_batch()
end
def save_range(from..to) do
min_number = min(from, to)
max_number = max(from, to)
lower_range = get_range_by_block_number(min_number)
higher_range = get_range_by_block_number(max_number)
case {lower_range, higher_range} do
{%__MODULE__{} = same_range, %__MODULE__{} = same_range} ->
:ok
{%__MODULE__{} = range, nil} ->
delete_ranges_between(max_number, range.from_number)
update_range(range, %{from_number: max_number})
{nil, %__MODULE__{} = range} ->
delete_ranges_between(range.to_number, min_number)
update_range(range, %{to_number: min_number})
{%__MODULE__{} = range_1, %__MODULE__{} = range_2} ->
delete_ranges_between(range_2.from_number, range_1.from_number)
update_range(range_1, %{from_number: range_2.from_number})
_ ->
insert_range(%{from_number: max_number, to_number: min_number})
end
end
def delete_range(from..to) do
min_number = min(from, to)
max_number = max(from, to)
@ -51,9 +79,11 @@ defmodule Explorer.Utility.MissingBlockRange do
insert_if_needed(%{from_number: min_number - 1, to_number: same_range.to_number})
{%__MODULE__{} = range, nil} ->
delete_ranges_between(max_number, range.from_number)
update_from_number_or_delete_range(range, min_number - 1)
{nil, %__MODULE__{} = range} ->
delete_ranges_between(range.to_number, min_number)
update_to_number_or_delete_range(range, max_number + 1)
{%__MODULE__{} = range_1, %__MODULE__{} = range_2} ->
@ -70,15 +100,10 @@ defmodule Explorer.Utility.MissingBlockRange do
Enum.map(batch, &delete_range/1)
end
def save_batch([]), do: {0, nil}
def save_batch(batch) do
records =
batch
|> List.wrap()
|> Enum.map(fn from..to -> %{from_number: from, to_number: to} end)
Repo.insert_all(__MODULE__, records, on_conflict: :nothing, conflict_target: [:from_number, :to_number])
batch
|> List.wrap()
|> Enum.map(&save_range/1)
end
defp insert_range(params) do
@ -121,28 +146,39 @@ defmodule Explorer.Utility.MissingBlockRange do
|> update([r], set: [from_number: r.to_number, to_number: r.from_number])
|> Repo.update_all([])
__MODULE__
|> join(:inner, [r], r1 in __MODULE__,
on:
((r1.from_number <= r.from_number and r1.from_number >= r.to_number) or
(r1.to_number <= r.from_number and r1.to_number >= r.to_number)) and r1.id != r.id
)
|> select([r, r1], [r, r1])
|> Repo.all()
|> Enum.map(&Enum.sort/1)
|> Enum.uniq()
|> Enum.map(fn [range_1, range_2] ->
Repo.delete(range_2)
range_1
|> changeset(%{
from_number: max(range_1.from_number, range_2.from_number),
to_number: min(range_1.to_number, range_2.to_number)
})
|> Repo.update()
{last_range, merged_ranges} = delete_and_merge_ranges()
save_batch((last_range && [last_range | merged_ranges]) || [])
end
defp delete_and_merge_ranges do
delete_intersecting_ranges()
|> Enum.sort_by(& &1.from_number, &>=/2)
|> Enum.reduce({nil, []}, fn %{from_number: from, to_number: to}, {last_range, result} ->
cond do
is_nil(last_range) -> {from..to, result}
Range.disjoint?(from..to, last_range) -> {from..to, [last_range | result]}
true -> {Range.new(max(from, last_range.first), min(to, last_range.last)), result}
end
end)
end
defp delete_intersecting_ranges do
{_, intersecting_ranges} =
__MODULE__
|> join(:inner, [r], r1 in __MODULE__,
on:
((r1.from_number <= r.from_number and r1.from_number >= r.to_number) or
(r1.to_number <= r.from_number and r1.to_number >= r.to_number) or
(r.from_number <= r1.from_number and r.from_number >= r1.to_number) or
(r.to_number <= r1.from_number and r.to_number >= r1.to_number)) and r1.id != r.id
)
|> select([r, r1], r)
|> Repo.delete_all()
intersecting_ranges
end
def min_max_block_query do
from(r in __MODULE__, select: %{min: min(r.to_number), max: max(r.from_number)})
end
@ -162,4 +198,25 @@ defmodule Explorer.Utility.MissingBlockRange do
def include_bound_query(bound) do
from(r in __MODULE__, where: r.from_number >= ^bound, where: r.to_number <= ^bound)
end
defp numbers_to_ranges([]), do: []
defp numbers_to_ranges(numbers) when is_list(numbers) do
numbers
|> Enum.sort()
|> Enum.chunk_while(
nil,
fn
number, nil ->
{:cont, number..number}
number, first..last when number == last + 1 ->
{:cont, first..number}
number, range ->
{:cont, range, number..number}
end,
fn range -> {:cont, range, nil} end
)
end
end

@ -0,0 +1,61 @@
defmodule Explorer.Utility.MissingRangesManipulator do
@moduledoc """
Performs concurrent-safe actions on missing block ranges.
"""
use GenServer
alias Explorer.Utility.MissingBlockRange
@spec start_link(term()) :: GenServer.on_start()
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
def get_latest_batch do
GenServer.call(__MODULE__, :get_latest_batch)
end
def clear_batch(batch) do
GenServer.call(__MODULE__, {:clear_batch, batch}, timeout(batch))
end
def save_batch(batch) do
GenServer.call(__MODULE__, {:save_batch, batch}, timeout(batch))
end
def add_ranges_by_block_numbers(numbers) do
GenServer.cast(__MODULE__, {:add_ranges_by_block_numbers, numbers})
end
@impl true
def init(_) do
{:ok, %{}}
end
@impl true
def handle_call(:get_latest_batch, _from, state) do
{:reply, MissingBlockRange.get_latest_batch(), state}
end
def handle_call({:clear_batch, batch}, _from, state) do
{:reply, MissingBlockRange.clear_batch(batch), state}
end
def handle_call({:save_batch, batch}, _from, state) do
{:reply, MissingBlockRange.save_batch(batch), state}
end
@impl true
def handle_cast({:add_ranges_by_block_numbers, numbers}, state) do
MissingBlockRange.add_ranges_by_block_numbers(numbers)
{:noreply, state}
end
@default_timeout 5000
@timeout_by_range 2000
defp timeout(batch) do
@default_timeout + @timeout_by_range * Enum.count(batch)
end
end

@ -341,6 +341,8 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
insert_block(block, options)
insert_block(block2, options)
Process.sleep(100)
assert %{from_number: ^block_number, to_number: ^block_number} = Repo.one(MissingBlockRange)
end

@ -23,8 +23,9 @@ defmodule Indexer.Block.Catchup.Fetcher do
alias Ecto.Changeset
alias Explorer.Chain
alias Explorer.Utility.MissingRangesManipulator
alias Indexer.{Block, Tracer}
alias Indexer.Block.Catchup.{MissingRangesManipulator, Sequence, TaskSupervisor}
alias Indexer.Block.Catchup.{Sequence, TaskSupervisor}
alias Indexer.Memory.Shrinkable
alias Indexer.Prometheus
@ -75,8 +76,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
shrunk = Shrinkable.shrunk?(sequence)
MissingRangesManipulator.clear_batch(missing_ranges)
%{
first_block_number: first,
last_block_number: last,
@ -275,7 +274,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
success_numbers
|> numbers_to_ranges()
|> Enum.map(&MissingRangesManipulator.delete_range/1)
|> MissingRangesManipulator.clear_batch()
end
defp block_errors_to_block_number_ranges(block_errors) when is_list(block_errors) do

@ -8,7 +8,7 @@ defmodule Indexer.Block.Catchup.MissingRangesCollector do
alias Explorer.{Chain, Repo}
alias Explorer.Chain.Cache.BlockNumber
alias Explorer.Helper, as: ExplorerHelper
alias Explorer.Utility.MissingBlockRange
alias Explorer.Utility.{MissingBlockRange, MissingRangesManipulator}
alias Indexer.Block.Catchup.Helper
@default_missing_ranges_batch_size 100_000
@ -40,6 +40,8 @@ defmodule Indexer.Block.Catchup.MissingRangesCollector do
end
defp default_init do
MissingBlockRange.sanitize_missing_block_ranges()
{min_number, max_number} = get_initial_min_max()
clear_to_bounds(min_number, max_number)
@ -56,7 +58,7 @@ defmodule Indexer.Block.Catchup.MissingRangesCollector do
ranges
|> Enum.reverse()
|> Enum.flat_map(fn f..l -> Chain.missing_block_number_ranges(l..f) end)
|> MissingBlockRange.save_batch()
|> MissingRangesManipulator.save_batch()
if not is_nil(max_fetched_block_number) do
Process.send_after(self(), :update_future, @future_check_interval)
@ -113,7 +115,7 @@ defmodule Indexer.Block.Catchup.MissingRangesCollector do
%{min: nil, max: nil} ->
max_number = last_block()
{min_number, first_batch} = fetch_missing_ranges_batch(max_number, false)
MissingBlockRange.save_batch(first_batch)
MissingRangesManipulator.save_batch(first_batch)
{min_number, max_number}
%{min: min, max: max} ->
@ -125,7 +127,7 @@ defmodule Indexer.Block.Catchup.MissingRangesCollector do
def handle_info(:update_future, %{max_fetched_block_number: max_number} = state) do
if continue_future_updating?(max_number) do
{new_max_number, batch} = fetch_missing_ranges_batch(max_number, true)
MissingBlockRange.save_batch(batch)
MissingRangesManipulator.save_batch(batch)
Process.send_after(self(), :update_future, @future_check_interval)
{:noreply, %{state | max_fetched_block_number: new_max_number}}
else
@ -137,7 +139,7 @@ defmodule Indexer.Block.Catchup.MissingRangesCollector do
if min_number > first_block() do
{new_min_number, batch} = fetch_missing_ranges_batch(min_number, false)
Process.send_after(self(), :update_past, @past_check_interval)
MissingBlockRange.save_batch(batch)
MissingRangesManipulator.save_batch(batch)
{:noreply, %{state | min_fetched_block_number: new_min_number}}
else
Process.send_after(self(), :update_past, @past_check_interval * 100)

@ -1,50 +0,0 @@
defmodule Indexer.Block.Catchup.MissingRangesManipulator do
@moduledoc """
Performs concurrent-safe actions on missing block ranges.
"""
use GenServer
alias Explorer.Utility.MissingBlockRange
@spec start_link(term()) :: GenServer.on_start()
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
def get_latest_batch do
GenServer.call(__MODULE__, :get_latest_batch)
end
def delete_range(range) do
GenServer.cast(__MODULE__, {:delete_range, range})
end
def clear_batch(batch) do
GenServer.cast(__MODULE__, {:clear_batch, batch})
end
@impl true
def init(_) do
MissingBlockRange.sanitize_missing_block_ranges()
{:ok, %{}}
end
@impl true
def handle_call(:get_latest_batch, _from, state) do
{:reply, MissingBlockRange.get_latest_batch(), state}
end
@impl true
def handle_cast({:delete_range, range}, state) do
MissingBlockRange.delete_range(range)
{:noreply, state}
end
def handle_cast({:clear_batch, batch}, state) do
MissingBlockRange.clear_batch(batch)
{:noreply, state}
end
end

@ -5,7 +5,7 @@ defmodule Indexer.Block.Catchup.Supervisor do
use Supervisor
alias Indexer.Block.Catchup.{BoundIntervalSupervisor, MissingRangesCollector, MissingRangesManipulator}
alias Indexer.Block.Catchup.{BoundIntervalSupervisor, MissingRangesCollector}
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
@ -29,7 +29,6 @@ defmodule Indexer.Block.Catchup.Supervisor do
def init(bound_interval_supervisor_arguments) do
Supervisor.init(
[
{MissingRangesManipulator, []},
{MissingRangesCollector, []},
{Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor},
{BoundIntervalSupervisor, [bound_interval_supervisor_arguments, [name: BoundIntervalSupervisor]]}

@ -16,7 +16,7 @@ defmodule Indexer.Temporary.BlocksTransactionsMismatch do
alias EthereumJSONRPC.Blocks
alias Explorer.Chain.Block
alias Explorer.Repo
alias Explorer.Utility.MissingBlockRange
alias Explorer.Utility.MissingRangesManipulator
alias Indexer.BufferedTask
@behaviour BufferedTask
@ -117,7 +117,7 @@ defmodule Indexer.Temporary.BlocksTransactionsMismatch do
|> Enum.map(fn {hash, _trans_num} -> hash end)
|> update_in_order(refetch_needed: false, consensus: false)
MissingBlockRange.add_ranges_by_block_numbers(updated_numbers)
MissingRangesManipulator.add_ranges_by_block_numbers(updated_numbers)
end
if Enum.empty?(missing_blocks_data) do

@ -8,9 +8,10 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
alias Explorer.Chain.Block
alias Explorer.Repo
alias Explorer.Utility.MissingRangesManipulator
alias Indexer.BoundInterval
alias Indexer.Block.Catchup
alias Indexer.Block.Catchup.{MissingRangesCollector, MissingRangesManipulator}
alias Indexer.Block.Catchup.MissingRangesCollector
alias Indexer.Fetcher.{
CoinBalance,
@ -425,7 +426,6 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
insert(:block, number: 1)
MissingRangesCollector.start_link([])
MissingRangesManipulator.start_link([])
start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -518,7 +518,6 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
Application.put_env(:indexer, :block_ranges, "0..0")
MissingRangesCollector.start_link([])
MissingRangesManipulator.start_link([])
start_supervised({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -578,7 +577,6 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
defp supervisor(%{json_rpc_named_arguments: json_rpc_named_arguments}) do
start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
start_supervised!({MissingRangesManipulator, []})
pid =
start_supervised!(

@ -8,9 +8,10 @@ defmodule Indexer.Block.Catchup.FetcherTest do
alias Explorer.Chain
alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Hash
alias Explorer.Utility.MissingRangesManipulator
alias Indexer.Block
alias Indexer.Block.Catchup.Fetcher
alias Indexer.Block.Catchup.{MissingRangesCollector, MissingRangesManipulator}
alias Indexer.Block.Catchup.MissingRangesCollector
alias Indexer.Fetcher.{BlockReward, CoinBalance, InternalTransaction, Token, TokenBalance, UncleBlock}
@moduletag capture_log: true

Loading…
Cancel
Save