Merge pull request #6111 from blockscout/mf-add-prometheus-metrics-for-indexer

Add Prometheus metrics to indexer
pull/6124/head
Victor Baranov 2 years ago committed by GitHub
commit a39246f709
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 4
      apps/indexer/lib/indexer/application.ex
  3. 11
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  4. 14
      apps/indexer/lib/indexer/block/fetcher.ex
  5. 7
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  6. 61
      apps/indexer/lib/indexer/prometheus/instrumenter.ex
  7. 29
      apps/indexer/lib/indexer/prometheus/pending_block_operations_collector.ex

@@ -4,6 +4,7 @@
- [#6092](https://github.com/blockscout/blockscout/pull/6092) - Blockscout Account functionality
- [#6073](https://github.com/blockscout/blockscout/pull/6073) - Add vyper support for rust verifier microservice integration
- [#6111](https://github.com/blockscout/blockscout/pull/6111) - Add Prometheus metrics to indexer
### Fixes

@@ -6,9 +6,13 @@ defmodule Indexer.Application do
use Application
alias Indexer.Memory
alias Indexer.Prometheus.PendingBlockOperationsCollector
alias Prometheus.Registry
@impl Application
def start(_type, _args) do
Registry.register_collector(PendingBlockOperationsCollector)
memory_monitor_options =
case Application.get_env(:indexer, :memory_limit) do
nil -> %{}

@@ -26,6 +26,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
alias Indexer.{Block, Tracer}
alias Indexer.Block.Catchup.Sequence
alias Indexer.Memory.Shrinkable
alias Indexer.Prometheus
@behaviour Block.Fetcher
@@ -97,6 +98,8 @@ defmodule Indexer.Block.Catchup.Fetcher do
|> Stream.map(&Enum.count/1)
|> Enum.sum()
Prometheus.Instrumenter.missing_blocks(missing_block_count)
Logger.debug(fn -> "Missed blocks in ranges." end,
missing_block_range_count: range_count,
missing_block_count: missing_block_count
@@ -206,7 +209,11 @@ defmodule Indexer.Block.Catchup.Fetcher do
) do
Logger.metadata(fetcher: :block_catchup, first_block_number: first, last_block_number: last)
case fetch_and_import_range(block_fetcher, range) do
{fetch_duration, result} = :timer.tc(fn -> fetch_and_import_range(block_fetcher, range) end)
Prometheus.Instrumenter.block_full_process(fetch_duration, __MODULE__)
case result do
{:ok, %{inserted: inserted, errors: errors}} ->
errors = cap_seq(sequence, errors)
retry(sequence, errors)
@@ -214,6 +221,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
{:ok, inserted: inserted}
{:error, {:import = step, [%Changeset{} | _] = changesets}} = error ->
Prometheus.Instrumenter.import_errors()
Logger.error(fn -> ["failed to validate: ", inspect(changesets), ". Retrying."] end, step: step)
push_back(sequence, range)
@@ -221,6 +229,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
error
{:error, {:import = step, reason}} = error ->
Prometheus.Instrumenter.import_errors()
Logger.error(fn -> [inspect(reason), ". Retrying."] end, step: step)
push_back(sequence, range)

@@ -29,7 +29,7 @@ defmodule Indexer.Block.Fetcher do
UncleBlock
}
alias Indexer.Tracer
alias Indexer.{Prometheus, Tracer}
alias Indexer.Transform.{
AddressCoinBalances,
@@ -121,6 +121,9 @@ defmodule Indexer.Block.Fetcher do
_.._ = range
)
when callback_module != nil do
{fetch_time, fetched_blocks} =
:timer.tc(fn -> EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments) end)
with {:blocks,
{:ok,
%Blocks{
@@ -128,7 +131,7 @@ defmodule Indexer.Block.Fetcher do
transactions_params: transactions_params_without_receipts,
block_second_degree_relations_params: block_second_degree_relations_params,
errors: blocks_errors
}}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
}}} <- {:blocks, fetched_blocks},
blocks = TransformBlocks.transform_blocks(blocks_params),
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_params_without_receipts)},
%{logs: logs, receipts: receipts} = receipt_params,
@@ -180,6 +183,7 @@ defmodule Indexer.Block.Fetcher do
transactions: %{params: transactions_with_receipts}
}
) do
Prometheus.Instrumenter.block_batch_fetch(fetch_time, callback_module)
result = {:ok, %{inserted: inserted, errors: blocks_errors}}
update_block_cache(inserted[:blocks])
update_transactions_cache(inserted[:transactions])
@@ -231,7 +235,11 @@ defmodule Indexer.Block.Fetcher do
}
)
callback_module.import(state, options_with_broadcast)
{import_time, result} = :timer.tc(fn -> callback_module.import(state, options_with_broadcast) end)
no_blocks_to_import = length(options_with_broadcast.blocks.params)
Prometheus.Instrumenter.block_import(import_time / no_blocks_to_import, callback_module)
result
end
def async_import_token_instances(%{token_transfers: token_transfers}) do

@@ -33,6 +33,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
alias Indexer.{Block, Tracer}
alias Indexer.Block.Realtime.TaskSupervisor
alias Indexer.Fetcher.CoinBalance
alias Indexer.Prometheus
alias Indexer.Transform.Addresses
alias Timex.Duration
@@ -295,6 +296,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
{fetch_duration, result} =
:timer.tc(fn -> fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) end)
Prometheus.Instrumenter.block_full_process(fetch_duration, __MODULE__)
case result do
{:ok, %{inserted: inserted, errors: []}} ->
log_import_timings(inserted, fetch_duration, time_before)
@@ -310,6 +313,8 @@ defmodule Indexer.Block.Realtime.Fetcher do
end)
{:error, {:import = step, [%Changeset{} | _] = changesets}} ->
Prometheus.Instrumenter.import_errors()
params = %{
changesets: changesets,
block_number_to_fetch: block_number_to_fetch,
@@ -333,6 +338,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
end
{:error, {:import = step, reason}} ->
Prometheus.Instrumenter.import_errors()
Logger.error(fn -> inspect(reason) end, step: step)
{:error, {step, reason}} ->
@@ -363,6 +369,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
defp log_import_timings(%{blocks: [%{number: number, timestamp: timestamp}]}, fetch_duration, time_before) do
node_delay = Timex.diff(time_before, timestamp, :seconds)
Prometheus.Instrumenter.node_delay(node_delay)
Logger.debug("Block #{number} fetching duration: #{fetch_duration / 1_000_000}s. Node delay: #{node_delay}s.",
fetcher: :block_import_timings

@@ -0,0 +1,61 @@
defmodule Indexer.Prometheus.Instrumenter do
  @moduledoc """
  Blocks fetch and import metrics for `Prometheus`.
  """

  # `use Prometheus.Metric` consumes the @histogram/@gauge/@counter module
  # attributes below at compile time and registers each declared metric,
  # injecting the Histogram/Gauge/Counter helper modules used further down.
  use Prometheus.Metric

  # End-to-end block processing time (fetch + import), labeled by the
  # reporting fetcher module (catchup vs. realtime).
  @histogram [
    name: :block_full_processing_duration_microseconds,
    labels: [:fetcher],
    buckets: [1000, 5000, 10000, 100_000],
    duration_unit: :microseconds,
    help: "Block whole processing time including fetch and import"
  ]

  # Database import time per block, labeled by fetcher module.
  @histogram [
    name: :block_import_duration_microseconds,
    labels: [:fetcher],
    buckets: [1000, 5000, 10000, 100_000],
    duration_unit: :microseconds,
    help: "Block import time"
  ]

  # Duration of the batched JSON-RPC request that fetches a block range,
  # labeled by fetcher module.
  @histogram [
    name: :block_batch_fetch_request_duration_microseconds,
    labels: [:fetcher],
    buckets: [1000, 5000, 10000, 100_000],
    duration_unit: :microseconds,
    help: "Block fetch batch request processing time"
  ]

  @gauge [name: :missing_block_count, help: "Number of missing blocks in the database"]
  @gauge [name: :delay_from_last_node_block, help: "Delay from the last block on the node in seconds"]
  @counter [name: :import_errors_count, help: "Number of database import errors"]

  # Records the total fetch+import duration for a block range.
  # `time` is in microseconds (from :timer.tc); `fetcher` is the caller's
  # module, used as the metric label.
  def block_full_process(time, fetcher) do
    Histogram.observe([name: :block_full_processing_duration_microseconds, labels: [fetcher]], time)
  end

  # Records the per-block import duration (callers pass time already divided
  # by the number of imported blocks) under the given fetcher label.
  def block_import(time, fetcher) do
    Histogram.observe([name: :block_import_duration_microseconds, labels: [fetcher]], time)
  end

  # Records the duration of one batched block-fetch JSON-RPC request under
  # the given fetcher label.
  def block_batch_fetch(time, fetcher) do
    Histogram.observe([name: :block_batch_fetch_request_duration_microseconds, labels: [fetcher]], time)
  end

  # Sets the gauge of blocks known to be missing from the database.
  def missing_blocks(missing_block_count) do
    Gauge.set([name: :missing_block_count], missing_block_count)
  end

  # Sets the gauge of lag (in seconds) between the node's latest block
  # timestamp and the time the indexer saw it.
  def node_delay(delay) do
    Gauge.set([name: :delay_from_last_node_block], delay)
  end

  # Increments the import-error counter; defaults to a single error.
  def import_errors(error_count \\ 1) do
    Counter.inc([name: :import_errors_count], error_count)
  end
end

@@ -0,0 +1,29 @@
defmodule Indexer.Prometheus.PendingBlockOperationsCollector do
  @moduledoc """
  Custom collector to count number of records in pending_block_operations table.
  """

  use Prometheus.Collector

  alias Explorer.Chain.PendingBlockOperation
  alias Explorer.Repo
  alias Prometheus.Model

  # `Prometheus.Collector` callback: builds the metric family on each scrape
  # and hands it to `callback`. The row count is queried live from the
  # pending_block_operations table.
  def collect_mf(_registry, callback) do
    row_count = Repo.aggregate(PendingBlockOperation, :count)

    :pending_block_operations_count
    |> build_gauge_family("Number of records in pending_block_operations table", row_count)
    |> callback.()
  end

  # `Prometheus.Collector` callback: converts the collected datum into
  # Prometheus gauge samples.
  def collect_metrics(:pending_block_operations_count, count) do
    Model.gauge_metrics([{count}])
  end

  # Wraps a name/help/value triple into a gauge metric-family record.
  defp build_gauge_family(name, help, data) do
    Model.create_mf(name, help, :gauge, __MODULE__, data)
  end
end
Loading…
Cancel
Save