Empty blocks sanitizer

pull/4724/head
Viktor Baranov 3 years ago
parent 2bd8c5ff34
commit dee0af3bf9
  1. 1
      CHANGELOG.md
  2. 25
      apps/explorer/lib/explorer/chain.ex
  3. 4
      apps/explorer/lib/explorer/chain/block.ex
  4. 10
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  5. 11
      apps/explorer/priv/repo/migrations/20211006121008_add_block_is_empty_flag.exs
  6. 17
      apps/explorer/priv/repo/migrations/20211018072347_add_is_empty_index.exs
  7. 5
      apps/indexer/config/dev.exs
  8. 15
      apps/indexer/config/prod.exs
  9. 143
      apps/indexer/lib/indexer/empty_blocks_sanitizer.ex
  10. 7
      apps/indexer/lib/indexer/pending_transactions_sanitizer.ex
  11. 2
      apps/indexer/lib/indexer/supervisor.ex
  12. 3
      config/config.exs

@ -30,6 +30,7 @@
- [#4711](https://github.com/blockscout/blockscout/pull/4711) - Add trimming to the contract functions inputs
- [#4729](https://github.com/blockscout/blockscout/pull/4729) - Fix bugs with fees in cases of txs with `gas price = 0`
- [#4725](https://github.com/blockscout/blockscout/pull/4725) - Fix hardcoded coin name on transaction's and block's page
- [#4724](https://github.com/blockscout/blockscout/pull/4724) - An empty blocks sanitizer
- [#4717](https://github.com/blockscout/blockscout/pull/4717) - Contract verification fix: check only success creation tx
- [#4713](https://github.com/blockscout/blockscout/pull/4713) - Search input field: sanitize input
- [#4703](https://github.com/blockscout/blockscout/pull/4703) - Block Details page: Fix pagination on the Transactions tab

@ -3309,6 +3309,31 @@ defmodule Explorer.Chain do
|> Repo.all(timeout: :infinity)
end
@doc """
Returns the list of empty blocks from the DB which have not marked with `t:Explorer.Chain.Block.is_empty/0`.
This query used for initializtion of Indexer.EmptyBlocksSanitizer
"""
def unprocessed_empty_blocks_query_list(limit) do
query =
from(block in Block,
as: :block,
where: block.consensus == true,
where: is_nil(block.is_empty),
where:
not exists(
from(transaction in Transaction,
where: transaction.block_number == parent_as(:block).number
)
),
select: {block.number, block.hash},
order_by: [desc: block.number],
limit: ^limit
)
query
|> Repo.all(timeout: :infinity)
end
@doc """
The `string` must start with `0x`, then is converted to an integer and then to `t:Explorer.Chain.Hash.Address.t/0`.

@ -64,7 +64,8 @@ defmodule Explorer.Chain.Block do
total_difficulty: difficulty(),
transactions: %Ecto.Association.NotLoaded{} | [Transaction.t()],
refetch_needed: boolean(),
base_fee_per_gas: Wei.t()
base_fee_per_gas: Wei.t(),
is_empty: boolean()
}
@primary_key {:hash, Hash.Full, autogenerate: false}
@ -80,6 +81,7 @@ defmodule Explorer.Chain.Block do
field(:total_difficulty, :decimal)
field(:refetch_needed, :boolean)
field(:base_fee_per_gas, Wei)
field(:is_empty, :boolean)
timestamps()

@ -13,6 +13,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
alias Explorer.Chain.Import.Runner
alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances
alias Explorer.Chain.Import.Runner.Tokens
alias Explorer.Repo, as: ExplorerRepo
@behaviour Runner
@ -309,6 +310,15 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
{:error, %{exception: postgrex_error, consensus_block_numbers: consensus_block_numbers}}
end
def invalidate_consensus_blocks(block_numbers) do
opts = %{
timeout: 60_000,
timestamps: %{updated_at: DateTime.utc_now()}
}
lose_consensus(ExplorerRepo, [], block_numbers, [], opts)
end
defp new_pending_operations(repo, nonconsensus_hashes, hashes, %{timeout: timeout, timestamps: timestamps}) do
if Application.get_env(:explorer, :json_rpc_named_arguments)[:variant] == EthereumJSONRPC.RSK do
{:ok, []}

@ -0,0 +1,11 @@
defmodule Explorer.Repo.Migrations.AddBlockIsEmptyFlag do
use Ecto.Migration
def change do
alter table(:blocks) do
add(:is_empty, :bool, null: true)
end
create(index(:blocks, [:is_empty]))
end
end

@ -0,0 +1,17 @@
defmodule Explorer.Repo.Migrations.AddIsEmptyIndex do
use Ecto.Migration
@disable_ddl_transaction true
@disable_migration_lock true
def change do
create(
index(
:blocks,
~w(consensus)a,
name: :empty_consensus_blocks,
where: "is_empty IS NULL",
concurrently: true
)
)
end
end

@ -26,6 +26,11 @@ config :logger, :pending_transactions_to_refetch,
path: Path.absname("logs/dev/indexer/pending_transactions_to_refetch.log"),
metadata_filter: [fetcher: :pending_transactions_to_refetch]
config :logger, :empty_blocks_to_refetch,
level: :debug,
path: Path.absname("logs/dev/indexer/empty_blocks_to_refetch.log"),
metadata_filter: [fetcher: :empty_blocks_to_refetch]
variant =
if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do
"ganache"

@ -16,17 +16,26 @@ config :logger, :indexer_token_balances,
config :logger, :failed_contract_creations,
level: :debug,
path: Path.absname("logs/prod/indexer/failed_contract_creations.log"),
metadata_filter: [fetcher: :failed_created_addresses]
metadata_filter: [fetcher: :failed_created_addresses],
rotate: %{max_bytes: 52_428_800, keep: 19}
config :logger, :addresses_without_code,
level: :debug,
path: Path.absname("logs/prod/indexer/addresses_without_code.log"),
metadata_filter: [fetcher: :addresses_without_code]
metadata_filter: [fetcher: :addresses_without_code],
rotate: %{max_bytes: 52_428_800, keep: 19}
config :logger, :pending_transactions_to_refetch,
level: :debug,
path: Path.absname("logs/prod/indexer/pending_transactions_to_refetch.log"),
metadata_filter: [fetcher: :pending_transactions_to_refetch]
metadata_filter: [fetcher: :pending_transactions_to_refetch],
rotate: %{max_bytes: 52_428_800, keep: 19}
config :logger, :empty_blocks_to_refetch,
level: :info,
path: Path.absname("logs/prod/indexer/empty_blocks_to_refetch.log"),
metadata_filter: [fetcher: :empty_blocks_to_refetch],
rotate: %{max_bytes: 52_428_800, keep: 19}
variant =
if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do

@ -0,0 +1,143 @@
defmodule Indexer.EmptyBlocksSanitizer do
@moduledoc """
Periodically checks empty blocks starting from the head of the chain, detects for which blocks transactions should be refetched
and loose consensus for block in order to refetch transactions.
"""
use GenServer
require Logger
import Ecto.Query,
only: [
from: 2
]
import EthereumJSONRPC, only: [integer_to_quantity: 1, json_rpc: 2, request: 1]
alias Explorer.{Chain, Repo}
alias Explorer.Chain.Block
alias Explorer.Chain.Import.Runner.Blocks
# milliseconds
@timeout 1_000
# unprocessed emty blocks to fetch at once
@limit 400
@interval :timer.minutes(2)
defstruct interval: @interval,
json_rpc_named_arguments: []
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments}
}
Supervisor.child_spec(default, [])
end
def start_link(init_opts, gen_server_opts \\ []) do
GenServer.start_link(__MODULE__, init_opts, gen_server_opts)
end
def init(opts) when is_list(opts) do
state = %__MODULE__{
json_rpc_named_arguments: Keyword.fetch!(opts, :json_rpc_named_arguments),
interval: opts[:interval] || @interval
}
Process.send_after(self(), :sanitize_empty_blocks, state.interval)
{:ok, state}
end
def handle_info(
:sanitize_empty_blocks,
%{interval: interval, json_rpc_named_arguments: json_rpc_named_arguments} = state
) do
Logger.info("Start sanitizing of empty blocks. Batch size is #{@limit}",
fetcher: :empty_blocks_to_refetch
)
sanitize_empty_blocks(json_rpc_named_arguments)
Process.send_after(self(), :sanitize_empty_blocks, interval)
{:noreply, state}
end
defp sanitize_empty_blocks(json_rpc_named_arguments) do
unprocessed_empty_blocks_from_db = Chain.unprocessed_empty_blocks_query_list(@limit)
unprocessed_empty_blocks_from_db
|> Enum.with_index()
|> Enum.each(fn {{block_number, block_hash}, ind} ->
with {:ok, %{"transactions" => transactions}} <-
%{id: ind, method: "eth_getBlockByNumber", params: [integer_to_quantity(block_number), false]}
|> request()
|> json_rpc(json_rpc_named_arguments) do
transactions_count =
transactions
|> Enum.count()
if transactions_count > 0 do
Logger.info(
"Block with number #{block_number} and hash #{to_string(block_hash)} is full of transactions. We should set consensus=false for it in order to refetch.",
fetcher: :empty_blocks_to_refetch
)
Blocks.invalidate_consensus_blocks([block_number])
else
Logger.debug(
"Block with number #{block_number} and hash #{to_string(block_hash)} is empty. We should set is_empty=true for it.",
fetcher: :empty_blocks_to_refetch
)
set_is_empty_for_block(block_hash)
end
end
end)
Logger.info("Batch of empty blocks is sanitized",
fetcher: :empty_blocks_to_refetch
)
end
defp set_is_empty_for_block(block_hash) do
query =
from(
block in Block,
select: block,
where: block.hash == ^block_hash,
lock: "FOR UPDATE"
)
updated_at = DateTime.utc_now()
update_query =
from(
b in Block,
join: s in subquery(query),
on: b.hash == s.hash,
update: [
set: [
is_empty: true,
updated_at: ^updated_at
]
],
select: s
)
Repo.update_all(update_query, [], timeout: @timeout)
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error}}
end
end

@ -152,12 +152,7 @@ defmodule Indexer.PendingTransactionsSanitizer do
defp invalidate_block(block_number, block_hash, consensus, pending_tx, tx) do
if consensus do
opts = %{
timeout: 60_000,
timestamps: %{updated_at: DateTime.utc_now()}
}
Blocks.lose_consensus(Repo, [], [block_number], [], opts)
Blocks.invalidate_consensus_blocks([block_number])
else
{:ok, hash} = Hash.cast(block_hash)
tx_info = to_elixir(tx)

@ -10,6 +10,7 @@ defmodule Indexer.Supervisor do
alias Indexer.{
Block,
CalcLpTokensTotalLiqudity,
EmptyBlocksSanitizer,
PendingOpsCleaner,
PendingTransactionsSanitizer,
SetAmbBridgedMetadataForTokens,
@ -126,6 +127,7 @@ defmodule Indexer.Supervisor do
# Out-of-band fetchers
{CoinBalanceOnDemand.Supervisor, [json_rpc_named_arguments]},
{EmptyBlocksSanitizer, [[json_rpc_named_arguments: json_rpc_named_arguments]]},
{TokenTotalSupplyOnDemand.Supervisor, [json_rpc_named_arguments]},
{PendingTransactionsSanitizer, [[json_rpc_named_arguments: json_rpc_named_arguments]]},

@ -30,7 +30,8 @@ config :logger,
{LoggerFileBackend, :indexer_token_balances},
{LoggerFileBackend, :token_instances},
{LoggerFileBackend, :reading_token_functions},
{LoggerFileBackend, :pending_transactions_to_refetch}
{LoggerFileBackend, :pending_transactions_to_refetch},
{LoggerFileBackend, :empty_blocks_to_refetch}
]
config :logger, :console,

Loading…
Cancel
Save