Time all tasks (#361)

* Time all tasks

This is fairly high touch consisting of 3 things:
* Moving to Prometheus's Summary for timers
  * Timing at .2, .5, .8, .9, .99, and 1.0 (1.0 actually gets max I believe)
* Timing all abstract EthTasks
* The bulk of the changes: plumbing the timing context everywhere we need it
Danno Ferrin 6 years ago committed by GitHub
parent aac4fd40c5
commit 17a4e88798
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTask.java
  2. 9
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPeerRequestTask.java
  3. 6
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractPeerTask.java
  4. 21
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java
  5. 28
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java
  6. 17
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTracker.java
  7. 12
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java
  8. 21
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/Downloader.java
  9. 7
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/AbstractGetHeadersFromPeerTask.java
  10. 24
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java
  11. 18
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTask.java
  12. 29
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java
  13. 23
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBlockFromPeerTask.java
  14. 12
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTask.java
  15. 58
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTask.java
  16. 27
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByNumberTask.java
  17. 22
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java
  18. 38
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PersistBlockTask.java
  19. 33
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTask.java
  20. 11
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeerTask.java
  21. 15
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeersTask.java
  22. 3
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractEthTaskTest.java
  23. 6
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockEthTask.java
  24. 5
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java
  25. 10
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java
  26. 7
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/ChainHeadTrackerTest.java
  27. 9
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/DownloaderTest.java
  28. 7
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java
  29. 8
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskParameterizedTest.java
  30. 27
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskTest.java
  31. 30
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java
  32. 3
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBlockFromPeerTaskTest.java
  33. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetBodiesFromPeerTaskTest.java
  34. 6
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByHashTaskTest.java
  35. 15
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetHeadersFromPeerByNumberTaskTest.java
  36. 6
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java
  37. 57
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PersistBlockTaskTest.java
  38. 12
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/PipelinedImportChainSegmentTaskTest.java
  39. 11
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeerTaskTest.java
  40. 13
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/WaitForPeersTaskTest.java
  41. 3
      metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java
  42. 1
      metrics/src/main/java/tech/pegasys/pantheon/metrics/noop/NoOpMetricsSystem.java
  43. 37
      metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystem.java
  44. 11
      metrics/src/main/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusTimer.java
  45. 44
      metrics/src/test/java/tech/pegasys/pantheon/metrics/prometheus/PrometheusMetricsSystemTest.java
  46. 4
      pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java
  47. 4
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java
  48. 4
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java
  49. 4
      pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java

@ -12,6 +12,9 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@ -22,14 +25,21 @@ import java.util.function.Supplier;
public abstract class AbstractEthTask<T> implements EthTask<T> {
protected double taskTimeInSec = -1.0D;
protected OperationTimer taskTimer;
protected final AtomicReference<CompletableFuture<T>> result = new AtomicReference<>();
protected volatile Collection<CompletableFuture<?>> subTaskFutures =
new ConcurrentLinkedDeque<>();
/** @param ethTasksTimer The metrics timer to use to time the duration of the task. */
protected AbstractEthTask(final LabelledMetric<OperationTimer> ethTasksTimer) {
taskTimer = ethTasksTimer.labels(getClass().getSimpleName());
}
@Override
public final CompletableFuture<T> run() {
if (result.compareAndSet(null, new CompletableFuture<>())) {
executeTask();
executeTaskTimed();
result
.get()
.whenComplete(
@ -117,6 +127,20 @@ public abstract class AbstractEthTask<T> implements EthTask<T> {
/** Execute core task logic. */
protected abstract void executeTask();
/** Executes the task while timed by a timer. */
public void executeTaskTimed() {
final OperationTimer.TimingContext timingContext = taskTimer.startTimer();
try {
executeTask();
} finally {
taskTimeInSec = timingContext.stopTimer();
}
}
public double getTaskTimeInSec() {
return taskTimeInSec;
}
/** Cleanup any resources when task completes. */
protected void cleanup() {
for (final CompletableFuture<?> subTaskFuture : subTaskFutures) {

@ -18,6 +18,8 @@ import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;
import java.util.Optional;
@ -29,8 +31,11 @@ public abstract class AbstractPeerRequestTask<R> extends AbstractPeerTask<R> {
private final int requestCode;
private volatile ResponseStream responseStream;
protected AbstractPeerRequestTask(final EthContext ethContext, final int requestCode) {
super(ethContext);
protected AbstractPeerRequestTask(
final EthContext ethContext,
final int requestCode,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, ethTasksTimer);
this.requestCode = requestCode;
}

@ -16,6 +16,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResul
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.Optional;
@ -23,7 +25,9 @@ public abstract class AbstractPeerTask<R> extends AbstractEthTask<PeerTaskResult
protected Optional<EthPeer> assignedPeer = Optional.empty();
protected final EthContext ethContext;
protected AbstractPeerTask(final EthContext ethContext) {
protected AbstractPeerTask(
final EthContext ethContext, final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethTasksTimer);
this.ethContext = ethContext;
}

@ -15,6 +15,8 @@ package tech.pegasys.pantheon.ethereum.eth.manager;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;
import java.time.Duration;
@ -37,13 +39,20 @@ public abstract class AbstractRetryingPeerTask<T extends Collection<?>> extends
private final EthContext ethContext;
private final int maxRetries;
private int retryCount = 0;
private final LabelledMetric<OperationTimer> ethTasksTimer;
/**
* @param ethContext The context of the current Eth network we are attached to.
* @param maxRetries Maximum number of retries to accept before completing exceptionally.
* @param ethTasksTimer The metrics timer to use to time the duration of the task.
*/
public AbstractRetryingPeerTask(final EthContext ethContext, final int maxRetries) {
public AbstractRetryingPeerTask(
final EthContext ethContext,
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethTasksTimer);
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
this.maxRetries = maxRetries;
}
@ -69,7 +78,7 @@ public abstract class AbstractRetryingPeerTask<T extends Collection<?>> extends
if (peerResult.size() > 0) {
retryCount = 0;
}
executeTask();
executeTaskTimed();
}
});
}
@ -87,13 +96,13 @@ public abstract class AbstractRetryingPeerTask<T extends Collection<?>> extends
if (cause instanceof NoAvailablePeersException) {
LOG.info("No peers available, wait for peer.");
// Wait for new peer to connect
final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext);
final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, ethTasksTimer);
executeSubTask(
() ->
ethContext
.getScheduler()
.timeout(waitTask, Duration.ofSeconds(5))
.whenComplete((r, t) -> executeTask()));
.whenComplete((r, t) -> executeTaskTimed()));
return;
}
@ -104,7 +113,9 @@ public abstract class AbstractRetryingPeerTask<T extends Collection<?>> extends
// Wait before retrying on failure
executeSubTask(
() ->
ethContext.getScheduler().scheduleFutureTask(this::executeTask, Duration.ofSeconds(1)));
ethContext
.getScheduler()
.scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1)));
}
protected abstract boolean isRetryableError(Throwable error);

@ -34,8 +34,7 @@ import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.uint.UInt256;
@ -62,12 +61,12 @@ public class BlockPropagationManager<C> {
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Set<Hash> requestedBlocks = new ConcurrentSet<>();
private final PendingBlocks pendingBlocks;
private final OperationTimer announcedBlockIngestTimer;
BlockPropagationManager(
final SynchronizerConfiguration config,
@ -76,20 +75,15 @@ public class BlockPropagationManager<C> {
final EthContext ethContext,
final SyncState syncState,
final PendingBlocks pendingBlocks,
final MetricsSystem metricsSystem) {
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
this.syncState = syncState;
this.pendingBlocks = pendingBlocks;
this.announcedBlockIngestTimer =
metricsSystem.createTimer(
MetricCategory.BLOCKCHAIN,
"pantheon_blockchain_announcedBlock_ingest",
"Time to ingest a single announced block");
}
public void start() {
@ -125,7 +119,11 @@ public class BlockPropagationManager<C> {
if (!readyForImport.isEmpty()) {
final Supplier<CompletableFuture<List<Block>>> importBlocksTask =
PersistBlockTask.forUnorderedBlocks(
protocolSchedule, protocolContext, readyForImport, HeaderValidationMode.FULL);
protocolSchedule,
protocolContext,
readyForImport,
HeaderValidationMode.FULL,
ethTasksTimer);
ethContext
.getScheduler()
.scheduleSyncWorkerTask(importBlocksTask)
@ -225,7 +223,8 @@ public class BlockPropagationManager<C> {
private CompletableFuture<Block> processAnnouncedBlock(
final EthPeer peer, final NewBlockHash newBlock) {
final AbstractPeerTask<Block> getBlockTask =
GetBlockFromPeerTask.create(protocolSchedule, ethContext, newBlock.hash()).assignPeer(peer);
GetBlockFromPeerTask.create(protocolSchedule, ethContext, newBlock.hash(), ethTasksTimer)
.assignPeer(peer);
return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult()));
}
@ -251,8 +250,7 @@ public class BlockPropagationManager<C> {
// Import block
final PersistBlockTask<C> importTask =
PersistBlockTask.create(
protocolSchedule, protocolContext, block, HeaderValidationMode.FULL);
final OperationTimer.TimingContext blockTimer = announcedBlockIngestTimer.startTimer();
protocolSchedule, protocolContext, block, HeaderValidationMode.FULL, ethTasksTimer);
return ethContext
.getScheduler()
.scheduleSyncWorkerTask(importTask::run)
@ -265,7 +263,7 @@ public class BlockPropagationManager<C> {
block.getHeader().getNumber(),
block.getHash());
} else {
final double timeInMs = blockTimer.stopTimer() * 1000;
final double timeInMs = importTask.getTaskTimeInSec() * 1000;
LOG.info(
String.format(
"Successfully imported announced block %d (%s) in %01.3fms.",

@ -23,6 +23,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers.ConnectCallback;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetHeadersFromPeerByHashTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import org.apache.logging.log4j.Logger;
@ -33,21 +35,25 @@ public class ChainHeadTracker implements ConnectCallback {
private final EthContext ethContext;
private final ProtocolSchedule<?> protocolSchedule;
private final TrailingPeerLimiter trailingPeerLimiter;
private final LabelledMetric<OperationTimer> ethTasksTimer;
public ChainHeadTracker(
final EthContext ethContext,
final ProtocolSchedule<?> protocolSchedule,
final TrailingPeerLimiter trailingPeerLimiter) {
final TrailingPeerLimiter trailingPeerLimiter,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.ethContext = ethContext;
this.protocolSchedule = protocolSchedule;
this.trailingPeerLimiter = trailingPeerLimiter;
this.ethTasksTimer = ethTasksTimer;
}
public static void trackChainHeadForPeers(
final EthContext ethContext,
final ProtocolSchedule<?> protocolSchedule,
final Blockchain blockchain,
final SynchronizerConfiguration syncConfiguration) {
final SynchronizerConfiguration syncConfiguration,
final LabelledMetric<OperationTimer> ethTasksTimer) {
final TrailingPeerLimiter trailingPeerLimiter =
new TrailingPeerLimiter(
ethContext.getEthPeers(),
@ -55,7 +61,7 @@ public class ChainHeadTracker implements ConnectCallback {
syncConfiguration.trailingPeerBlocksBehindThreshold(),
syncConfiguration.maxTrailingPeers());
final ChainHeadTracker tracker =
new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter);
new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter, ethTasksTimer);
ethContext.getEthPeers().subscribeConnect(tracker);
blockchain.observeBlockAdded(trailingPeerLimiter);
}
@ -64,7 +70,10 @@ public class ChainHeadTracker implements ConnectCallback {
public void onPeerConnected(final EthPeer peer) {
LOG.debug("Requesting chain head info for {}", peer);
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule, ethContext, Hash.wrap(peer.chainState().getBestBlock().getHash()))
protocolSchedule,
ethContext,
Hash.wrap(peer.chainState().getBestBlock().getHash()),
ethTasksTimer)
.assignPeer(peer)
.run()
.whenComplete(

@ -19,7 +19,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@ -42,7 +43,7 @@ public class DefaultSynchronizer<C> implements Synchronizer {
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem) {
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.syncState = syncState;
this.blockPropagationManager =
new BlockPropagationManager<>(
@ -52,12 +53,13 @@ public class DefaultSynchronizer<C> implements Synchronizer {
ethContext,
syncState,
new PendingBlocks(),
metricsSystem);
ethTasksTimer);
this.downloader =
new Downloader<>(syncConfig, protocolSchedule, protocolContext, ethContext, syncState);
new Downloader<>(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer);
ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig);
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);
if (syncConfig.syncMode().equals(SyncMode.FAST)) {
LOG.info("Fast sync enabled.");
}

@ -33,6 +33,8 @@ import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeersTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.uint.UInt256;
@ -60,6 +62,7 @@ public class Downloader<C> {
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final Deque<BlockHeader> checkpointHeaders = new ConcurrentLinkedDeque<>();
private int checkpointTimeouts = 0;
@ -75,7 +78,9 @@ public class Downloader<C> {
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState) {
final SyncState syncState,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.ethTasksTimer = ethTasksTimer;
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
@ -124,13 +129,13 @@ public class Downloader<C> {
}
private CompletableFuture<?> waitForPeers() {
return WaitForPeersTask.create(ethContext, 1).run();
return WaitForPeersTask.create(ethContext, 1, ethTasksTimer).run();
}
private CompletableFuture<?> waitForNewPeer() {
return ethContext
.getScheduler()
.timeout(WaitForPeerTask.create(ethContext), Duration.ofSeconds(5));
.timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5));
}
private CompletableFuture<SyncTarget> findSyncTarget() {
@ -159,7 +164,8 @@ public class Downloader<C> {
protocolContext,
ethContext,
bestPeer,
config.downloaderHeaderRequestSize())
config.downloaderHeaderRequestSize(),
ethTasksTimer)
.run()
.handle((r, t) -> r)
.thenCompose(
@ -325,7 +331,8 @@ public class Downloader<C> {
lastHeader.getHash(),
lastHeader.getNumber(),
config.downloaderHeaderRequestSize() + 1,
config.downloaderChainSegmentSize() - 1)
config.downloaderChainSegmentSize() - 1,
ethTasksTimer)
.assignPeer(syncTarget.peer());
}
@ -344,7 +351,8 @@ public class Downloader<C> {
protocolContext,
ethContext,
checkpointHeaders.getFirst(),
config.downloaderChainSegmentSize());
config.downloaderChainSegmentSize(),
ethTasksTimer);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
final PipelinedImportChainSegmentTask<C> importTask =
@ -353,6 +361,7 @@ public class Downloader<C> {
protocolContext,
ethContext,
config.downloaderParallelism(),
ethTasksTimer,
Lists.newArrayList(checkpointHeaders));
importedBlocks = importTask.run();
}

@ -22,6 +22,8 @@ import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.ArrayList;
import java.util.Collections;
@ -50,8 +52,9 @@ public abstract class AbstractGetHeadersFromPeerTask
final long minimumRequiredBlockNumber,
final int count,
final int skip,
final boolean reverse) {
super(ethContext, EthPV62.GET_BLOCK_HEADERS);
final boolean reverse,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, EthPV62.GET_BLOCK_HEADERS, ethTasksTimer);
checkArgument(count > 0);
this.protocolSchedule = protocolSchedule;
this.count = count;

@ -24,6 +24,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersExc
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerBreachedProtocolException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.HashMap;
import java.util.List;
@ -48,6 +50,7 @@ public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>>
private final EthContext ethContext;
private final ProtocolSchedule<C> protocolSchedule;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final List<BlockHeader> headers;
private final Map<Long, Block> blocks;
@ -57,11 +60,13 @@ public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>>
final ProtocolSchedule<C> protocolSchedule,
final EthContext ethContext,
final List<BlockHeader> headers,
final int maxRetries) {
super(ethContext, maxRetries);
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, maxRetries, ethTasksTimer);
checkArgument(headers.size() > 0, "Must supply a non-empty headers list");
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
this.headers = headers;
this.blocks = new HashMap<>();
@ -71,15 +76,19 @@ public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>>
final ProtocolSchedule<C> protocolSchedule,
final EthContext ethContext,
final List<BlockHeader> headers,
final int maxRetries) {
return new CompleteBlocksTask<>(protocolSchedule, ethContext, headers, maxRetries);
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new CompleteBlocksTask<>(
protocolSchedule, ethContext, headers, maxRetries, ethTasksTimer);
}
public static <C> CompleteBlocksTask<C> forHeaders(
final ProtocolSchedule<C> protocolSchedule,
final EthContext ethContext,
final List<BlockHeader> headers) {
return new CompleteBlocksTask<>(protocolSchedule, ethContext, headers, DEFAULT_RETRIES);
final List<BlockHeader> headers,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new CompleteBlocksTask<>(
protocolSchedule, ethContext, headers, DEFAULT_RETRIES, ethTasksTimer);
}
@Override
@ -111,7 +120,8 @@ public class CompleteBlocksTask<C> extends AbstractRetryingPeerTask<List<Block>>
return executeSubTask(
() -> {
final GetBodiesFromPeerTask<C> task =
GetBodiesFromPeerTask.forHeaders(protocolSchedule, ethContext, incompleteHeaders);
GetBodiesFromPeerTask.forHeaders(
protocolSchedule, ethContext, incompleteHeaders, ethTasksTimer);
assignedPeer.ifPresent(task::assignPeer);
return task.run();
});

@ -20,6 +20,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.util.BlockchainUtil;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.List;
import java.util.OptionalInt;
@ -34,6 +36,7 @@ public class DetermineCommonAncestorTask<C> extends AbstractEthTask<BlockHeader>
private final EthContext ethContext;
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final EthPeer peer;
private final int headerRequestSize;
@ -47,10 +50,13 @@ public class DetermineCommonAncestorTask<C> extends AbstractEthTask<BlockHeader>
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final EthPeer peer,
final int headerRequestSize) {
final int headerRequestSize,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethTasksTimer);
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.protocolContext = protocolContext;
this.ethTasksTimer = ethTasksTimer;
this.peer = peer;
this.headerRequestSize = headerRequestSize;
@ -65,9 +71,10 @@ public class DetermineCommonAncestorTask<C> extends AbstractEthTask<BlockHeader>
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final EthPeer peer,
final int headerRequestSize) {
final int headerRequestSize,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new DetermineCommonAncestorTask<>(
protocolSchedule, protocolContext, ethContext, peer, headerRequestSize);
protocolSchedule, protocolContext, ethContext, peer, headerRequestSize, ethTasksTimer);
}
@Override
@ -89,7 +96,7 @@ public class DetermineCommonAncestorTask<C> extends AbstractEthTask<BlockHeader>
if (error != null) {
result.get().completeExceptionally(error);
} else if (!result.get().isDone()) {
executeTask();
executeTaskTimed();
}
});
}
@ -112,7 +119,8 @@ public class DetermineCommonAncestorTask<C> extends AbstractEthTask<BlockHeader>
ethContext,
maximumPossibleCommonAncestorNumber,
count,
skipInterval)
skipInterval,
ethTasksTimer)
.assignPeer(peer)
.run());
}

@ -29,6 +29,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.Arrays;
import java.util.List;
@ -52,6 +54,7 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
private final EthContext ethContext;
private final ProtocolContext<C> protocolContext;
private final ProtocolSchedule<C> protocolSchedule;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final BlockHeader[] headers;
private final BlockHeader referenceHeader;
@ -66,13 +69,15 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
final EthContext ethContext,
final BlockHeader referenceHeader,
final int segmentLength,
final int maxRetries) {
super(ethContext, maxRetries);
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, maxRetries, ethTasksTimer);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.referenceHeader = referenceHeader;
this.segmentLength = segmentLength;
this.ethTasksTimer = ethTasksTimer;
startingBlockNumber = referenceHeader.getNumber() - segmentLength;
headers = new BlockHeader[segmentLength];
@ -85,9 +90,16 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
final EthContext ethContext,
final BlockHeader referenceHeader,
final int segmentLength,
final int maxRetries) {
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new DownloadHeaderSequenceTask<>(
protocolSchedule, protocolContext, ethContext, referenceHeader, segmentLength, maxRetries);
protocolSchedule,
protocolContext,
ethContext,
referenceHeader,
segmentLength,
maxRetries,
ethTasksTimer);
}
public static <C> DownloadHeaderSequenceTask<C> endingAtHeader(
@ -95,14 +107,16 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final BlockHeader referenceHeader,
final int segmentLength) {
final int segmentLength,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new DownloadHeaderSequenceTask<>(
protocolSchedule,
protocolContext,
ethContext,
referenceHeader,
segmentLength,
DEFAULT_RETRIES);
DEFAULT_RETRIES,
ethTasksTimer);
}
@Override
@ -149,7 +163,8 @@ public class DownloadHeaderSequenceTask<C> extends AbstractRetryingPeerTask<List
ethContext,
referenceHash,
referenceHeaderForNextRequest.getNumber(),
count + 1);
count + 1,
ethTasksTimer);
return headersTask.run();
});
}

@ -21,6 +21,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.IncompleteResultsException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@ -33,18 +35,26 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
private static final Logger LOG = LogManager.getLogger();
private final ProtocolSchedule<?> protocolSchedule;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final Hash hash;
protected GetBlockFromPeerTask(
final ProtocolSchedule<?> protocolSchedule, final EthContext ethContext, final Hash hash) {
super(ethContext);
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final Hash hash,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, ethTasksTimer);
this.protocolSchedule = protocolSchedule;
this.ethTasksTimer = ethTasksTimer;
this.hash = hash;
}
public static GetBlockFromPeerTask create(
final ProtocolSchedule<?> protocolSchedule, final EthContext ethContext, final Hash hash) {
return new GetBlockFromPeerTask(protocolSchedule, ethContext, hash);
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final Hash hash,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetBlockFromPeerTask(protocolSchedule, ethContext, hash, ethTasksTimer);
}
@Override
@ -70,7 +80,8 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
private CompletableFuture<PeerTaskResult<List<BlockHeader>>> downloadHeader(final EthPeer peer) {
return executeSubTask(
() ->
GetHeadersFromPeerByHashTask.forSingleHash(protocolSchedule, ethContext, hash)
GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule, ethContext, hash, ethTasksTimer)
.assignPeer(peer)
.run());
}
@ -87,7 +98,7 @@ public class GetBlockFromPeerTask extends AbstractPeerTask<Block> {
() -> {
final GetBodiesFromPeerTask<?> task =
GetBodiesFromPeerTask.forHeaders(
protocolSchedule, ethContext, headerResult.getResult());
protocolSchedule, ethContext, headerResult.getResult(), ethTasksTimer);
task.assignPeer(headerResult.getPeer());
return task.run();
});

@ -29,6 +29,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.BodyValidation;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import java.util.ArrayList;
@ -59,8 +61,9 @@ public class GetBodiesFromPeerTask<C> extends AbstractPeerRequestTask<List<Block
private GetBodiesFromPeerTask(
final ProtocolSchedule<C> protocolSchedule,
final EthContext ethContext,
final List<BlockHeader> headers) {
super(ethContext, EthPV62.GET_BLOCK_BODIES);
final List<BlockHeader> headers,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, EthPV62.GET_BLOCK_BODIES, ethTasksTimer);
checkArgument(headers.size() > 0);
this.protocolSchedule = protocolSchedule;
@ -76,8 +79,9 @@ public class GetBodiesFromPeerTask<C> extends AbstractPeerRequestTask<List<Block
public static <C> GetBodiesFromPeerTask<C> forHeaders(
final ProtocolSchedule<C> protocolSchedule,
final EthContext ethContext,
final List<BlockHeader> headers) {
return new GetBodiesFromPeerTask<>(protocolSchedule, ethContext, headers);
final List<BlockHeader> headers,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetBodiesFromPeerTask<>(protocolSchedule, ethContext, headers, ethTasksTimer);
}
@Override

@ -21,6 +21,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
@ -40,8 +42,16 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask
final long minimumRequiredBlockNumber,
final int count,
final int skip,
final boolean reverse) {
super(protocolSchedule, ethContext, minimumRequiredBlockNumber, count, skip, reverse);
final boolean reverse,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(
protocolSchedule,
ethContext,
minimumRequiredBlockNumber,
count,
skip,
reverse,
ethTasksTimer);
checkNotNull(referenceHash);
this.referenceHash = referenceHash;
}
@ -51,9 +61,17 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask
final EthContext ethContext,
final Hash firstHash,
final long firstBlockNumber,
final int segmentLength) {
final int segmentLength,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetHeadersFromPeerByHashTask(
protocolSchedule, ethContext, firstHash, firstBlockNumber, segmentLength, 0, false);
protocolSchedule,
ethContext,
firstHash,
firstBlockNumber,
segmentLength,
0,
false,
ethTasksTimer);
}
public static AbstractGetHeadersFromPeerTask startingAtHash(
@ -62,9 +80,17 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask
final Hash firstHash,
final long firstBlockNumber,
final int segmentLength,
final int skip) {
final int skip,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetHeadersFromPeerByHashTask(
protocolSchedule, ethContext, firstHash, firstBlockNumber, segmentLength, skip, false);
protocolSchedule,
ethContext,
firstHash,
firstBlockNumber,
segmentLength,
skip,
false,
ethTasksTimer);
}
public static AbstractGetHeadersFromPeerTask endingAtHash(
@ -72,14 +98,26 @@ public class GetHeadersFromPeerByHashTask extends AbstractGetHeadersFromPeerTask
final EthContext ethContext,
final Hash lastHash,
final long lastBlockNumber,
final int segmentLength) {
final int segmentLength,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetHeadersFromPeerByHashTask(
protocolSchedule, ethContext, lastHash, lastBlockNumber, segmentLength, 0, true);
protocolSchedule,
ethContext,
lastHash,
lastBlockNumber,
segmentLength,
0,
true,
ethTasksTimer);
}
public static AbstractGetHeadersFromPeerTask forSingleHash(
final ProtocolSchedule<?> protocolSchedule, final EthContext ethContext, final Hash hash) {
return new GetHeadersFromPeerByHashTask(protocolSchedule, ethContext, hash, 0, 1, 0, false);
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final Hash hash,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetHeadersFromPeerByHashTask(
protocolSchedule, ethContext, hash, 0, 1, 0, false, ethTasksTimer);
}
@Override

@ -18,6 +18,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
@ -36,8 +38,9 @@ public class GetHeadersFromPeerByNumberTask extends AbstractGetHeadersFromPeerTa
final long blockNumber,
final int count,
final int skip,
final boolean reverse) {
super(protocolSchedule, ethContext, blockNumber, count, skip, reverse);
final boolean reverse,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(protocolSchedule, ethContext, blockNumber, count, skip, reverse, ethTasksTimer);
this.blockNumber = blockNumber;
}
@ -45,18 +48,20 @@ public class GetHeadersFromPeerByNumberTask extends AbstractGetHeadersFromPeerTa
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final long firstBlockNumber,
final int segmentLength) {
final int segmentLength,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetHeadersFromPeerByNumberTask(
protocolSchedule, ethContext, firstBlockNumber, segmentLength, 0, false);
protocolSchedule, ethContext, firstBlockNumber, segmentLength, 0, false, ethTasksTimer);
}
public static AbstractGetHeadersFromPeerTask endingAtNumber(
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final long lastlockNumber,
final int segmentLength) {
final int segmentLength,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetHeadersFromPeerByNumberTask(
protocolSchedule, ethContext, lastlockNumber, segmentLength, 0, true);
protocolSchedule, ethContext, lastlockNumber, segmentLength, 0, true, ethTasksTimer);
}
public static AbstractGetHeadersFromPeerTask endingAtNumber(
@ -64,17 +69,19 @@ public class GetHeadersFromPeerByNumberTask extends AbstractGetHeadersFromPeerTa
final EthContext ethContext,
final long lastlockNumber,
final int segmentLength,
final int skip) {
final int skip,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetHeadersFromPeerByNumberTask(
protocolSchedule, ethContext, lastlockNumber, segmentLength, skip, true);
protocolSchedule, ethContext, lastlockNumber, segmentLength, skip, true, ethTasksTimer);
}
public static AbstractGetHeadersFromPeerTask forSingleNumber(
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final long blockNumber) {
final long blockNumber,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetHeadersFromPeerByNumberTask(
protocolSchedule, ethContext, blockNumber, 1, 0, false);
protocolSchedule, ethContext, blockNumber, 1, 0, false, ethTasksTimer);
}
@Override

@ -21,6 +21,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.Collections;
import java.util.List;
@ -40,6 +42,7 @@ public class ImportBlocksTask<C> extends AbstractPeerTask<List<Block>> {
private final ProtocolContext<C> protocolContext;
private final ProtocolSchedule<C> protocolSchedule;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final long startNumber;
private final BlockHeader referenceHeader;
@ -51,12 +54,14 @@ public class ImportBlocksTask<C> extends AbstractPeerTask<List<Block>> {
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final BlockHeader referenceHeader,
final int maxBlocks) {
super(ethContext);
final int maxBlocks,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, ethTasksTimer);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.referenceHeader = referenceHeader;
this.maxBlocks = maxBlocks;
this.ethTasksTimer = ethTasksTimer;
this.startNumber = referenceHeader.getNumber();
}
@ -66,9 +71,10 @@ public class ImportBlocksTask<C> extends AbstractPeerTask<List<Block>> {
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final BlockHeader previousHeader,
final int maxBlocks) {
final int maxBlocks,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new ImportBlocksTask<>(
protocolSchedule, protocolContext, ethContext, previousHeader, maxBlocks);
protocolSchedule, protocolContext, ethContext, previousHeader, maxBlocks, ethTasksTimer);
}
@Override
@ -97,7 +103,8 @@ public class ImportBlocksTask<C> extends AbstractPeerTask<List<Block>> {
ethContext,
referenceHeader.getHash(),
referenceHeader.getNumber(),
maxBlocks)
maxBlocks,
ethTasksTimer)
.assignPeer(peer);
return executeSubTask(task::run);
}
@ -108,7 +115,8 @@ public class ImportBlocksTask<C> extends AbstractPeerTask<List<Block>> {
return CompletableFuture.completedFuture(Collections.emptyList());
}
final CompleteBlocksTask<C> task =
CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers.getResult())
CompleteBlocksTask.forHeaders(
protocolSchedule, ethContext, headers.getResult(), ethTasksTimer)
.assignPeer(peer);
return executeSubTask(() -> ethContext.getScheduler().timeout(task));
}
@ -123,7 +131,7 @@ public class ImportBlocksTask<C> extends AbstractPeerTask<List<Block>> {
}
final Supplier<CompletableFuture<List<Block>>> task =
PersistBlockTask.forSequentialBlocks(
protocolSchedule, protocolContext, blocks, HeaderValidationMode.FULL);
protocolSchedule, protocolContext, blocks, HeaderValidationMode.FULL, ethTasksTimer);
return executeWorkerSubTask(ethContext.getScheduler(), task);
}
}

@ -22,6 +22,8 @@ import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockExce
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import java.util.ArrayList;
import java.util.List;
@ -39,7 +41,9 @@ public class PersistBlockTask<C> extends AbstractEthTask<Block> {
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final Block block,
final HeaderValidationMode headerValidationMode) {
final HeaderValidationMode headerValidationMode,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethTasksTimer);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.block = block;
@ -50,15 +54,18 @@ public class PersistBlockTask<C> extends AbstractEthTask<Block> {
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final Block block,
final HeaderValidationMode headerValidationMode) {
return new PersistBlockTask<>(protocolSchedule, protocolContext, block, headerValidationMode);
final HeaderValidationMode headerValidationMode,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new PersistBlockTask<>(
protocolSchedule, protocolContext, block, headerValidationMode, ethTasksTimer);
}
public static <C> Supplier<CompletableFuture<List<Block>>> forSequentialBlocks(
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final List<Block> blocks,
final HeaderValidationMode headerValidationMode) {
final HeaderValidationMode headerValidationMode,
final LabelledMetric<OperationTimer> ethTasksTimer) {
checkArgument(blocks.size() > 0);
return () -> {
final List<Block> successfulImports = new ArrayList<>();
@ -71,7 +78,8 @@ public class PersistBlockTask<C> extends AbstractEthTask<Block> {
protocolContext,
block,
successfulImports,
headerValidationMode);
headerValidationMode,
ethTasksTimer);
continue;
}
future =
@ -82,7 +90,8 @@ public class PersistBlockTask<C> extends AbstractEthTask<Block> {
protocolContext,
block,
successfulImports,
headerValidationMode));
headerValidationMode,
ethTasksTimer));
}
return future.thenApply(r -> successfulImports);
};
@ -93,8 +102,10 @@ public class PersistBlockTask<C> extends AbstractEthTask<Block> {
final ProtocolContext<C> protocolContext,
final Block block,
final List<Block> list,
final HeaderValidationMode headerValidationMode) {
return PersistBlockTask.create(protocolSchedule, protocolContext, block, headerValidationMode)
final HeaderValidationMode headerValidationMode,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return PersistBlockTask.create(
protocolSchedule, protocolContext, block, headerValidationMode, ethTasksTimer)
.run()
.whenComplete(
(r, t) -> {
@ -108,7 +119,8 @@ public class PersistBlockTask<C> extends AbstractEthTask<Block> {
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final List<Block> blocks,
final HeaderValidationMode headerValidationMode) {
final HeaderValidationMode headerValidationMode,
final LabelledMetric<OperationTimer> ethTasksTimer) {
checkArgument(blocks.size() > 0);
return () -> {
final CompletableFuture<List<Block>> finalResult = new CompletableFuture<>();
@ -118,7 +130,7 @@ public class PersistBlockTask<C> extends AbstractEthTask<Block> {
if (future == null) {
future =
PersistBlockTask.create(
protocolSchedule, protocolContext, block, headerValidationMode)
protocolSchedule, protocolContext, block, headerValidationMode, ethTasksTimer)
.run();
continue;
}
@ -131,7 +143,11 @@ public class PersistBlockTask<C> extends AbstractEthTask<Block> {
successfulImports.add(r);
}
return PersistBlockTask.create(
protocolSchedule, protocolContext, block, headerValidationMode)
protocolSchedule,
protocolContext,
block,
headerValidationMode,
ethTasksTimer)
.run();
});
}

@ -23,6 +23,8 @@ import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;
import java.util.ArrayList;
@ -45,6 +47,7 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
private final ProtocolContext<C> protocolContext;
private final ProtocolSchedule<C> protocolSchedule;
private final List<Block> importedBlocks = new ArrayList<>();
private final LabelledMetric<OperationTimer> ethTasksTimer;
// First header is assumed to already be imported
private final List<BlockHeader> checkpointHeaders;
@ -67,10 +70,13 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final int maxActiveChunks,
final List<BlockHeader> checkpointHeaders) {
final List<BlockHeader> checkpointHeaders,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethTasksTimer);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
this.checkpointHeaders = checkpointHeaders;
this.chunksInTotal = checkpointHeaders.size() - 1;
this.chunksIssued = 0;
@ -83,12 +89,14 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final int maxActiveChunks,
final LabelledMetric<OperationTimer> ethTasksTimer,
final BlockHeader... checkpointHeaders) {
return forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
maxActiveChunks,
ethTasksTimer,
Arrays.asList(checkpointHeaders));
}
@ -97,9 +105,15 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final int maxActiveChunks,
final LabelledMetric<OperationTimer> ethTasksTimer,
final List<BlockHeader> checkpointHeaders) {
return new PipelinedImportChainSegmentTask<>(
protocolSchedule, protocolContext, ethContext, maxActiveChunks, checkpointHeaders);
protocolSchedule,
protocolContext,
ethContext,
maxActiveChunks,
checkpointHeaders,
ethTasksTimer);
}
@Override
@ -195,7 +209,12 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
}
final DownloadHeaderSequenceTask<C> task =
DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule, protocolContext, ethContext, lastChunkHeader, segmentLength);
protocolSchedule,
protocolContext,
ethContext,
lastChunkHeader,
segmentLength,
ethTasksTimer);
return executeSubTask(task::run)
.thenApply(
headers -> {
@ -239,7 +258,7 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
final CompleteBlocksTask<C> task =
CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers);
CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, headers, ethTasksTimer);
return executeSubTask(task::run);
}
@ -250,7 +269,11 @@ public class PipelinedImportChainSegmentTask<C> extends AbstractEthTask<List<Blo
blocks.get(blocks.size() - 1).getHeader().getNumber());
final Supplier<CompletableFuture<List<Block>>> task =
PersistBlockTask.forSequentialBlocks(
protocolSchedule, protocolContext, blocks, HeaderValidationMode.SKIP_DETACHED);
protocolSchedule,
protocolContext,
blocks,
HeaderValidationMode.SKIP_DETACHED,
ethTasksTimer);
return executeWorkerSubTask(ethContext.getScheduler(), task);
}

@ -15,6 +15,8 @@ package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractEthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -26,12 +28,15 @@ public class WaitForPeerTask extends AbstractEthTask<Void> {
private final EthContext ethContext;
private volatile Long peerListenerId;
private WaitForPeerTask(final EthContext ethContext) {
private WaitForPeerTask(
final EthContext ethContext, final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethTasksTimer);
this.ethContext = ethContext;
}
public static WaitForPeerTask create(final EthContext ethContext) {
return new WaitForPeerTask(ethContext);
public static WaitForPeerTask create(
final EthContext ethContext, final LabelledMetric<OperationTimer> ethTasksTimer) {
return new WaitForPeerTask(ethContext, ethTasksTimer);
}
@Override

@ -15,6 +15,8 @@ package tech.pegasys.pantheon.ethereum.eth.sync.tasks;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractEthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -27,13 +29,20 @@ public class WaitForPeersTask extends AbstractEthTask<Void> {
private final EthContext ethContext;
private volatile Long peerListenerId;
private WaitForPeersTask(final EthContext ethContext, final int targetPeerCount) {
private WaitForPeersTask(
final EthContext ethContext,
final int targetPeerCount,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethTasksTimer);
this.targetPeerCount = targetPeerCount;
this.ethContext = ethContext;
}
public static WaitForPeersTask create(final EthContext ethContext, final int targetPeerCount) {
return new WaitForPeersTask(ethContext, targetPeerCount);
public static WaitForPeersTask create(
final EthContext ethContext,
final int targetPeerCount,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new WaitForPeersTask(ethContext, targetPeerCount, ethTasksTimer);
}
@Override

@ -14,6 +14,8 @@ package tech.pegasys.pantheon.ethereum.eth.manager;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@ -81,6 +83,7 @@ public class AbstractEthTaskTest {
private final List<CompletableFuture<?>> subtasks;
private EthTaskWithMultipleSubtasks(final List<CompletableFuture<?>> subtasks) {
super(NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
this.subtasks = subtasks;
}

@ -12,10 +12,16 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
public class MockEthTask extends AbstractEthTask<Object> {
private boolean executed = false;
protected MockEthTask() {
super(NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
}
@Override
protected void executeTask() {
executed = true;

@ -24,6 +24,9 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -43,6 +46,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
protected static Blockchain blockchain;
protected static ProtocolSchedule<Void> protocolSchedule;
protected static ProtocolContext<Void> protocolContext;
protected static LabelledMetric<OperationTimer> ethTasksTimer;
protected EthProtocolManager ethProtocolManager;
protected EthContext ethContext;
protected AtomicBoolean peersDoTimeout;
@ -55,6 +59,7 @@ public abstract class AbstractMessageTaskTest<T, R> {
blockchain = blockchainSetupUtil.getBlockchain();
protocolSchedule = blockchainSetupUtil.getProtocolSchedule();
protocolContext = blockchainSetupUtil.getProtocolContext();
ethTasksTimer = NoOpMetricsSystem.NO_OP_LABELLED_TIMER;
assert (blockchainSetupUtil.getMaxBlockNumber() >= 20L);
}

@ -35,7 +35,8 @@ import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator.BlockOptions;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.uint.UInt256;
@ -58,7 +59,8 @@ public class BlockPropagationManagerTest {
private SynchronizerConfiguration syncConfig;
private final PendingBlocks pendingBlocks = new PendingBlocks();
private SyncState syncState;
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private final LabelledMetric<OperationTimer> ethTasksTimer =
NoOpMetricsSystem.NO_OP_LABELLED_TIMER;
@BeforeClass
public static void setupSuite() {
@ -91,7 +93,7 @@ public class BlockPropagationManagerTest {
ethProtocolManager.ethContext(),
syncState,
pendingBlocks,
metricsSystem);
ethTasksTimer);
}
@Test
@ -467,7 +469,7 @@ public class BlockPropagationManagerTest {
ethProtocolManager.ethContext(),
syncState,
pendingBlocks,
metricsSystem);
ethTasksTimer);
final BlockDataGenerator gen = new BlockDataGenerator();
// Import some blocks

@ -26,6 +26,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.uint.UInt256;
import org.junit.Test;
@ -46,7 +47,11 @@ public class ChainHeadTrackerTest {
DevelopmentProtocolSchedule.create(GenesisConfigFile.DEFAULT.getConfigOptions());
private final TrailingPeerLimiter trailingPeerLimiter = mock(TrailingPeerLimiter.class);
private final ChainHeadTracker chainHeadTracker =
new ChainHeadTracker(ethProtocolManager.ethContext(), protocolSchedule, trailingPeerLimiter);
new ChainHeadTracker(
ethProtocolManager.ethContext(),
protocolSchedule,
trailingPeerLimiter,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
@Test
public void shouldRequestHeaderChainHeadWhenNewPeerConnects() {

@ -40,6 +40,9 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.ArrayList;
@ -65,6 +68,7 @@ public class DownloaderTest {
protected MutableBlockchain localBlockchain;
private BlockchainSetupUtil<Void> otherBlockchainSetup;
protected Blockchain otherBlockchain;
private LabelledMetric<OperationTimer> ethTashsTimer;
@Before
public void setupTest() {
@ -79,10 +83,13 @@ public class DownloaderTest {
ethProtocolManager = EthProtocolManagerTestUtil.create(localBlockchain);
ethContext = ethProtocolManager.ethContext();
syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers());
ethTashsTimer = NoOpMetricsSystem.NO_OP_LABELLED_TIMER;
}
private Downloader<?> downloader(final SynchronizerConfiguration syncConfig) {
return new Downloader<>(syncConfig, protocolSchedule, protocolContext, ethContext, syncState);
return new Downloader<>(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTashsTimer);
}
private Downloader<?> downloader() {

@ -17,6 +17,7 @@ import tech.pegasys.pantheon.ethereum.core.BlockBody;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.ArrayList;
import java.util.List;
@ -41,6 +42,10 @@ public class CompleteBlocksTaskTest extends RetryingMessageTaskTest<List<Block>>
final List<BlockHeader> headersToComplete =
requestedData.stream().map(Block::getHeader).collect(Collectors.toList());
return CompleteBlocksTask.forHeaders(
protocolSchedule, ethContext, headersToComplete, maxRetries);
protocolSchedule,
ethContext,
headersToComplete,
maxRetries,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
}
}

@ -31,6 +31,9 @@ import tech.pegasys.pantheon.ethereum.mainnet.MainnetBlockHashFunction;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.io.IOException;
@ -52,6 +55,8 @@ import org.junit.runners.Parameterized.Parameters;
public class DetermineCommonAncestorTaskParameterizedTest {
private final ProtocolSchedule<Void> protocolSchedule = MainnetProtocolSchedule.create();
private static final BlockDataGenerator blockDataGenerator = new BlockDataGenerator();
private final LabelledMetric<OperationTimer> ethTasksTimer =
NoOpMetricsSystem.NO_OP_LABELLED_TIMER;
private static Block genesisBlock;
private static MutableBlockchain localBlockchain;
@ -154,7 +159,8 @@ public class DetermineCommonAncestorTaskParameterizedTest {
protocolContext,
ethContext,
respondingEthPeer.getEthPeer(),
headerRequestSize);
headerRequestSize,
ethTasksTimer);
final CompletableFuture<BlockHeader> future = task.run();
respondingEthPeer.respondWhile(responder, () -> !future.isDone());

@ -41,6 +41,9 @@ import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.uint.UInt256;
@ -56,6 +59,8 @@ public class DetermineCommonAncestorTaskTest {
private final ProtocolSchedule<Void> protocolSchedule = MainnetProtocolSchedule.create();
private final BlockDataGenerator blockDataGenerator = new BlockDataGenerator();
private final LabelledMetric<OperationTimer> ethTasksTimer =
NoOpMetricsSystem.NO_OP_LABELLED_TIMER;
private MutableBlockchain localBlockchain;
private final int defaultHeaderRequestSize = 10;
Block genesisBlock;
@ -110,7 +115,8 @@ public class DetermineCommonAncestorTaskTest {
protocolContext,
ethContext,
respondingEthPeer.getEthPeer(),
defaultHeaderRequestSize);
defaultHeaderRequestSize,
ethTasksTimer);
final CompletableFuture<BlockHeader> future = task.run();
respondingEthPeer.respondWhile(responder, () -> !future.isDone());
@ -146,7 +152,8 @@ public class DetermineCommonAncestorTaskTest {
protocolContext,
ethContext,
respondingEthPeer.getEthPeer(),
defaultHeaderRequestSize);
defaultHeaderRequestSize,
ethTasksTimer);
// Execute task and wait for response
final AtomicReference<Throwable> failure = new AtomicReference<>();
@ -216,7 +223,8 @@ public class DetermineCommonAncestorTaskTest {
protocolContext,
ethContext,
respondingEthPeer.getEthPeer(),
defaultHeaderRequestSize);
defaultHeaderRequestSize,
ethTasksTimer);
final DetermineCommonAncestorTask<Void> spy = spy(task);
// Execute task
@ -272,7 +280,8 @@ public class DetermineCommonAncestorTaskTest {
protocolContext,
ethContext,
respondingEthPeer.getEthPeer(),
defaultHeaderRequestSize);
defaultHeaderRequestSize,
ethTasksTimer);
final DetermineCommonAncestorTask<Void> spy = spy(task);
// Execute task
@ -343,7 +352,8 @@ public class DetermineCommonAncestorTaskTest {
protocolContext,
ethContext,
respondingEthPeer.getEthPeer(),
defaultHeaderRequestSize);
defaultHeaderRequestSize,
ethTasksTimer);
final DetermineCommonAncestorTask<Void> spy = spy(task);
// Execute task
@ -370,7 +380,12 @@ public class DetermineCommonAncestorTaskTest {
final EthTask<BlockHeader> task =
DetermineCommonAncestorTask.create(
protocolSchedule, protocolContext, ethContext, peer, defaultHeaderRequestSize);
protocolSchedule,
protocolContext,
ethContext,
peer,
defaultHeaderRequestSize,
ethTasksTimer);
final CompletableFuture<BlockHeader> result = task.run();
assertThat(result).isCompletedWithValue(genesisBlock.getHeader());

@ -59,7 +59,8 @@ public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest<List
ethContext,
referenceHeader,
requestedData.size(),
maxRetries);
maxRetries,
ethTasksTimer);
}
@Test
@ -68,10 +69,16 @@ public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest<List
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Execute task and wait for response
BlockHeader referenceHeader = blockchain.getChainHeadHeader();
final BlockHeader referenceHeader = blockchain.getChainHeadHeader();
final EthTask<List<BlockHeader>> task =
DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule, protocolContext, ethContext, referenceHeader, 10, maxRetries);
protocolSchedule,
protocolContext,
ethContext,
referenceHeader,
10,
maxRetries,
ethTasksTimer);
final CompletableFuture<List<BlockHeader>> future = task.run();
// Respond with only the reference header
@ -91,23 +98,30 @@ public class DownloadHeaderSequenceTaskTest extends RetryingMessageTaskTest<List
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
// Execute task and wait for response
BlockHeader referenceHeader = blockchain.getChainHeadHeader();
final BlockHeader referenceHeader = blockchain.getChainHeadHeader();
final EthTask<List<BlockHeader>> task =
DownloadHeaderSequenceTask.endingAtHeader(
protocolSchedule, protocolContext, ethContext, referenceHeader, 10, maxRetries);
protocolSchedule,
protocolContext,
ethContext,
referenceHeader,
10,
maxRetries,
ethTasksTimer);
final CompletableFuture<List<BlockHeader>> future = task.run();
// Filter response to include only reference header and previous header
final Responder fullResponder = RespondingEthPeer.blockchainResponder(blockchain);
final Responder responder =
(cap, message) -> {
Optional<MessageData> fullResponse = fullResponder.respond(cap, message);
final Optional<MessageData> fullResponse = fullResponder.respond(cap, message);
if (!fullResponse.isPresent() || message.getCode() != EthPV62.GET_BLOCK_HEADERS) {
return fullResponse;
}
BlockHeadersMessage headersMessage = BlockHeadersMessage.readFrom(fullResponse.get());
final BlockHeadersMessage headersMessage =
BlockHeadersMessage.readFrom(fullResponse.get());
// Filter for a subset of headers
List<BlockHeader> headerSubset =
final List<BlockHeader> headerSubset =
Streams.stream(headersMessage.getHeaders(protocolSchedule))
.filter(h -> h.getNumber() >= referenceHeader.getNumber() - 1L)
.collect(Collectors.toList());

@ -47,7 +47,8 @@ public class GetBlockFromPeerTaskTest
@Override
protected EthTask<PeerTaskResult<Block>> createTask(final Block requestedData) {
return GetBlockFromPeerTask.create(protocolSchedule, ethContext, requestedData.getHash());
return GetBlockFromPeerTask.create(
protocolSchedule, ethContext, requestedData.getHash(), ethTasksTimer);
}
@Override

@ -20,6 +20,7 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.ArrayList;
import java.util.List;
@ -42,7 +43,8 @@ public class GetBodiesFromPeerTaskTest extends PeerMessageTaskTest<List<Block>>
protected EthTask<PeerTaskResult<List<Block>>> createTask(final List<Block> requestedData) {
final List<BlockHeader> headersToComplete =
requestedData.stream().map(Block::getHeader).collect(Collectors.toList());
return GetBodiesFromPeerTask.forHeaders(protocolSchedule, ethContext, headersToComplete);
return GetBodiesFromPeerTask.forHeaders(
protocolSchedule, ethContext, headersToComplete, NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
}
@Override

@ -61,7 +61,8 @@ public class GetHeadersFromPeerByHashTaskTest extends PeerMessageTaskTest<List<B
ethContext,
firstHeader.getHash(),
firstHeader.getNumber(),
requestedData.size());
requestedData.size(),
ethTasksTimer);
}
@Test
@ -108,7 +109,8 @@ public class GetHeadersFromPeerByHashTaskTest extends PeerMessageTaskTest<List<B
startNumber,
count,
skip,
reverse);
reverse,
ethTasksTimer);
final AtomicReference<PeerTaskResult<List<BlockHeader>>> actualResult = new AtomicReference<>();
final AtomicBoolean done = new AtomicBoolean(false);
final CompletableFuture<PeerTaskResult<List<BlockHeader>>> future = task.run();

@ -20,6 +20,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.ArrayList;
import java.util.List;
@ -56,7 +57,11 @@ public class GetHeadersFromPeerByNumberTaskTest extends PeerMessageTaskTest<List
final List<BlockHeader> requestedData) {
final BlockHeader firstHeader = requestedData.get(0);
return GetHeadersFromPeerByNumberTask.startingAtNumber(
protocolSchedule, ethContext, firstHeader.getNumber(), requestedData.size());
protocolSchedule,
ethContext,
firstHeader.getNumber(),
requestedData.size(),
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
}
@Test
@ -97,7 +102,13 @@ public class GetHeadersFromPeerByNumberTaskTest extends PeerMessageTaskTest<List
// Execute task and wait for response
final AbstractGetHeadersFromPeerTask task =
new GetHeadersFromPeerByNumberTask(
protocolSchedule, ethContext, startNumber, count, skip, reverse);
protocolSchedule,
ethContext,
startNumber,
count,
skip,
reverse,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
final AtomicReference<AbstractPeerTask.PeerTaskResult<List<BlockHeader>>> actualResult =
new AtomicReference<>();
final AtomicBoolean done = new AtomicBoolean(false);

@ -32,6 +32,7 @@ import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.ArrayList;
import java.util.List;
@ -71,12 +72,13 @@ public class ImportBlocksTaskTest
shortBlockchain,
protocolContext.getWorldStateArchive(),
protocolContext.getConsensusState());
return ImportBlocksTask.<Void>fromHeader(
return ImportBlocksTask.fromHeader(
protocolSchedule,
modifiedContext,
ethContext,
firstBlock.getHeader(),
requestedData.size());
requestedData.size(),
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
}
@Override

@ -24,6 +24,9 @@ import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockExce
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.Arrays;
import java.util.Collections;
@ -41,6 +44,8 @@ public class PersistBlockTaskTest {
private ProtocolSchedule<Void> protocolSchedule;
private ProtocolContext<Void> protocolContext;
private MutableBlockchain blockchain;
private final LabelledMetric<OperationTimer> ethTasksTimer =
NoOpMetricsSystem.NO_OP_LABELLED_TIMER;
@Before
public void setup() {
@ -61,7 +66,7 @@ public class PersistBlockTaskTest {
// Create task
final PersistBlockTask<Void> task =
PersistBlockTask.create(
protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL);
protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL, ethTasksTimer);
final CompletableFuture<Block> result = task.run();
Awaitility.await().atMost(30, SECONDS).until(result::isDone);
@ -83,7 +88,7 @@ public class PersistBlockTaskTest {
// Create task
final PersistBlockTask<Void> task =
PersistBlockTask.create(
protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL);
protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL, ethTasksTimer);
final CompletableFuture<Block> result = task.run();
Awaitility.await().atMost(30, SECONDS).until(result::isDone);
@ -107,7 +112,11 @@ public class PersistBlockTaskTest {
// Create task
final CompletableFuture<List<Block>> task =
PersistBlockTask.forSequentialBlocks(
protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL)
protocolSchedule,
protocolContext,
nextBlocks,
HeaderValidationMode.FULL,
ethTasksTimer)
.get();
Awaitility.await().atMost(30, SECONDS).until(task::isDone);
@ -133,7 +142,11 @@ public class PersistBlockTaskTest {
// Create task
final CompletableFuture<List<Block>> task =
PersistBlockTask.forSequentialBlocks(
protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL)
protocolSchedule,
protocolContext,
nextBlocks,
HeaderValidationMode.FULL,
ethTasksTimer)
.get();
Awaitility.await().atMost(30, SECONDS).until(task::isDone);
@ -158,7 +171,11 @@ public class PersistBlockTaskTest {
// Create task
final CompletableFuture<List<Block>> task =
PersistBlockTask.forSequentialBlocks(
protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL)
protocolSchedule,
protocolContext,
nextBlocks,
HeaderValidationMode.FULL,
ethTasksTimer)
.get();
Awaitility.await().atMost(30, SECONDS).until(task::isDone);
@ -183,7 +200,11 @@ public class PersistBlockTaskTest {
// Create task
final CompletableFuture<List<Block>> task =
PersistBlockTask.forUnorderedBlocks(
protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL)
protocolSchedule,
protocolContext,
nextBlocks,
HeaderValidationMode.FULL,
ethTasksTimer)
.get();
Awaitility.await().atMost(30, SECONDS).until(task::isDone);
@ -211,7 +232,11 @@ public class PersistBlockTaskTest {
// Create task
final CompletableFuture<List<Block>> task =
PersistBlockTask.forUnorderedBlocks(
protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL)
protocolSchedule,
protocolContext,
nextBlocks,
HeaderValidationMode.FULL,
ethTasksTimer)
.get();
Awaitility.await().atMost(30, SECONDS).until(task::isDone);
@ -236,7 +261,11 @@ public class PersistBlockTaskTest {
// Create task
final CompletableFuture<List<Block>> task =
PersistBlockTask.forUnorderedBlocks(
protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL)
protocolSchedule,
protocolContext,
nextBlocks,
HeaderValidationMode.FULL,
ethTasksTimer)
.get();
Awaitility.await().atMost(30, SECONDS).until(task::isDone);
@ -263,7 +292,11 @@ public class PersistBlockTaskTest {
// Create task
final CompletableFuture<List<Block>> task =
PersistBlockTask.forUnorderedBlocks(
protocolSchedule, protocolContext, nextBlocks, HeaderValidationMode.FULL)
protocolSchedule,
protocolContext,
nextBlocks,
HeaderValidationMode.FULL,
ethTasksTimer)
.get();
Awaitility.await().atMost(30, SECONDS).until(task::isDone);
@ -286,7 +319,7 @@ public class PersistBlockTaskTest {
// Create task
final PersistBlockTask<Void> task =
PersistBlockTask.create(
protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL);
protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL, ethTasksTimer);
task.cancel();
final CompletableFuture<Block> result = task.run();
@ -306,9 +339,9 @@ public class PersistBlockTaskTest {
// Create task
final PersistBlockTask<Void> task =
PersistBlockTask.create(
protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL);
protocolSchedule, protocolContext, nextBlock, HeaderValidationMode.FULL, ethTasksTimer);
final PersistBlockTask<Void> taskSpy = Mockito.spy(task);
Mockito.doNothing().when(taskSpy).executeTask();
Mockito.doNothing().when(taskSpy).executeTaskTimed();
final CompletableFuture<Block> result = taskSpy.run();
taskSpy.cancel();

@ -84,7 +84,9 @@ public class PipelinedImportChainSegmentTaskTest
modifiedContext,
ethContext,
1,
new BlockHeader[] {previousBlock.getHeader(), lastBlock.getHeader()});
ethTasksTimer,
previousBlock.getHeader(),
lastBlock.getHeader());
}
@Override
@ -116,6 +118,7 @@ public class PipelinedImportChainSegmentTaskTest
modifiedContext,
ethContext,
1,
ethTasksTimer,
firstBlock.getHeader(),
secondBlock.getHeader());
@ -165,6 +168,7 @@ public class PipelinedImportChainSegmentTaskTest
modifiedContext,
ethContext,
1,
ethTasksTimer,
fakeFirstBlock.getHeader(),
thirdBlock.getHeader());
@ -214,7 +218,7 @@ public class PipelinedImportChainSegmentTaskTest
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule, modifiedContext, ethContext, 1, checkpointHeaders);
protocolSchedule, modifiedContext, ethContext, 1, ethTasksTimer, checkpointHeaders);
// Execute task and wait for response
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();
@ -270,7 +274,7 @@ public class PipelinedImportChainSegmentTaskTest
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule, modifiedContext, ethContext, 2, checkpointHeaders);
protocolSchedule, modifiedContext, ethContext, 2, ethTasksTimer, checkpointHeaders);
// Execute task and wait for response
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();
@ -330,7 +334,7 @@ public class PipelinedImportChainSegmentTaskTest
protocolContext.getConsensusState());
final EthTask<List<Block>> task =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule, modifiedContext, ethContext, 3, checkpointHeaders);
protocolSchedule, modifiedContext, ethContext, 3, ethTasksTimer, checkpointHeaders);
// Execute task and wait for response
final AtomicReference<List<Block>> actualResult = new AtomicReference<>();

@ -18,6 +18,9 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -29,18 +32,20 @@ import org.junit.Test;
public class WaitForPeerTaskTest {
private EthProtocolManager ethProtocolManager;
private EthContext ethContext;
private LabelledMetric<OperationTimer> ethTasksTimer;
@Before
public void setupTest() {
ethProtocolManager = EthProtocolManagerTestUtil.create();
ethContext = ethProtocolManager.ethContext();
ethTasksTimer = NoOpMetricsSystem.NO_OP_LABELLED_TIMER;
}
@Test
public void completesWhenPeerConnects() throws ExecutionException, InterruptedException {
// Execute task and wait for response
final AtomicBoolean successful = new AtomicBoolean(false);
final EthTask<Void> task = WaitForPeerTask.create(ethContext);
final EthTask<Void> task = WaitForPeerTask.create(ethContext, ethTasksTimer);
final CompletableFuture<Void> future = task.run();
future.whenComplete(
(result, error) -> {
@ -55,7 +60,7 @@ public class WaitForPeerTaskTest {
@Test
public void doesNotCompleteWhenNoPeerConnects() throws ExecutionException, InterruptedException {
final AtomicBoolean successful = new AtomicBoolean(false);
final EthTask<Void> task = WaitForPeerTask.create(ethContext);
final EthTask<Void> task = WaitForPeerTask.create(ethContext, ethTasksTimer);
final CompletableFuture<Void> future = task.run();
future.whenComplete(
(result, error) -> {
@ -70,7 +75,7 @@ public class WaitForPeerTaskTest {
@Test
public void cancel() throws ExecutionException, InterruptedException {
// Execute task
final EthTask<Void> task = WaitForPeerTask.create(ethContext);
final EthTask<Void> task = WaitForPeerTask.create(ethContext, ethTasksTimer);
final CompletableFuture<Void> future = task.run();
assertThat(future.isDone()).isFalse();

@ -18,6 +18,9 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -29,18 +32,20 @@ import org.junit.Test;
public class WaitForPeersTaskTest {
private EthProtocolManager ethProtocolManager;
private EthContext ethContext;
private LabelledMetric<OperationTimer> ethTasksTimer;
@Before
public void setupTest() {
ethProtocolManager = EthProtocolManagerTestUtil.create();
ethContext = ethProtocolManager.ethContext();
ethTasksTimer = NoOpMetricsSystem.NO_OP_LABELLED_TIMER;
}
@Test
public void completesWhenPeersConnects() throws ExecutionException, InterruptedException {
// Execute task and wait for response
final AtomicBoolean successful = new AtomicBoolean(false);
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2);
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2, ethTasksTimer);
final CompletableFuture<Void> future = task.run();
future.whenComplete(
(result, error) -> {
@ -56,7 +61,7 @@ public class WaitForPeersTaskTest {
@Test
public void doesNotCompleteWhenNoPeerConnects() throws ExecutionException, InterruptedException {
final AtomicBoolean successful = new AtomicBoolean(false);
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2);
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2, ethTasksTimer);
final CompletableFuture<Void> future = task.run();
future.whenComplete(
(result, error) -> {
@ -72,7 +77,7 @@ public class WaitForPeersTaskTest {
public void doesNotCompleteWhenSomePeersConnects()
throws ExecutionException, InterruptedException {
final AtomicBoolean successful = new AtomicBoolean(false);
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2);
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2, ethTasksTimer);
final CompletableFuture<Void> future = task.run();
future.whenComplete(
(result, error) -> {
@ -88,7 +93,7 @@ public class WaitForPeersTaskTest {
@Test
public void cancel() throws ExecutionException, InterruptedException {
// Execute task
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2);
final EthTask<Void> task = WaitForPeersTask.create(ethContext, 2, ethTasksTimer);
final CompletableFuture<Void> future = task.run();
assertThat(future.isDone()).isFalse();

@ -17,7 +17,8 @@ public enum MetricCategory {
RPC("rpc"),
JVM("jvm", false),
PROCESS("process", false),
BLOCKCHAIN("blockchain");
BLOCKCHAIN("blockchain"),
SYNCHRONIZER("synchronizer");
private final String name;
private final boolean pantheonSpecific;

@ -28,6 +28,7 @@ public class NoOpMetricsSystem implements MetricsSystem {
private static final Counter NO_OP_COUNTER = new NoOpCounter();
private static final TimingContext NO_OP_TIMING_CONTEXT = () -> 0;
private static final OperationTimer NO_OP_TIMER = () -> NO_OP_TIMING_CONTEXT;
public static final LabelledMetric<OperationTimer> NO_OP_LABELLED_TIMER = label -> NO_OP_TIMER;
@Override
public LabelledMetric<Counter> createLabelledCounter(

@ -35,7 +35,7 @@ import io.prometheus.client.Collector.MetricFamilySamples;
import io.prometheus.client.Collector.MetricFamilySamples.Sample;
import io.prometheus.client.Collector.Type;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import io.prometheus.client.Summary;
import io.prometheus.client.hotspot.BufferPoolsExports;
import io.prometheus.client.hotspot.ClassLoadingExports;
import io.prometheus.client.hotspot.GarbageCollectorExports;
@ -84,9 +84,18 @@ public class PrometheusMetricsSystem implements MetricsSystem {
final String name,
final String help,
final String... labelNames) {
final Histogram histogram = Histogram.build(name, help).labelNames(labelNames).create();
addCollector(category, histogram);
return new PrometheusTimer(histogram);
final Summary summary =
Summary.build(convertToPrometheusName(category, name), help)
.quantile(0.2, 0.02)
.quantile(0.5, 0.05)
.quantile(0.8, 0.02)
.quantile(0.95, 0.005)
.quantile(0.99, 0.001)
.quantile(1.0, 0)
.labelNames(labelNames)
.create();
addCollector(category, summary);
return new PrometheusTimer(summary);
}
@Override
@ -127,6 +136,9 @@ public class PrometheusMetricsSystem implements MetricsSystem {
if (familySamples.type == Type.HISTOGRAM) {
return convertHistogramSampleNamesToLabels(category, sample, familySamples);
}
if (familySamples.type == Type.SUMMARY) {
return convertSummarySampleNamesToLabels(category, sample, familySamples);
}
return new Observation(
category,
convertFromPrometheusName(category, sample.name),
@ -149,6 +161,23 @@ public class PrometheusMetricsSystem implements MetricsSystem {
labelValues);
}
private Observation convertSummarySampleNamesToLabels(
final MetricCategory category, final Sample sample, final MetricFamilySamples familySamples) {
final List<String> labelValues = new ArrayList<>(sample.labelValues);
if (sample.name.endsWith("_sum")) {
labelValues.add("sum");
} else if (sample.name.endsWith("_count")) {
labelValues.add("count");
} else {
labelValues.add(labelValues.size() - 1, "quantile");
}
return new Observation(
category,
convertFromPrometheusName(category, familySamples.name),
sample.value,
labelValues);
}
private String convertToPrometheusName(final MetricCategory category, final String name) {
return prometheusPrefix(category) + name;
}

@ -15,20 +15,19 @@ package tech.pegasys.pantheon.metrics.prometheus;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import io.prometheus.client.Histogram;
import io.prometheus.client.Histogram.Child;
import io.prometheus.client.Summary;
class PrometheusTimer implements LabelledMetric<OperationTimer> {
private final Histogram histogram;
private final Summary summary;
public PrometheusTimer(final Histogram histogram) {
this.histogram = histogram;
public PrometheusTimer(final Summary summary) {
this.summary = summary;
}
@Override
public OperationTimer labels(final String... labels) {
final Child metric = histogram.labels(labels);
final Summary.Child metric = summary.labels(labels);
return () -> metric.startTimer()::observeDuration;
}
}

@ -91,23 +91,14 @@ public class PrometheusMetricsSystemTest {
assertThat(metricsSystem.getMetrics())
.usingElementComparator(IGNORE_VALUES)
.containsExactlyInAnyOrder(
new Observation(RPC, "request", null, asList("bucket", "0.005")),
new Observation(RPC, "request", null, asList("bucket", "0.01")),
new Observation(RPC, "request", null, asList("bucket", "0.025")),
new Observation(RPC, "request", null, asList("bucket", "0.05")),
new Observation(RPC, "request", null, asList("bucket", "0.075")),
new Observation(RPC, "request", null, asList("bucket", "0.1")),
new Observation(RPC, "request", null, asList("bucket", "0.25")),
new Observation(RPC, "request", null, asList("bucket", "0.5")),
new Observation(RPC, "request", null, asList("bucket", "0.75")),
new Observation(RPC, "request", null, asList("bucket", "1.0")),
new Observation(RPC, "request", null, asList("bucket", "2.5")),
new Observation(RPC, "request", null, asList("bucket", "5.0")),
new Observation(RPC, "request", null, asList("bucket", "7.5")),
new Observation(RPC, "request", null, asList("bucket", "10.0")),
new Observation(RPC, "request", null, asList("bucket", "+Inf")),
new Observation(RPC, "request", null, asList("quantile", "0.2")),
new Observation(RPC, "request", null, asList("quantile", "0.5")),
new Observation(RPC, "request", null, asList("quantile", "0.8")),
new Observation(RPC, "request", null, asList("quantile", "0.95")),
new Observation(RPC, "request", null, asList("quantile", "0.99")),
new Observation(RPC, "request", null, asList("quantile", "1.0")),
new Observation(RPC, "request", null, singletonList("sum")),
new Observation(RPC, "request", 1d, singletonList("count")));
new Observation(RPC, "request", null, singletonList("count")));
}
@Test
@ -120,21 +111,12 @@ public class PrometheusMetricsSystemTest {
assertThat(metricsSystem.getMetrics())
.usingElementComparator(IGNORE_VALUES) // We don't know how long it will actually take.
.containsExactlyInAnyOrder(
new Observation(RPC, "request", null, asList("method", "bucket", "0.005")),
new Observation(RPC, "request", null, asList("method", "bucket", "0.01")),
new Observation(RPC, "request", null, asList("method", "bucket", "0.025")),
new Observation(RPC, "request", null, asList("method", "bucket", "0.05")),
new Observation(RPC, "request", null, asList("method", "bucket", "0.075")),
new Observation(RPC, "request", null, asList("method", "bucket", "0.1")),
new Observation(RPC, "request", null, asList("method", "bucket", "0.25")),
new Observation(RPC, "request", null, asList("method", "bucket", "0.5")),
new Observation(RPC, "request", null, asList("method", "bucket", "0.75")),
new Observation(RPC, "request", null, asList("method", "bucket", "1.0")),
new Observation(RPC, "request", null, asList("method", "bucket", "2.5")),
new Observation(RPC, "request", null, asList("method", "bucket", "5.0")),
new Observation(RPC, "request", null, asList("method", "bucket", "7.5")),
new Observation(RPC, "request", null, asList("method", "bucket", "10.0")),
new Observation(RPC, "request", null, asList("method", "bucket", "+Inf")),
new Observation(RPC, "request", null, asList("method", "quantile", "0.2")),
new Observation(RPC, "request", null, asList("method", "quantile", "0.5")),
new Observation(RPC, "request", null, asList("method", "quantile", "0.8")),
new Observation(RPC, "request", null, asList("method", "quantile", "0.95")),
new Observation(RPC, "request", null, asList("method", "quantile", "0.99")),
new Observation(RPC, "request", null, asList("method", "quantile", "1.0")),
new Observation(RPC, "request", null, asList("method", "sum")),
new Observation(RPC, "request", null, asList("method", "count")));
}

@ -54,6 +54,7 @@ import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager;
import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration;
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.io.IOException;
@ -156,7 +157,8 @@ public class CliquePantheonController implements PantheonController<CliqueContex
protocolContext,
ethProtocolManager.ethContext(),
syncState,
metricsSystem);
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"));
final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(

@ -53,6 +53,7 @@ import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.io.IOException;
@ -167,7 +168,8 @@ public class IbftLegacyPantheonController implements PantheonController<IbftCont
protocolContext,
ethProtocolManager.ethContext(),
syncState,
metricsSystem);
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"));
final Runnable closer =
() -> {

@ -61,6 +61,7 @@ import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.io.IOException;
@ -164,7 +165,8 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
protocolContext,
ethProtocolManager.ethContext(),
syncState,
metricsSystem);
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"));
final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(

@ -41,6 +41,7 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager;
import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration;
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.io.IOException;
@ -129,7 +130,8 @@ public class MainnetPantheonController implements PantheonController<Void> {
protocolContext,
ethProtocolManager.ethContext(),
syncState,
metricsSystem);
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"));
final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(

Loading…
Cancel
Save