Merge pull request #7485 from blockscout/fix-missing-ranges-deletion

Fix missing block ranges clearing
catchup-block-interval-env
Victor Baranov 2 years ago committed by GitHub
commit 1af483074a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CHANGELOG.md
  2. 44
      apps/explorer/lib/explorer/utility/missing_block_range.ex
  3. 9
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  4. 50
      apps/indexer/lib/indexer/block/catchup/missing_ranges_manipulator.ex
  5. 3
      apps/indexer/lib/indexer/block/catchup/supervisor.ex
  6. 5
      apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs
  7. 6
      apps/indexer/test/indexer/block/catchup/fetcher_test.exs

@ -11,7 +11,7 @@
- [#7355](https://github.com/blockscout/blockscout/pull/7355) - Add endpoint for token info import - [#7355](https://github.com/blockscout/blockscout/pull/7355) - Add endpoint for token info import
- [#7393](https://github.com/blockscout/blockscout/pull/7393) - Realtime fetcher max gap - [#7393](https://github.com/blockscout/blockscout/pull/7393) - Realtime fetcher max gap
- [#7436](https://github.com/blockscout/blockscout/pull/7436) - TokenBalanceOnDemand ERC-1155 support - [#7436](https://github.com/blockscout/blockscout/pull/7436) - TokenBalanceOnDemand ERC-1155 support
- [#7469](https://github.com/blockscout/blockscout/pull/7469) - Clear missing block ranges after every success import - [#7469](https://github.com/blockscout/blockscout/pull/7469), [#7485](https://github.com/blockscout/blockscout/pull/7485) - Clear missing block ranges after every success import
### Fixes ### Fixes

@ -57,6 +57,7 @@ defmodule Explorer.Utility.MissingBlockRange do
update_to_number_or_delete_range(range, max_number + 1) update_to_number_or_delete_range(range, max_number + 1)
{%__MODULE__{} = range_1, %__MODULE__{} = range_2} -> {%__MODULE__{} = range_1, %__MODULE__{} = range_2} ->
delete_ranges_between(range_2.to_number, range_1.from_number)
update_from_number_or_delete_range(range_1, min_number - 1) update_from_number_or_delete_range(range_1, min_number - 1)
update_to_number_or_delete_range(range_2, max_number + 1) update_to_number_or_delete_range(range_2, max_number + 1)
@ -107,6 +108,41 @@ defmodule Explorer.Utility.MissingBlockRange do
|> Repo.one() |> Repo.one()
end end
defp delete_ranges_between(from, to) do
from
|> from_number_below_query()
|> to_number_above_query(to)
|> Repo.delete_all()
end
def sanitize_missing_block_ranges do
__MODULE__
|> where([r], r.from_number < r.to_number)
|> update([r], set: [from_number: r.to_number, to_number: r.from_number])
|> Repo.update_all([])
__MODULE__
|> join(:inner, [r], r1 in __MODULE__,
on:
((r1.from_number <= r.from_number and r1.from_number >= r.to_number) or
(r1.to_number <= r.from_number and r1.to_number >= r.to_number)) and r1.id != r.id
)
|> select([r, r1], [r, r1])
|> Repo.all()
|> Enum.map(&Enum.sort/1)
|> Enum.uniq()
|> Enum.map(fn [range_1, range_2] ->
Repo.delete(range_2)
range_1
|> changeset(%{
from_number: max(range_1.from_number, range_2.from_number),
to_number: min(range_1.to_number, range_2.to_number)
})
|> Repo.update()
end)
end
def min_max_block_query do def min_max_block_query do
from(r in __MODULE__, select: %{min: min(r.to_number), max: max(r.from_number)}) from(r in __MODULE__, select: %{min: min(r.to_number), max: max(r.from_number)})
end end
@ -115,12 +151,12 @@ defmodule Explorer.Utility.MissingBlockRange do
from(r in __MODULE__, order_by: [desc: r.from_number], limit: ^size) from(r in __MODULE__, order_by: [desc: r.from_number], limit: ^size)
end end
def from_number_below_query(lower_bound) do def from_number_below_query(query \\ __MODULE__, lower_bound) do
from(r in __MODULE__, where: r.from_number < ^lower_bound) from(r in query, where: r.from_number < ^lower_bound)
end end
def to_number_above_query(upper_bound) do def to_number_above_query(query \\ __MODULE__, upper_bound) do
from(r in __MODULE__, where: r.to_number > ^upper_bound) from(r in query, where: r.to_number > ^upper_bound)
end end
def include_bound_query(bound) do def include_bound_query(bound) do

@ -23,9 +23,8 @@ defmodule Indexer.Block.Catchup.Fetcher do
alias Ecto.Changeset alias Ecto.Changeset
alias Explorer.Chain alias Explorer.Chain
alias Explorer.Utility.MissingBlockRange
alias Indexer.{Block, Tracer} alias Indexer.{Block, Tracer}
alias Indexer.Block.Catchup.{Sequence, TaskSupervisor} alias Indexer.Block.Catchup.{MissingRangesManipulator, Sequence, TaskSupervisor}
alias Indexer.Memory.Shrinkable alias Indexer.Memory.Shrinkable
alias Indexer.Prometheus alias Indexer.Prometheus
@ -46,7 +45,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
def task(state) do def task(state) do
Logger.metadata(fetcher: :block_catchup) Logger.metadata(fetcher: :block_catchup)
case MissingBlockRange.get_latest_batch() do case MissingRangesManipulator.get_latest_batch() do
[] -> [] ->
%{ %{
first_block_number: nil, first_block_number: nil,
@ -76,7 +75,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
shrunk = Shrinkable.shrunk?(sequence) shrunk = Shrinkable.shrunk?(sequence)
MissingBlockRange.clear_batch(missing_ranges) MissingRangesManipulator.clear_batch(missing_ranges)
%{ %{
first_block_number: first, first_block_number: first,
@ -276,7 +275,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
success_numbers success_numbers
|> numbers_to_ranges() |> numbers_to_ranges()
|> Enum.map(&MissingBlockRange.delete_range/1) |> Enum.map(&MissingRangesManipulator.delete_range/1)
end end
defp block_errors_to_block_number_ranges(block_errors) when is_list(block_errors) do defp block_errors_to_block_number_ranges(block_errors) when is_list(block_errors) do

@ -0,0 +1,50 @@
defmodule Indexer.Block.Catchup.MissingRangesManipulator do
@moduledoc """
Performs concurrent-safe actions on missing block ranges.
"""
use GenServer
alias Explorer.Utility.MissingBlockRange
@spec start_link(term()) :: GenServer.on_start()
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
def get_latest_batch do
GenServer.call(__MODULE__, :get_latest_batch)
end
def delete_range(range) do
GenServer.cast(__MODULE__, {:delete_range, range})
end
def clear_batch(batch) do
GenServer.cast(__MODULE__, {:clear_batch, batch})
end
@impl true
def init(_) do
MissingBlockRange.sanitize_missing_block_ranges()
{:ok, %{}}
end
@impl true
def handle_call(:get_latest_batch, _from, state) do
{:reply, MissingBlockRange.get_latest_batch(), state}
end
@impl true
def handle_cast({:delete_range, range}, state) do
MissingBlockRange.delete_range(range)
{:noreply, state}
end
def handle_cast({:clear_batch, batch}, state) do
MissingBlockRange.clear_batch(batch)
{:noreply, state}
end
end

@ -5,7 +5,7 @@ defmodule Indexer.Block.Catchup.Supervisor do
use Supervisor use Supervisor
alias Indexer.Block.Catchup.{BoundIntervalSupervisor, MissingRangesCollector} alias Indexer.Block.Catchup.{BoundIntervalSupervisor, MissingRangesCollector, MissingRangesManipulator}
def child_spec([init_arguments]) do def child_spec([init_arguments]) do
child_spec([init_arguments, []]) child_spec([init_arguments, []])
@ -29,6 +29,7 @@ defmodule Indexer.Block.Catchup.Supervisor do
def init(bound_interval_supervisor_arguments) do def init(bound_interval_supervisor_arguments) do
Supervisor.init( Supervisor.init(
[ [
{MissingRangesManipulator, []},
{MissingRangesCollector, []}, {MissingRangesCollector, []},
{Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor}, {Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor},
{BoundIntervalSupervisor, [bound_interval_supervisor_arguments, [name: BoundIntervalSupervisor]]} {BoundIntervalSupervisor, [bound_interval_supervisor_arguments, [name: BoundIntervalSupervisor]]}

@ -10,7 +10,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
alias Explorer.Repo alias Explorer.Repo
alias Indexer.BoundInterval alias Indexer.BoundInterval
alias Indexer.Block.Catchup alias Indexer.Block.Catchup
alias Indexer.Block.Catchup.MissingRangesCollector alias Indexer.Block.Catchup.{MissingRangesCollector, MissingRangesManipulator}
alias Indexer.Fetcher.{ alias Indexer.Fetcher.{
CoinBalance, CoinBalance,
@ -425,6 +425,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
insert(:block, number: 1) insert(:block, number: 1)
MissingRangesCollector.start_link([]) MissingRangesCollector.start_link([])
MissingRangesManipulator.start_link([])
start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor}) start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -517,6 +518,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
Application.put_env(:indexer, :block_ranges, "0..0") Application.put_env(:indexer, :block_ranges, "0..0")
MissingRangesCollector.start_link([]) MissingRangesCollector.start_link([])
MissingRangesManipulator.start_link([])
start_supervised({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor}) start_supervised({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) CoinBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
@ -576,6 +578,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
defp supervisor(%{json_rpc_named_arguments: json_rpc_named_arguments}) do defp supervisor(%{json_rpc_named_arguments: json_rpc_named_arguments}) do
start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor}) start_supervised!({Task.Supervisor, name: Indexer.Block.Catchup.TaskSupervisor})
start_supervised!({MissingRangesManipulator, []})
pid = pid =
start_supervised!( start_supervised!(

@ -10,7 +10,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
alias Explorer.Chain.Hash alias Explorer.Chain.Hash
alias Indexer.Block alias Indexer.Block
alias Indexer.Block.Catchup.Fetcher alias Indexer.Block.Catchup.Fetcher
alias Indexer.Block.Catchup.MissingRangesCollector alias Indexer.Block.Catchup.{MissingRangesCollector, MissingRangesManipulator}
alias Indexer.Fetcher.{BlockReward, CoinBalance, InternalTransaction, Token, TokenBalance, UncleBlock} alias Indexer.Fetcher.{BlockReward, CoinBalance, InternalTransaction, Token, TokenBalance, UncleBlock}
@moduletag capture_log: true @moduletag capture_log: true
@ -49,6 +49,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
MissingRangesCollector.start_link([]) MissingRangesCollector.start_link([])
MissingRangesManipulator.start_link([])
parent = self() parent = self()
@ -150,6 +151,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
MissingRangesCollector.start_link([]) MissingRangesCollector.start_link([])
MissingRangesManipulator.start_link([])
latest_block_number = 2 latest_block_number = 2
latest_block_quantity = integer_to_quantity(latest_block_number) latest_block_quantity = integer_to_quantity(latest_block_number)
@ -307,6 +309,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
MissingRangesCollector.start_link([]) MissingRangesCollector.start_link([])
MissingRangesManipulator.start_link([])
latest_block_number = 2 latest_block_number = 2
latest_block_quantity = integer_to_quantity(latest_block_number) latest_block_quantity = integer_to_quantity(latest_block_number)
@ -461,6 +464,7 @@ defmodule Indexer.Block.Catchup.FetcherTest do
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) TokenBalance.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
MissingRangesCollector.start_link([]) MissingRangesCollector.start_link([])
MissingRangesManipulator.start_link([])
latest_block_number = 2 latest_block_number = 2
latest_block_quantity = integer_to_quantity(latest_block_number) latest_block_quantity = integer_to_quantity(latest_block_number)

Loading…
Cancel
Save