[PAN-2271] Add metrics to Parallel Download pipeline (#985)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
S. Matthew English 6 years ago committed by GitHub
parent 9779d5e21b
commit c76ada2a4e
  1. 17
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractPipelinedTask.java
  2. 110
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/IncrementerTest.java

@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager.task;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.util.ArrayList;
@ -37,6 +39,9 @@ public abstract class AbstractPipelinedTask<I, O> extends AbstractEthTask<List<O
private boolean shuttingDown = false;
private final AtomicReference<Throwable> processingException = new AtomicReference<>(null);
private final Counter inboundQueueCounter;
private final Counter outboundQueueCounter;
protected AbstractPipelinedTask(
final BlockingQueue<I> inboundQueue,
final int outboundBacklogSize,
@ -45,6 +50,16 @@ public abstract class AbstractPipelinedTask<I, O> extends AbstractEthTask<List<O
this.inboundQueue = inboundQueue;
outboundQueue = new LinkedBlockingQueue<>(outboundBacklogSize);
results = new ArrayList<>();
this.inboundQueueCounter =
metricsSystem.createCounter(
MetricCategory.SYNCHRONIZER,
"inboundQueueCounter",
"count of queue items that started processing");
this.outboundQueueCounter =
metricsSystem.createCounter(
MetricCategory.SYNCHRONIZER,
"outboundQueueCounter",
"count of queue items that finished processing");
}
@Override
@ -62,6 +77,7 @@ public abstract class AbstractPipelinedTask<I, O> extends AbstractEthTask<List<O
// timed out waiting for a result
continue;
}
inboundQueueCounter.inc();
} catch (final InterruptedException e) {
// this is expected
continue;
@ -72,6 +88,7 @@ public abstract class AbstractPipelinedTask<I, O> extends AbstractEthTask<List<O
while (!isDone()) {
try {
if (outboundQueue.offer(o, 1, TimeUnit.SECONDS)) {
outboundQueueCounter.inc();
results.add(o);
break;
}

@ -0,0 +1,110 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fullsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.spy;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.Observation;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.metrics.prometheus.MetricsConfiguration;
import tech.pegasys.pantheon.metrics.prometheus.PrometheusMetricsSystem;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.util.Lists;
import org.junit.Test;
public class IncrementerTest {
private final MetricsConfiguration metricsConfiguration = MetricsConfiguration.createDefault();
private ProtocolSchedule<Void> protocolSchedule;
private EthContext ethContext;
private ProtocolContext<Void> protocolContext;
private SyncState syncState;
private MutableBlockchain localBlockchain;
private MetricsSystem metricsSystem;
@Test
public void parallelDownloadPipelineCounterShouldIncrement() {
metricsConfiguration.setEnabled(true);
metricsSystem = PrometheusMetricsSystem.init(metricsConfiguration);
final BlockchainSetupUtil<Void> localBlockchainSetup = BlockchainSetupUtil.forTesting();
localBlockchain = spy(localBlockchainSetup.getBlockchain());
final BlockchainSetupUtil<Void> otherBlockchainSetup = BlockchainSetupUtil.forTesting();
final Blockchain otherBlockchain = otherBlockchainSetup.getBlockchain();
protocolSchedule = localBlockchainSetup.getProtocolSchedule();
protocolContext = localBlockchainSetup.getProtocolContext();
final EthProtocolManager ethProtocolManager =
EthProtocolManagerTestUtil.create(
localBlockchain,
localBlockchainSetup.getWorldArchive(),
new EthScheduler(1, 1, 1, new NoOpMetricsSystem()));
ethContext = ethProtocolManager.ethContext();
syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers());
otherBlockchainSetup.importFirstBlocks(15);
final long targetBlock = otherBlockchain.getChainHeadBlockNumber();
// Sanity check
assertThat(targetBlock).isGreaterThan(localBlockchain.getChainHeadBlockNumber());
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(otherBlockchain);
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder().downloaderChainSegmentSize(10).build();
final FullSyncDownloader<?> downloader = downloader(syncConfig);
downloader.start();
peer.respondWhileOtherThreadsWork(responder, () -> !syncState.syncTarget().isPresent());
assertThat(syncState.syncTarget()).isPresent();
assertThat(syncState.syncTarget().get().peer()).isEqualTo(peer.getEthPeer());
peer.respondWhileOtherThreadsWork(
responder, () -> localBlockchain.getChainHeadBlockNumber() < targetBlock);
assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(targetBlock);
final List<Observation> metrics =
metricsSystem.getMetrics(MetricCategory.SYNCHRONIZER).collect(Collectors.toList());
final Observation inboundObservation =
new Observation(MetricCategory.SYNCHRONIZER, "inboundQueueCounter", 6.0, Lists.emptyList());
final Observation outboundObservation =
new Observation(
MetricCategory.SYNCHRONIZER, "outboundQueueCounter", 5.0, Lists.emptyList());
assertThat(metrics).contains(inboundObservation, outboundObservation);
}
private FullSyncDownloader<?> downloader(final SynchronizerConfiguration syncConfig) {
return new FullSyncDownloader<>(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
}
}
Loading…
Cancel
Save