diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java index bba9eee2da..c995e4a0af 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java @@ -12,6 +12,8 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager.task; +import tech.pegasys.pantheon.metrics.Counter; +import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.util.ArrayList; @@ -37,6 +39,9 @@ public abstract class AbstractPipelinedTask extends AbstractEthTask processingException = new AtomicReference<>(null); + private final Counter inboundQueueCounter; + private final Counter outboundQueueCounter; + protected AbstractPipelinedTask( final BlockingQueue inboundQueue, final int outboundBacklogSize, @@ -45,6 +50,16 @@ public abstract class AbstractPipelinedTask extends AbstractEthTask(outboundBacklogSize); results = new ArrayList<>(); + this.inboundQueueCounter = + metricsSystem.createCounter( + MetricCategory.SYNCHRONIZER, + "inboundQueueCounter", + "count of queue items that started processing"); + this.outboundQueueCounter = + metricsSystem.createCounter( + MetricCategory.SYNCHRONIZER, + "outboundQueueCounter", + "count of queue items that finished processing"); } @Override @@ -62,6 +77,7 @@ public abstract class AbstractPipelinedTask extends AbstractEthTask extends AbstractEthTask protocolSchedule; + private EthContext ethContext; + private ProtocolContext protocolContext; + private SyncState syncState; + private MutableBlockchain localBlockchain; + private MetricsSystem metricsSystem; + + @Test + public void parallelDownloadPipelineCounterShouldIncrement() { + metricsConfiguration.setEnabled(true); + metricsSystem = PrometheusMetricsSystem.init(metricsConfiguration); + + final BlockchainSetupUtil localBlockchainSetup = BlockchainSetupUtil.forTesting(); + localBlockchain = spy(localBlockchainSetup.getBlockchain()); + final BlockchainSetupUtil otherBlockchainSetup = BlockchainSetupUtil.forTesting(); + final Blockchain otherBlockchain = otherBlockchainSetup.getBlockchain(); + + protocolSchedule = localBlockchainSetup.getProtocolSchedule(); + protocolContext = localBlockchainSetup.getProtocolContext(); + final EthProtocolManager ethProtocolManager = + EthProtocolManagerTestUtil.create( + localBlockchain, + localBlockchainSetup.getWorldArchive(), + new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); + ethContext = ethProtocolManager.ethContext(); + syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); + + otherBlockchainSetup.importFirstBlocks(15); + final long targetBlock = otherBlockchain.getChainHeadBlockNumber(); + // Sanity check + assertThat(targetBlock).isGreaterThan(localBlockchain.getChainHeadBlockNumber()); + + final RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); + final RespondingEthPeer.Responder responder = + RespondingEthPeer.blockchainResponder(otherBlockchain); + + final SynchronizerConfiguration syncConfig = + SynchronizerConfiguration.builder().downloaderChainSegmentSize(10).build(); + final FullSyncDownloader downloader = downloader(syncConfig); + downloader.start(); + + peer.respondWhileOtherThreadsWork(responder, () -> !syncState.syncTarget().isPresent()); + assertThat(syncState.syncTarget()).isPresent(); + assertThat(syncState.syncTarget().get().peer()).isEqualTo(peer.getEthPeer()); + + peer.respondWhileOtherThreadsWork( + responder, () -> localBlockchain.getChainHeadBlockNumber() < targetBlock); + + assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(targetBlock); + + final List metrics = + metricsSystem.getMetrics(MetricCategory.SYNCHRONIZER).collect(Collectors.toList()); + final Observation inboundObservation = + new Observation(MetricCategory.SYNCHRONIZER, "inboundQueueCounter", 6.0, Lists.emptyList()); + final Observation outboundObservation = + new Observation( + MetricCategory.SYNCHRONIZER, "outboundQueueCounter", 5.0, Lists.emptyList()); + assertThat(metrics).contains(inboundObservation, outboundObservation); + } + + private FullSyncDownloader downloader(final SynchronizerConfiguration syncConfig) { + return new FullSyncDownloader<>( + syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem); + } +}