@ -18,6 +18,7 @@ import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import org.hyperledger.besu.metrics.Observation ;
import org.hyperledger.besu.metrics.StandardMetricCategory ;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem ;
import org.hyperledger.besu.plugin.services.metrics.ExternalSummary ;
import org.hyperledger.besu.plugin.services.metrics.LabelledGauge ;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric ;
import org.hyperledger.besu.plugin.services.metrics.MetricCategory ;
@ -34,6 +35,7 @@ import java.util.function.DoubleSupplier;
import java.util.function.Supplier ;
import java.util.stream.Stream ;
import com.google.common.cache.Cache ;
import com.google.common.collect.ImmutableSet ;
import io.prometheus.client.Collector ;
import io.prometheus.client.Collector.MetricFamilySamples ;
@ -42,6 +44,7 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter ;
import io.prometheus.client.Histogram ;
import io.prometheus.client.Summary ;
import io.prometheus.client.guava.cache.CacheMetricsCollector ;
import io.prometheus.client.hotspot.BufferPoolsExports ;
import io.prometheus.client.hotspot.ClassLoadingExports ;
import io.prometheus.client.hotspot.GarbageCollectorExports ;
@ -52,6 +55,7 @@ import io.vertx.core.impl.ConcurrentHashSet;
/** The Prometheus metrics system. */
public class PrometheusMetricsSystem implements ObservableMetricsSystem {
private static final List < String > EXTERNAL_SUMMARY_LABELS = List . of ( "quantile" ) ;
private final Map < MetricCategory , Collection < Collector > > collectors = new ConcurrentHashMap < > ( ) ;
private final CollectorRegistry registry = new CollectorRegistry ( true ) ;
@ -60,6 +64,9 @@ public class PrometheusMetricsSystem implements ObservableMetricsSystem {
private final Map < String , LabelledMetric < OperationTimer > > cachedTimers =
new ConcurrentHashMap < > ( ) ;
private final Set < String > totalSuffixedCounters = new ConcurrentHashSet < > ( ) ;
private final Map < MetricCategory , CacheMetricsCollector > guavaCacheCollectors =
new ConcurrentHashMap < > ( ) ;
private final Set < String > guavaCacheNames = new ConcurrentHashSet < > ( ) ;
private final Set < MetricCategory > enabledCategories ;
private final boolean timersEnabled ;
@ -78,12 +85,16 @@ public class PrometheusMetricsSystem implements ObservableMetricsSystem {
/** Init. */
public void init ( ) {
addCollector ( StandardMetricCategory . PROCESS , StandardExports : : new ) ;
addCollector ( StandardMetricCategory . JVM , MemoryPoolsExports : : new ) ;
addCollector ( StandardMetricCategory . JVM , BufferPoolsExports : : new ) ;
addCollector ( StandardMetricCategory . JVM , GarbageCollectorExports : : new ) ;
addCollector ( StandardMetricCategory . JVM , ThreadExports : : new ) ;
addCollector ( StandardMetricCategory . JVM , ClassLoadingExports : : new ) ;
if ( isCategoryEnabled ( StandardMetricCategory . PROCESS ) ) {
registerCollector ( StandardMetricCategory . PROCESS , new StandardExports ( ) ) ;
}
if ( isCategoryEnabled ( StandardMetricCategory . JVM ) ) {
registerCollector ( StandardMetricCategory . JVM , new MemoryPoolsExports ( ) ) ;
registerCollector ( StandardMetricCategory . JVM , new BufferPoolsExports ( ) ) ;
registerCollector ( StandardMetricCategory . JVM , new GarbageCollectorExports ( ) ) ;
registerCollector ( StandardMetricCategory . JVM , new ThreadExports ( ) ) ;
registerCollector ( StandardMetricCategory . JVM , new ClassLoadingExports ( ) ) ;
}
}
@Override
@ -103,7 +114,7 @@ public class PrometheusMetricsSystem implements ObservableMetricsSystem {
( k ) - > {
if ( isCategoryEnabled ( category ) ) {
final Counter counter = Counter . build ( metricName , help ) . labelNames ( labelNames ) . create ( ) ;
addCollectorUnchecked ( category , counter ) ;
registerCollector ( category , counter ) ;
return new PrometheusCounter ( counter ) ;
} else {
return NoOpMetricsSystem . getCounterLabelledMetric ( labelNames . length ) ;
@ -132,7 +143,7 @@ public class PrometheusMetricsSystem implements ObservableMetricsSystem {
. quantile ( 1 . 0 , 0 )
. labelNames ( labelNames )
. create ( ) ;
addCollectorUnchecked ( category , summary ) ;
registerCollector ( category , summary ) ;
return new PrometheusTimer ( summary ) ;
} else {
return NoOpMetricsSystem . getOperationTimerLabelledMetric ( labelNames . length ) ;
@ -153,7 +164,7 @@ public class PrometheusMetricsSystem implements ObservableMetricsSystem {
if ( timersEnabled & & isCategoryEnabled ( category ) ) {
final Histogram histogram =
Histogram . build ( metricName , help ) . labelNames ( labelNames ) . buckets ( 1D ) . create ( ) ;
addCollectorUnchecked ( category , histogram ) ;
registerCollector ( category , histogram ) ;
return new PrometheusSimpleTimer ( histogram ) ;
} else {
return NoOpMetricsSystem . getOperationTimerLabelledMetric ( labelNames . length ) ;
@ -170,7 +181,61 @@ public class PrometheusMetricsSystem implements ObservableMetricsSystem {
final String metricName = convertToPrometheusName ( category , name ) ;
if ( isCategoryEnabled ( category ) ) {
final Collector collector = new CurrentValueCollector ( metricName , help , valueSupplier ) ;
addCollectorUnchecked ( category , collector ) ;
registerCollector ( category , collector ) ;
}
}
@Override
public void trackExternalSummary (
final MetricCategory category ,
final String name ,
final String help ,
final Supplier < ExternalSummary > summarySupplier ) {
if ( isCategoryEnabled ( category ) ) {
final var externalSummaryCollector =
new Collector ( ) {
@Override
public List < MetricFamilySamples > collect ( ) {
final var externalSummary = summarySupplier . get ( ) ;
final var quantileValues =
externalSummary . quantiles ( ) . stream ( )
. map (
quantile - >
new Sample (
name ,
EXTERNAL_SUMMARY_LABELS ,
List . of ( Double . toString ( quantile . quantile ( ) ) ) ,
quantile . value ( ) ) )
. toList ( ) ;
return List . of (
new MetricFamilySamples (
name , Type . SUMMARY , "RocksDB histogram for " + name , quantileValues ) ) ;
}
} ;
registerCollector ( category , externalSummaryCollector ) ;
}
}
@Override
public void createGuavaCacheCollector (
final MetricCategory category , final String name , final Cache < ? , ? > cache ) {
if ( isCategoryEnabled ( category ) ) {
if ( guavaCacheNames . contains ( name ) ) {
throw new IllegalStateException ( "Cache already registered: " + name ) ;
}
guavaCacheNames . add ( name ) ;
final var guavaCacheCollector =
guavaCacheCollectors . computeIfAbsent (
category ,
unused - > {
final var cmc = new CacheMetricsCollector ( ) ;
registerCollector ( category , cmc ) ;
return cmc ;
} ) ;
guavaCacheCollector . addCache ( name , cache ) ;
}
}
@ -183,46 +248,33 @@ public class PrometheusMetricsSystem implements ObservableMetricsSystem {
final String metricName = convertToPrometheusName ( category , name ) ;
if ( isCategoryEnabled ( category ) ) {
final PrometheusGauge gauge = new PrometheusGauge ( metricName , help , List . of ( labelNames ) ) ;
addCollectorUnchecked ( category , gauge ) ;
registerCollector ( category , gauge ) ;
return gauge ;
}
return NoOpMetricsSystem . getLabelledGauge ( labelNames . length ) ;
}
/ * *
* Add collector .
*
* @param category the category
* @param metricSupplier the metric supplier
* /
public void addCollector (
final MetricCategory category , final Supplier < Collector > metricSupplier ) {
if ( isCategoryEnabled ( category ) ) {
addCollectorUnchecked ( category , metricSupplier . get ( ) ) ;
}
}
private void addCollectorUnchecked ( final MetricCategory category , final Collector metric ) {
final Collection < Collector > metrics =
private void registerCollector ( final MetricCategory category , final Collector collector ) {
final Collection < Collector > categoryCollectors =
this . collectors . computeIfAbsent (
category , key - > Collections . newSetFromMap ( new ConcurrentHashMap < > ( ) ) ) ;
final List < String > newSamples =
metric . collect ( ) . stream ( ) . map ( metricFamilySamples - > metricFamilySamples . name ) . toList ( ) ;
collector . collect ( ) . stream ( ) . map ( metricFamilySamples - > metricFamilySamples . name ) . toList ( ) ;
metric s. stream ( )
categoryCollectors . stream ( )
. filter (
collector - >
collector . collect ( ) . stream ( )
c - >
c . collect ( ) . stream ( )
. anyMatch ( metricFamilySamples - > newSamples . contains ( metricFamilySamples . name ) ) )
. findFirst ( )
. ifPresent (
collector - > {
metric s. remove ( collector ) ;
registry . unregister ( collector ) ;
c - > {
categoryCollector s. remove ( c ) ;
registry . unregister ( c ) ;
} ) ;
metrics . add ( metric . register ( registry ) ) ;
categoryCollectors . add ( collector . register ( registry ) ) ;
}
@Override
@ -237,6 +289,16 @@ public class PrometheusMetricsSystem implements ObservableMetricsSystem {
return collectors . keySet ( ) . stream ( ) . flatMap ( this : : streamObservations ) ;
}
@Override
public void shutdown ( ) {
registry . clear ( ) ;
collectors . clear ( ) ;
cachedCounters . clear ( ) ;
cachedTimers . clear ( ) ;
guavaCacheCollectors . clear ( ) ;
guavaCacheNames . clear ( ) ;
}
private Stream < Observation > convertSamplesToObservations (
final MetricCategory category , final MetricFamilySamples familySamples ) {
return familySamples . samples . stream ( )