FullSync Future should stop when total terminal difficulty is reached (#3423)

* FullSync Future should stop when total terminal difficulty is reached

Signed-off-by: Jiri Peinlich <jiri.peinlich@gmail.com>
pull/3516/head
Jiri Peinlich 3 years ago committed by GitHub
parent b73f5f98ad
commit 605ff69d3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  2. 3
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java
  3. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthMessages.java
  4. 38
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java
  5. 21
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSource.java
  6. 87
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  7. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java
  8. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java
  9. 37
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FlexibleBlockHashTerminalCondition.java
  10. 9
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStep.java
  11. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java
  12. 18
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java
  13. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java
  14. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManager.java
  15. 83
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/SyncTerminationCondition.java
  16. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSourceTest.java
  17. 10
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloaderTest.java
  18. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStepTest.java
  19. 8
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java
  20. 8
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java
  21. 182
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java
  22. 8
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java
  23. 3
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java
  24. 5
      ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/DummySynchronizer.java

@ -51,6 +51,7 @@ import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValida
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
@ -371,7 +372,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
syncState,
dataDirectory,
clock,
metricsSystem);
metricsSystem,
getFullSyncTerminationCondition(protocolContext.getBlockchain()));
final MiningCoordinator miningCoordinator =
createMiningCoordinator(
@ -416,6 +418,14 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
additionalPluginServices);
}
protected SyncTerminationCondition getFullSyncTerminationCondition(final Blockchain blockchain) {
return genesisConfig
.getConfigOptions()
.getTerminalTotalDifficulty()
.map(difficulty -> SyncTerminationCondition.difficulty(difficulty, blockchain))
.orElse(SyncTerminationCondition.never());
}
protected void prepForBuild() {}
protected JsonRpcMethods createAdditionalJsonRpcMethodFactory(

@ -18,6 +18,7 @@ import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/** Provides an interface to block synchronization processes. */
public interface Synchronizer {
@ -25,7 +26,7 @@ public interface Synchronizer {
// Default tolerance used to determine whether or not this node is "in sync"
long DEFAULT_IN_SYNC_TOLERANCE = 5;
void start();
CompletableFuture<Void> start();
void stop();

@ -41,8 +41,16 @@ public class EthMessages {
messageResponseConstructor.response(ethMessage.getData()));
}
public void subscribe(final int messageCode, final MessageCallback callback) {
listenersByCode.computeIfAbsent(messageCode, key -> Subscribers.create()).subscribe(callback);
public long subscribe(final int messageCode, final MessageCallback callback) {
return listenersByCode
.computeIfAbsent(messageCode, key -> Subscribers.create())
.subscribe(callback);
}
public void unsubsribe(final long id) {
for (Subscribers<MessageCallback> subscribers : listenersByCode.values()) {
subscribers.unsubscribe(id);
}
}
public void registerResponseConstructor(

@ -82,6 +82,9 @@ public class BlockPropagationManager {
private final Set<Long> requestedNonAnnouncedBlocks =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final PendingBlocksManager pendingBlocksManager;
private Optional<Long> onBlockAddedSId = Optional.empty();
private Optional<Long> newBlockSId;
private Optional<Long> newBlockHashesSId;
BlockPropagationManager(
final SynchronizerConfiguration config,
@ -111,12 +114,37 @@ public class BlockPropagationManager {
}
}
public void stop() {
if (started.get()) {
clearListeners();
started.set(false);
} else {
LOG.warn("Attempted to stop when we are not even running...");
}
}
private void setupListeners() {
protocolContext.getBlockchain().observeBlockAdded(this::onBlockAdded);
ethContext.getEthMessages().subscribe(EthPV62.NEW_BLOCK, this::handleNewBlockFromNetwork);
ethContext
.getEthMessages()
.subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork);
onBlockAddedSId =
Optional.of(protocolContext.getBlockchain().observeBlockAdded(this::onBlockAdded));
newBlockSId =
Optional.of(
ethContext
.getEthMessages()
.subscribe(EthPV62.NEW_BLOCK, this::handleNewBlockFromNetwork));
newBlockHashesSId =
Optional.of(
ethContext
.getEthMessages()
.subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork));
}
private void clearListeners() {
onBlockAddedSId.ifPresent(id -> protocolContext.getBlockchain().removeObserver(id));
newBlockSId.ifPresent(id -> ethContext.getEthMessages().unsubsribe(id));
newBlockHashesSId.ifPresent(id -> ethContext.getEthMessages().unsubsribe(id));
onBlockAddedSId = Optional.empty();
newBlockSId = Optional.empty();
newBlockHashesSId = Optional.empty();
}
private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {

@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import java.time.Duration;
import java.util.ArrayDeque;
@ -45,6 +46,7 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
private final EthScheduler ethScheduler;
private final int checkpointTimeoutsPermitted;
private final Duration newHeaderWaitDuration;
private final SyncTerminationCondition terminationCondition;
private final Queue<CheckpointRange> retrievedRanges = new ArrayDeque<>();
private BlockHeader lastRangeEnd;
@ -59,7 +61,8 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
final EthScheduler ethScheduler,
final EthPeer peer,
final BlockHeader commonAncestor,
final int checkpointTimeoutsPermitted) {
final int checkpointTimeoutsPermitted,
final SyncTerminationCondition terminationCondition) {
this(
checkpointFetcher,
syncTargetChecker,
@ -67,7 +70,8 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
peer,
commonAncestor,
checkpointTimeoutsPermitted,
Duration.ofSeconds(5));
Duration.ofSeconds(5),
terminationCondition);
}
CheckpointRangeSource(
@ -77,7 +81,8 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
final EthPeer peer,
final BlockHeader commonAncestor,
final int checkpointTimeoutsPermitted,
final Duration newHeaderWaitDuration) {
final Duration newHeaderWaitDuration,
final SyncTerminationCondition terminationCondition) {
this.checkpointFetcher = checkpointFetcher;
this.syncTargetChecker = syncTargetChecker;
this.ethScheduler = ethScheduler;
@ -85,14 +90,16 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
this.lastRangeEnd = commonAncestor;
this.checkpointTimeoutsPermitted = checkpointTimeoutsPermitted;
this.newHeaderWaitDuration = newHeaderWaitDuration;
this.terminationCondition = terminationCondition;
}
@Override
public boolean hasNext() {
return !retrievedRanges.isEmpty()
|| (requestFailureCount < checkpointTimeoutsPermitted
&& syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd)
&& !reachedEndOfCheckpoints);
return terminationCondition.shouldContinueDownload()
&& (!retrievedRanges.isEmpty()
|| (requestFailureCount < checkpointTimeoutsPermitted
&& syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd)
&& !reachedEndOfCheckpoints));
}
@Override

@ -21,9 +21,9 @@ import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncException;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.FullSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@ -33,11 +33,11 @@ import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.ExceptionUtils;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
@ -66,7 +66,8 @@ public class DefaultSynchronizer implements Synchronizer {
final SyncState syncState,
final Path dataDirectory,
final Clock clock,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition) {
this.maybePruner = maybePruner;
this.syncState = syncState;
@ -91,7 +92,13 @@ public class DefaultSynchronizer implements Synchronizer {
this.fullSyncDownloader =
new FullSyncDownloader(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
terminationCondition);
this.fastSyncDownloader =
FastDownloaderFactory.create(
syncConfig,
@ -123,25 +130,25 @@ public class DefaultSynchronizer implements Synchronizer {
}
@Override
public void start() {
public CompletableFuture<Void> start() {
if (running.compareAndSet(false, true)) {
LOG.info("Starting synchronizer.");
blockPropagationManager.start();
CompletableFuture<Void> future;
if (fastSyncDownloader.isPresent()) {
fastSyncDownloader
.get()
.start()
.whenComplete(this::handleFastSyncResult)
.exceptionally(
ex -> {
LOG.warn("Exiting FastSync process");
System.exit(0);
return null;
});
future = fastSyncDownloader.get().start().thenCompose(this::handleFastSyncResult);
} else {
startFullSync();
future = startFullSync();
}
future =
future.thenApply(
unused -> {
blockPropagationManager.stop();
running.set(false);
return null;
});
return future;
} else {
throw new IllegalStateException("Attempt to start an already started synchronizer.");
}
@ -154,6 +161,7 @@ public class DefaultSynchronizer implements Synchronizer {
fastSyncDownloader.ifPresent(FastSyncDownloader::stop);
fullSyncDownloader.stop();
maybePruner.ifPresent(Pruner::stop);
blockPropagationManager.stop();
}
}
@ -164,36 +172,37 @@ public class DefaultSynchronizer implements Synchronizer {
}
}
private void handleFastSyncResult(final FastSyncState result, final Throwable error) {
private CompletableFuture<Void> handleFastSyncResult(final FastSyncState result) {
if (!running.get()) {
// We've been shutdown which will have triggered the fast sync future to complete
return;
return CompletableFuture.completedFuture(null);
}
fastSyncDownloader.ifPresent(FastSyncDownloader::deleteFastSyncState);
final Throwable rootCause = ExceptionUtils.rootCause(error);
if (rootCause instanceof FastSyncException) {
LOG.error(
"Fast sync failed ({}), please try again.", ((FastSyncException) rootCause).getError());
throw new FastSyncException(rootCause);
} else if (error != null) {
LOG.error("Fast sync failed, please try again.", error);
throw new FastSyncException(error);
} else {
result
.getPivotBlockHeader()
.ifPresent(
blockHeader ->
protocolContext.getWorldStateArchive().setArchiveStateUnSafe(blockHeader));
LOG.info(
"Fast sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
}
startFullSync();
result
.getPivotBlockHeader()
.ifPresent(
blockHeader ->
protocolContext.getWorldStateArchive().setArchiveStateUnSafe(blockHeader));
LOG.info(
"Fast sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
return startFullSync();
}
private void startFullSync() {
private CompletableFuture<Void> startFullSync() {
maybePruner.ifPresent(Pruner::start);
fullSyncDownloader.start();
return fullSyncDownloader
.start()
.thenCompose(
unused -> {
maybePruner.ifPresent(Pruner::stop);
return null;
})
.thenApply(
o -> {
maybePruner.ifPresent(Pruner::stop);
return null;
});
}
@Override

@ -153,6 +153,9 @@ public class PipelineChainDownloader implements ChainDownloader {
return CompletableFuture.failedFuture(
new CancellationException("Chain download was cancelled"));
}
if (!syncTargetManager.shouldContinueDownloading()) {
return CompletableFuture.completedFuture(null);
}
syncState.setSyncTarget(target.peer(), target.commonAncestor());
currentDownloadPipeline = downloadPipelineFactory.createDownloadPipelineForSyncTarget(target);
debugLambda(

@ -33,6 +33,7 @@ import org.hyperledger.besu.ethereum.eth.sync.DownloadBodiesStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.BesuMetricCategory;
@ -105,7 +106,8 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
ethContext.getScheduler(),
target.peer(),
target.commonAncestor(),
syncConfig.getDownloaderCheckpointTimeoutsPermitted());
syncConfig.getDownloaderCheckpointTimeoutsPermitted(),
SyncTerminationCondition.never());
final DownloadHeadersStep downloadHeadersStep =
new DownloadHeadersStep(
protocolSchedule,

@ -0,0 +1,37 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fullsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
public class FlexibleBlockHashTerminalCondition implements SyncTerminationCondition {
private Hash blockHash;
private final Blockchain blockchain;
public FlexibleBlockHashTerminalCondition(final Hash blockHash, final Blockchain blockchain) {
this.blockHash = blockHash;
this.blockchain = blockchain;
}
public synchronized void setBlockHash(final Hash blockHash) {
this.blockHash = blockHash;
}
@Override
public synchronized boolean getAsBoolean() {
return blockchain.contains(blockHash);
}
}

@ -35,18 +35,25 @@ public class FullImportBlockStep implements Consumer<Block> {
private final EthContext ethContext;
private long gasAccumulator = 0;
private long lastReportMillis = 0;
private final SyncTerminationCondition fullSyncTerminationCondition;
public FullImportBlockStep(
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext) {
final EthContext ethContext,
final SyncTerminationCondition syncTerminationCondition) {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.fullSyncTerminationCondition = syncTerminationCondition;
}
@Override
public void accept(final Block block) {
if (fullSyncTerminationCondition.shouldStopDownload()) {
LOG.debug("Not importing another block, because terminal condition was reached.");
return;
}
final long blockNumber = block.getHeader().getNumber();
final String blockHash = block.getHash().toHexString();
final BlockImporter importer =

@ -32,17 +32,28 @@ public class FullSyncChainDownloader {
final ProtocolContext protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition) {
final FullSyncTargetManager syncTargetManager =
new FullSyncTargetManager(
config, protocolSchedule, protocolContext, ethContext, metricsSystem);
config,
protocolSchedule,
protocolContext,
ethContext,
metricsSystem,
terminationCondition);
return new PipelineChainDownloader(
syncState,
syncTargetManager,
new FullSyncDownloadPipelineFactory(
config, protocolSchedule, protocolContext, ethContext, metricsSystem),
config,
protocolSchedule,
protocolContext,
ethContext,
metricsSystem,
terminationCondition),
ethContext.getScheduler(),
metricsSystem);
}

@ -48,18 +48,21 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
private final ValidationPolicy detachedValidationPolicy =
() -> HeaderValidationMode.DETACHED_ONLY;
private final BetterSyncTargetEvaluator betterSyncTargetEvaluator;
private final SyncTerminationCondition fullSyncTerminationCondition;
public FullSyncDownloadPipelineFactory(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final SyncTerminationCondition syncTerminationCondition) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
this.fullSyncTerminationCondition = syncTerminationCondition;
betterSyncTargetEvaluator = new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
}
@ -75,7 +78,8 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
ethContext.getScheduler(),
target.peer(),
target.commonAncestor(),
syncConfig.getDownloaderCheckpointTimeoutsPermitted());
syncConfig.getDownloaderCheckpointTimeoutsPermitted(),
fullSyncTerminationCondition);
final DownloadHeadersStep downloadHeadersStep =
new DownloadHeadersStep(
protocolSchedule,
@ -91,7 +95,8 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep();
final FullImportBlockStep importBlockStep =
new FullImportBlockStep(protocolSchedule, protocolContext, ethContext);
new FullImportBlockStep(
protocolSchedule, protocolContext, ethContext, fullSyncTerminationCondition);
return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",
@ -115,18 +120,19 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
private boolean shouldContinueDownloadingFromPeer(
final EthPeer peer, final BlockHeader lastCheckpointHeader) {
final boolean shouldTerminate = fullSyncTerminationCondition.shouldStopDownload();
final boolean caughtUpToPeer =
peer.chainState().getEstimatedHeight() <= lastCheckpointHeader.getNumber();
final boolean isDisconnected = peer.isDisconnected();
final boolean shouldSwitchSyncTarget = betterSyncTargetEvaluator.shouldSwitchSyncTarget(peer);
LOG.debug(
"shouldContinueDownloadingFromPeer? {}, disconnected {}, caughtUp {}, shouldSwitchSyncTarget {}",
"shouldTerminate {}, shouldContinueDownloadingFromPeer? {}, disconnected {}, caughtUp {}, shouldSwitchSyncTarget {}",
shouldTerminate,
peer,
isDisconnected,
caughtUpToPeer,
shouldSwitchSyncTarget);
return !isDisconnected && !caughtUpToPeer && !shouldSwitchSyncTarget;
return !shouldTerminate && !isDisconnected && !caughtUpToPeer && !shouldSwitchSyncTarget;
}
}

@ -23,6 +23,8 @@ import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,19 +42,26 @@ public class FullSyncDownloader {
final ProtocolContext protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition) {
this.syncConfig = syncConfig;
this.protocolContext = protocolContext;
this.syncState = syncState;
this.chainDownloader =
FullSyncChainDownloader.create(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
terminationCondition);
}
public void start() {
public CompletableFuture<Void> start() {
LOG.info("Starting full sync.");
chainDownloader.start();
return chainDownloader.start();
}
public void stop() {

@ -39,16 +39,19 @@ class FullSyncTargetManager extends SyncTargetManager {
private static final Logger LOG = LoggerFactory.getLogger(FullSyncTargetManager.class);
private final ProtocolContext protocolContext;
private final EthContext ethContext;
private final SyncTerminationCondition terminationCondition;
FullSyncTargetManager(
final SynchronizerConfiguration config,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition) {
super(config, protocolSchedule, protocolContext, ethContext, metricsSystem);
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.terminationCondition = terminationCondition;
}
@Override
@ -105,6 +108,6 @@ class FullSyncTargetManager extends SyncTargetManager {
@Override
public boolean shouldContinueDownloading() {
return true;
return terminationCondition.shouldContinueDownload();
}
}

@ -0,0 +1,83 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fullsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Difficulty;
import java.util.function.BooleanSupplier;
import org.apache.tuweni.units.bigints.UInt256;
/** return true when termination condition is fullfilled and the full sync should stop */
public interface SyncTerminationCondition extends BooleanSupplier {
default boolean shouldContinueDownload() {
return !shouldStopDownload();
}
default boolean shouldStopDownload() {
return getAsBoolean();
}
/**
* When we want full sync to continue forever (for instance when we don't want to merge)
*
* @return always false therefore continues forever *
*/
static SyncTerminationCondition never() {
return () -> false;
}
/**
* When we want full sync to finish after reaching a difficulty. For instance when we merge on
* total terminal difficulty.
*
* @param targetDifficulty target difficulty to reach
* @param blockchain blockchain to reach the difficulty on
* @return true when blockchain reaches difficulty
*/
static SyncTerminationCondition difficulty(
final UInt256 targetDifficulty, final Blockchain blockchain) {
return difficulty(Difficulty.of(targetDifficulty), blockchain);
}
/**
* When we want full sync to finish after reaching a difficulty. For instance when we merge on
* total terminal difficulty.
*
* @param targetDifficulty target difficulty to reach
* @param blockchain blockchain to reach the difficulty on*
* @return true when blockchain reaches difficulty
*/
static SyncTerminationCondition difficulty(
final Difficulty targetDifficulty, final Blockchain blockchain) {
return () -> blockchain.getChainHead().getTotalDifficulty().greaterThan(targetDifficulty);
}
/**
* When we want the full sync to finish on a target hash. For instance when we reach a merge
* checkpoint.
*
* @param blockHash target hash to look for
* @param blockchain blockchain to reach the difficulty on
* @return true when blockchain contains target hash (target hash can be changed)
*/
static FlexibleBlockHashTerminalCondition blockHash(
final Hash blockHash, final Blockchain blockchain) {
return new FlexibleBlockHashTerminalCondition(blockHash, blockchain);
}
}

@ -32,6 +32,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import java.time.Duration;
import java.util.List;
@ -60,7 +61,8 @@ public class CheckpointRangeSourceTest {
peer,
commonAncestor,
CHECKPOINT_TIMEOUTS_PERMITTED,
Duration.ofMillis(1));
Duration.ofMillis(1),
SyncTerminationCondition.never());
@Test
public void shouldHaveNextWhenNoCheckpointsLoadedButSyncTargetCheckerSaysToContinue() {

@ -46,6 +46,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@ -91,6 +92,7 @@ public class PipelineChainDownloaderTest {
public void shouldStartChainDownloadWhenTargetSelected() {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.findSyncTarget()).thenReturn(selectTargetFuture);
when(syncTargetManager.shouldContinueDownloading()).thenReturn(true);
expectPipelineCreation(syncTarget, downloadPipeline);
when(scheduler.startPipeline(downloadPipeline)).thenReturn(new CompletableFuture<>());
chainDownloader.start();
@ -106,11 +108,11 @@ public class PipelineChainDownloaderTest {
public void shouldUpdateSyncStateWhenTargetSelected() {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.findSyncTarget()).thenReturn(selectTargetFuture);
when(syncTargetManager.shouldContinueDownloading()).thenReturn(true);
expectPipelineCreation(syncTarget, downloadPipeline);
when(scheduler.startPipeline(downloadPipeline)).thenReturn(new CompletableFuture<>());
chainDownloader.start();
verifyNoInteractions(downloadPipelineFactory);
selectTargetFuture.complete(syncTarget);
verify(syncState).setSyncTarget(peer1, commonAncestor);
@ -156,10 +158,9 @@ public class PipelineChainDownloaderTest {
verify(syncTargetManager).findSyncTarget();
when(syncTargetManager.shouldContinueDownloading()).thenReturn(false);
pipelineFuture.complete(null);
verify(syncTargetManager).shouldContinueDownloading();
verify(syncTargetManager, Mockito.times(2)).shouldContinueDownloading();
verify(syncState).clearSyncTarget();
verifyNoMoreInteractions(syncTargetManager);
assertThat(result).isCompleted();
@ -187,6 +188,7 @@ public class PipelineChainDownloaderTest {
@Test
public void shouldNotNestExceptionHandling() {
when(syncTargetManager.shouldContinueDownloading())
.thenReturn(true)
.thenReturn(true) // Allow continuing after first successful download
.thenReturn(false); // But not after finding the second sync target fails
@ -213,7 +215,7 @@ public class PipelineChainDownloaderTest {
// Should only need to check if it should continue twice.
// We'll wind up doing this check more than necessary if we keep wrapping additional exception
// handlers when restarting the sequence which wastes memory.
verify(syncTargetManager, times(2)).shouldContinueDownloading();
verify(syncTargetManager, times(3)).shouldContinueDownloading();
}
@Test

@ -50,7 +50,9 @@ public class FullImportBlockStepTest {
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec);
when(protocolSpec.getBlockImporter()).thenReturn(blockImporter);
importBlocksStep = new FullImportBlockStep(protocolSchedule, protocolContext, null);
importBlocksStep =
new FullImportBlockStep(
protocolSchedule, protocolContext, null, SyncTerminationCondition.never());
}
@Test

@ -80,7 +80,13 @@ public class FullSyncChainDownloaderForkTest {
private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) {
return FullSyncChainDownloader.create(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
SyncTerminationCondition.never());
}
private ChainDownloader downloader() {

@ -117,7 +117,13 @@ public class FullSyncChainDownloaderTest {
private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) {
return FullSyncChainDownloader.create(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
SyncTerminationCondition.never());
}
private ChainDownloader downloader() {

@ -0,0 +1,182 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fullsync;
import static org.assertj.core.api.Assertions.assertThat;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class FullSyncChainDownloaderTotalTerminalDifficultyTest {
protected ProtocolSchedule protocolSchedule;
protected EthProtocolManager ethProtocolManager;
protected EthContext ethContext;
protected ProtocolContext protocolContext;
private SyncState syncState;
private BlockchainSetupUtil localBlockchainSetup;
protected MutableBlockchain localBlockchain;
private BlockchainSetupUtil otherBlockchainSetup;
protected Blockchain otherBlockchain;
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();
private static final Difficulty TARGET_TERMINAL_DIFFICULTY = Difficulty.of(1_000_000L);
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {{DataStorageFormat.BONSAI}, {DataStorageFormat.FOREST}});
}
private final DataStorageFormat storageFormat;
public FullSyncChainDownloaderTotalTerminalDifficultyTest(final DataStorageFormat storageFormat) {
this.storageFormat = storageFormat;
}
@Before
public void setupTest() {
localBlockchainSetup = BlockchainSetupUtil.forTesting(storageFormat);
localBlockchain = localBlockchainSetup.getBlockchain();
otherBlockchainSetup = BlockchainSetupUtil.forTesting(storageFormat);
otherBlockchain = otherBlockchainSetup.getBlockchain();
protocolSchedule = localBlockchainSetup.getProtocolSchedule();
protocolContext = localBlockchainSetup.getProtocolContext();
ethProtocolManager =
EthProtocolManagerTestUtil.create(
localBlockchain,
new EthScheduler(1, 1, 1, 1, new NoOpMetricsSystem()),
localBlockchainSetup.getWorldArchive(),
localBlockchainSetup.getTransactionPool(),
EthProtocolConfiguration.defaultConfig());
ethContext = ethProtocolManager.ethContext();
syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers());
}
@After
public void tearDown() {
ethProtocolManager.stop();
}
private ChainDownloader downloader(
final SynchronizerConfiguration syncConfig,
final SyncTerminationCondition terminalCondition) {
return FullSyncChainDownloader.create(
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
terminalCondition);
}
private SynchronizerConfiguration.Builder syncConfigBuilder() {
return SynchronizerConfiguration.builder();
}
@Test
public void syncsFullyAndStopsWhenTTDReached() {
otherBlockchainSetup.importFirstBlocks(30);
final long targetBlock = otherBlockchain.getChainHeadBlockNumber();
// Sanity check
assertThat(targetBlock).isGreaterThan(localBlockchain.getChainHeadBlockNumber());
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(otherBlockchain);
final SynchronizerConfiguration syncConfig =
syncConfigBuilder().downloaderChainSegmentSize(1).downloaderParallelism(1).build();
final ChainDownloader downloader =
downloader(
syncConfig,
SyncTerminationCondition.difficulty(TARGET_TERMINAL_DIFFICULTY, localBlockchain));
final CompletableFuture<Void> future = downloader.start();
assertThat(future.isDone()).isFalse();
peer.respondWhileOtherThreadsWork(responder, () -> syncState.syncTarget().isEmpty());
assertThat(syncState.syncTarget()).isPresent();
assertThat(syncState.syncTarget().get().peer()).isEqualTo(peer.getEthPeer());
peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone());
assertThat(localBlockchain.getChainHead().getTotalDifficulty())
.isGreaterThan(TARGET_TERMINAL_DIFFICULTY);
assertThat(future.isDone()).isTrue();
}
@Test
public void syncsFullyAndContinuesWhenTTDNotSpecified() {
otherBlockchainSetup.importFirstBlocks(30);
final long targetBlock = otherBlockchain.getChainHeadBlockNumber();
// Sanity check
assertThat(targetBlock).isGreaterThan(localBlockchain.getChainHeadBlockNumber());
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(otherBlockchain);
final SynchronizerConfiguration syncConfig =
syncConfigBuilder().downloaderChainSegmentSize(1).downloaderParallelism(1).build();
final ChainDownloader downloader = downloader(syncConfig, SyncTerminationCondition.never());
final CompletableFuture<Void> future = downloader.start();
assertThat(future.isDone()).isFalse();
peer.respondWhileOtherThreadsWork(responder, () -> !syncState.syncTarget().isPresent());
assertThat(syncState.syncTarget()).isPresent();
assertThat(syncState.syncTarget().get().peer()).isEqualTo(peer.getEthPeer());
peer.respondWhileOtherThreadsWork(
responder, () -> localBlockchain.getChainHeadBlockNumber() < targetBlock);
assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(targetBlock);
assertThat(future.isDone()).isFalse();
}
}

@ -92,7 +92,13 @@ public class FullSyncDownloaderTest {
private FullSyncDownloader downloader(final SynchronizerConfiguration syncConfig) {
return new FullSyncDownloader(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
SyncTerminationCondition.never());
}
@Test

@ -97,7 +97,8 @@ public class FullSyncTargetManagerTest {
protocolSchedule,
protocolContext,
ethContext,
new NoOpMetricsSystem());
new NoOpMetricsSystem(),
SyncTerminationCondition.never());
}
@After

@ -20,6 +20,7 @@ import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
* Naive implementation of Synchronizer used by retesteth. Because retesteth is not implemented in
@ -28,7 +29,9 @@ import java.util.Optional;
*/
public class DummySynchronizer implements Synchronizer {
@Override
public void start() {}
public CompletableFuture<Void> start() {
return CompletableFuture.completedFuture(null);
}
@Override
public void stop() {}

Loading…
Cancel
Save