From 605ff69d3ff92d4cda60db06f76ba59f7afb17dc Mon Sep 17 00:00:00 2001 From: Jiri Peinlich Date: Wed, 2 Mar 2022 22:37:14 +0000 Subject: [PATCH] FullSync Future should stop when total terminal difficulty is reached (#3423) * FullSync Future should stop when total terminal difficulty is reached Signed-off-by: Jiri Peinlich --- .../controller/BesuControllerBuilder.java | 12 +- .../besu/ethereum/core/Synchronizer.java | 3 +- .../ethereum/eth/manager/EthMessages.java | 12 +- .../eth/sync/BlockPropagationManager.java | 38 +++- .../eth/sync/CheckpointRangeSource.java | 21 +- .../eth/sync/DefaultSynchronizer.java | 87 +++++---- .../eth/sync/PipelineChainDownloader.java | 3 + .../FastSyncDownloadPipelineFactory.java | 4 +- .../FlexibleBlockHashTerminalCondition.java | 37 ++++ .../sync/fullsync/FullImportBlockStep.java | 9 +- .../fullsync/FullSyncChainDownloader.java | 17 +- .../FullSyncDownloadPipelineFactory.java | 18 +- .../eth/sync/fullsync/FullSyncDownloader.java | 17 +- .../sync/fullsync/FullSyncTargetManager.java | 7 +- .../fullsync/SyncTerminationCondition.java | 83 ++++++++ .../eth/sync/CheckpointRangeSourceTest.java | 4 +- .../eth/sync/PipelineChainDownloaderTest.java | 10 +- .../fullsync/FullImportBlockStepTest.java | 4 +- .../FullSyncChainDownloaderForkTest.java | 8 +- .../fullsync/FullSyncChainDownloaderTest.java | 8 +- ...DownloaderTotalTerminalDifficultyTest.java | 182 ++++++++++++++++++ .../sync/fullsync/FullSyncDownloaderTest.java | 8 +- .../fullsync/FullSyncTargetManagerTest.java | 3 +- .../ethereum/retesteth/DummySynchronizer.java | 5 +- 24 files changed, 517 insertions(+), 83 deletions(-) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FlexibleBlockHashTerminalCondition.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/SyncTerminationCondition.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index 336ea1c8ba..680ad14ef4 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -51,6 +51,7 @@ import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValida import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; @@ -371,7 +372,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides syncState, dataDirectory, clock, - metricsSystem); + metricsSystem, + getFullSyncTerminationCondition(protocolContext.getBlockchain())); final MiningCoordinator miningCoordinator = createMiningCoordinator( @@ -416,6 +418,14 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides additionalPluginServices); } + protected SyncTerminationCondition getFullSyncTerminationCondition(final Blockchain blockchain) { + return genesisConfig + .getConfigOptions() + .getTerminalTotalDifficulty() + .map(difficulty -> SyncTerminationCondition.difficulty(difficulty, blockchain)) + .orElse(SyncTerminationCondition.never()); + } + protected void prepForBuild() {} protected JsonRpcMethods createAdditionalJsonRpcMethodFactory( diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java index 24f60f0ae8..72f9de06dc 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java @@ -18,6 +18,7 @@ import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.services.BesuEvents; import java.util.Optional; +import java.util.concurrent.CompletableFuture; /** Provides an interface to block synchronization processes. */ public interface Synchronizer { @@ -25,7 +26,7 @@ public interface Synchronizer { // Default tolerance used to determine whether or not this node is "in sync" long DEFAULT_IN_SYNC_TOLERANCE = 5; - void start(); + CompletableFuture start(); void stop(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthMessages.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthMessages.java index ae4cee41a0..f1eca234bc 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthMessages.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthMessages.java @@ -41,8 +41,16 @@ public class EthMessages { messageResponseConstructor.response(ethMessage.getData())); } - public void subscribe(final int messageCode, final MessageCallback callback) { - listenersByCode.computeIfAbsent(messageCode, key -> Subscribers.create()).subscribe(callback); + public long subscribe(final int messageCode, final MessageCallback callback) { + return listenersByCode + .computeIfAbsent(messageCode, key -> Subscribers.create()) + .subscribe(callback); + } + + public void unsubsribe(final long id) { + for (Subscribers subscribers : listenersByCode.values()) { + subscribers.unsubscribe(id); + } } public void registerResponseConstructor( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java index bcde21b494..2b82ddc883 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java @@ -82,6 +82,9 @@ public class BlockPropagationManager { private final Set requestedNonAnnouncedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final PendingBlocksManager pendingBlocksManager; + private Optional onBlockAddedSId = Optional.empty(); + private Optional newBlockSId; + private Optional newBlockHashesSId; BlockPropagationManager( final SynchronizerConfiguration config, @@ -111,12 +114,37 @@ public class BlockPropagationManager { } } + public void stop() { + if (started.get()) { + clearListeners(); + started.set(false); + } else { + LOG.warn("Attempted to stop when we are not even running..."); + } + } + private void setupListeners() { - protocolContext.getBlockchain().observeBlockAdded(this::onBlockAdded); - ethContext.getEthMessages().subscribe(EthPV62.NEW_BLOCK, this::handleNewBlockFromNetwork); - ethContext - .getEthMessages() - .subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork); + onBlockAddedSId = + Optional.of(protocolContext.getBlockchain().observeBlockAdded(this::onBlockAdded)); + newBlockSId = + Optional.of( + ethContext + .getEthMessages() + .subscribe(EthPV62.NEW_BLOCK, this::handleNewBlockFromNetwork)); + newBlockHashesSId = + Optional.of( + ethContext + .getEthMessages() + .subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork)); + } + + private void clearListeners() { + onBlockAddedSId.ifPresent(id -> protocolContext.getBlockchain().removeObserver(id)); + newBlockSId.ifPresent(id -> ethContext.getEthMessages().unsubsribe(id)); + newBlockHashesSId.ifPresent(id -> ethContext.getEthMessages().unsubsribe(id)); + onBlockAddedSId = Optional.empty(); + newBlockSId = Optional.empty(); + newBlockHashesSId = Optional.empty(); } private void onBlockAdded(final BlockAddedEvent blockAddedEvent) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSource.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSource.java index 523d8b71f3..977c189b0a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSource.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSource.java @@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition; import java.time.Duration; import java.util.ArrayDeque; @@ -45,6 +46,7 @@ public class CheckpointRangeSource implements Iterator { private final EthScheduler ethScheduler; private final int checkpointTimeoutsPermitted; private final Duration newHeaderWaitDuration; + private final SyncTerminationCondition terminationCondition; private final Queue retrievedRanges = new ArrayDeque<>(); private BlockHeader lastRangeEnd; @@ -59,7 +61,8 @@ public class CheckpointRangeSource implements Iterator { final EthScheduler ethScheduler, final EthPeer peer, final BlockHeader commonAncestor, - final int checkpointTimeoutsPermitted) { + final int checkpointTimeoutsPermitted, + final SyncTerminationCondition terminationCondition) { this( checkpointFetcher, syncTargetChecker, @@ -67,7 +70,8 @@ public class CheckpointRangeSource implements Iterator { peer, commonAncestor, checkpointTimeoutsPermitted, - Duration.ofSeconds(5)); + Duration.ofSeconds(5), + terminationCondition); } CheckpointRangeSource( @@ -77,7 +81,8 @@ public class CheckpointRangeSource implements Iterator { final EthPeer peer, final BlockHeader commonAncestor, final int checkpointTimeoutsPermitted, - final Duration newHeaderWaitDuration) { + final Duration newHeaderWaitDuration, + final SyncTerminationCondition terminationCondition) { this.checkpointFetcher = checkpointFetcher; this.syncTargetChecker = syncTargetChecker; this.ethScheduler = ethScheduler; @@ -85,14 +90,16 @@ public class CheckpointRangeSource implements Iterator { this.lastRangeEnd = commonAncestor; this.checkpointTimeoutsPermitted = checkpointTimeoutsPermitted; this.newHeaderWaitDuration = newHeaderWaitDuration; + this.terminationCondition = terminationCondition; } @Override public boolean hasNext() { - return !retrievedRanges.isEmpty() - || (requestFailureCount < checkpointTimeoutsPermitted - && syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd) - && !reachedEndOfCheckpoints); + return terminationCondition.shouldContinueDownload() + && (!retrievedRanges.isEmpty() + || (requestFailureCount < checkpointTimeoutsPermitted + && syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd) + && !reachedEndOfCheckpoints)); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 59440e65d4..76a3ef734e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -21,9 +21,9 @@ import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastDownloaderFactory; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader; -import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncException; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; import org.hyperledger.besu.ethereum.eth.sync.fullsync.FullSyncDownloader; +import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition; import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; @@ -33,11 +33,11 @@ import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener; import org.hyperledger.besu.plugin.services.MetricsSystem; -import org.hyperledger.besu.util.ExceptionUtils; import java.nio.file.Path; import java.time.Clock; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; @@ -66,7 +66,8 @@ public class DefaultSynchronizer implements Synchronizer { final SyncState syncState, final Path dataDirectory, final Clock clock, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final SyncTerminationCondition terminationCondition) { this.maybePruner = maybePruner; this.syncState = syncState; @@ -91,7 +92,13 @@ public class DefaultSynchronizer implements Synchronizer { this.fullSyncDownloader = new FullSyncDownloader( - syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem); + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + syncState, + metricsSystem, + terminationCondition); this.fastSyncDownloader = FastDownloaderFactory.create( syncConfig, @@ -123,25 +130,25 @@ public class DefaultSynchronizer implements Synchronizer { } @Override - public void start() { + public CompletableFuture start() { if (running.compareAndSet(false, true)) { LOG.info("Starting synchronizer."); blockPropagationManager.start(); + CompletableFuture future; if (fastSyncDownloader.isPresent()) { - fastSyncDownloader - .get() - .start() - .whenComplete(this::handleFastSyncResult) - .exceptionally( - ex -> { - LOG.warn("Exiting FastSync process"); - System.exit(0); - return null; - }); + future = fastSyncDownloader.get().start().thenCompose(this::handleFastSyncResult); } else { - startFullSync(); + future = startFullSync(); } + future = + future.thenApply( + unused -> { + blockPropagationManager.stop(); + running.set(false); + return null; + }); + return future; } else { throw new IllegalStateException("Attempt to start an already started synchronizer."); } @@ -154,6 +161,7 @@ public class DefaultSynchronizer implements Synchronizer { fastSyncDownloader.ifPresent(FastSyncDownloader::stop); fullSyncDownloader.stop(); maybePruner.ifPresent(Pruner::stop); + blockPropagationManager.stop(); } } @@ -164,36 +172,37 @@ public class DefaultSynchronizer implements Synchronizer { } } - private void handleFastSyncResult(final FastSyncState result, final Throwable error) { + private CompletableFuture handleFastSyncResult(final FastSyncState result) { if (!running.get()) { // We've been shutdown which will have triggered the fast sync future to complete - return; + return CompletableFuture.completedFuture(null); } fastSyncDownloader.ifPresent(FastSyncDownloader::deleteFastSyncState); - final Throwable rootCause = ExceptionUtils.rootCause(error); - if (rootCause instanceof FastSyncException) { - LOG.error( - "Fast sync failed ({}), please try again.", ((FastSyncException) rootCause).getError()); - throw new FastSyncException(rootCause); - } else if (error != null) { - LOG.error("Fast sync failed, please try again.", error); - throw new FastSyncException(error); - } else { - result - .getPivotBlockHeader() - .ifPresent( - blockHeader -> - protocolContext.getWorldStateArchive().setArchiveStateUnSafe(blockHeader)); - LOG.info( - "Fast sync completed successfully with pivot block {}", - result.getPivotBlockNumber().getAsLong()); - } - startFullSync(); + result + .getPivotBlockHeader() + .ifPresent( + blockHeader -> + protocolContext.getWorldStateArchive().setArchiveStateUnSafe(blockHeader)); + LOG.info( + "Fast sync completed successfully with pivot block {}", + result.getPivotBlockNumber().getAsLong()); + return startFullSync(); } - private void startFullSync() { + private CompletableFuture startFullSync() { maybePruner.ifPresent(Pruner::start); - fullSyncDownloader.start(); + return fullSyncDownloader + .start() + .thenCompose( + unused -> { + maybePruner.ifPresent(Pruner::stop); + return null; + }) + .thenApply( + o -> { + maybePruner.ifPresent(Pruner::stop); + return null; + }); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java index f32804ba7f..aaeb16f2c1 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java @@ -153,6 +153,9 @@ public class PipelineChainDownloader implements ChainDownloader { return CompletableFuture.failedFuture( new CancellationException("Chain download was cancelled")); } + if (!syncTargetManager.shouldContinueDownloading()) { + return CompletableFuture.completedFuture(null); + } syncState.setSyncTarget(target.peer(), target.commonAncestor()); currentDownloadPipeline = downloadPipelineFactory.createDownloadPipelineForSyncTarget(target); debugLambda( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java index 11bd55c6f5..39e0f297c3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java @@ -33,6 +33,7 @@ import org.hyperledger.besu.ethereum.eth.sync.DownloadBodiesStep; import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep; import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition; import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.metrics.BesuMetricCategory; @@ -105,7 +106,8 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory ethContext.getScheduler(), target.peer(), target.commonAncestor(), - syncConfig.getDownloaderCheckpointTimeoutsPermitted()); + syncConfig.getDownloaderCheckpointTimeoutsPermitted(), + SyncTerminationCondition.never()); final DownloadHeadersStep downloadHeadersStep = new DownloadHeadersStep( protocolSchedule, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FlexibleBlockHashTerminalCondition.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FlexibleBlockHashTerminalCondition.java new file mode 100644 index 0000000000..d0ef7c64fc --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FlexibleBlockHashTerminalCondition.java @@ -0,0 +1,37 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.fullsync; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.chain.Blockchain; + +public class FlexibleBlockHashTerminalCondition implements SyncTerminationCondition { + private Hash blockHash; + private final Blockchain blockchain; + + public FlexibleBlockHashTerminalCondition(final Hash blockHash, final Blockchain blockchain) { + this.blockHash = blockHash; + this.blockchain = blockchain; + } + + public synchronized void setBlockHash(final Hash blockHash) { + this.blockHash = blockHash; + } + + @Override + public synchronized boolean getAsBoolean() { + return blockchain.contains(blockHash); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStep.java index ac3faf6d33..fc37ecaac7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStep.java @@ -35,18 +35,25 @@ public class FullImportBlockStep implements Consumer { private final EthContext ethContext; private long gasAccumulator = 0; private long lastReportMillis = 0; + private final SyncTerminationCondition fullSyncTerminationCondition; public FullImportBlockStep( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, - final EthContext ethContext) { + final EthContext ethContext, + final SyncTerminationCondition syncTerminationCondition) { this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; + this.fullSyncTerminationCondition = syncTerminationCondition; } @Override public void accept(final Block block) { + if (fullSyncTerminationCondition.shouldStopDownload()) { + LOG.debug("Not importing another block, because terminal condition was reached."); + return; + } final long blockNumber = block.getHeader().getNumber(); final String blockHash = block.getHash().toHexString(); final BlockImporter importer = diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java index 9ae54249a1..f4a7011626 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java @@ -32,17 +32,28 @@ public class FullSyncChainDownloader { final ProtocolContext protocolContext, final EthContext ethContext, final SyncState syncState, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final SyncTerminationCondition terminationCondition) { final FullSyncTargetManager syncTargetManager = new FullSyncTargetManager( - config, protocolSchedule, protocolContext, ethContext, metricsSystem); + config, + protocolSchedule, + protocolContext, + ethContext, + metricsSystem, + terminationCondition); return new PipelineChainDownloader( syncState, syncTargetManager, new FullSyncDownloadPipelineFactory( - config, protocolSchedule, protocolContext, ethContext, metricsSystem), + config, + protocolSchedule, + protocolContext, + ethContext, + metricsSystem, + terminationCondition), ethContext.getScheduler(), metricsSystem); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java index ac12ddbb49..85b3bbbe94 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java @@ -48,18 +48,21 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory private final ValidationPolicy detachedValidationPolicy = () -> HeaderValidationMode.DETACHED_ONLY; private final BetterSyncTargetEvaluator betterSyncTargetEvaluator; + private final SyncTerminationCondition fullSyncTerminationCondition; public FullSyncDownloadPipelineFactory( final SynchronizerConfiguration syncConfig, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final SyncTerminationCondition syncTerminationCondition) { this.syncConfig = syncConfig; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; this.metricsSystem = metricsSystem; + this.fullSyncTerminationCondition = syncTerminationCondition; betterSyncTargetEvaluator = new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers()); } @@ -75,7 +78,8 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory ethContext.getScheduler(), target.peer(), target.commonAncestor(), - syncConfig.getDownloaderCheckpointTimeoutsPermitted()); + syncConfig.getDownloaderCheckpointTimeoutsPermitted(), + fullSyncTerminationCondition); final DownloadHeadersStep downloadHeadersStep = new DownloadHeadersStep( protocolSchedule, @@ -91,7 +95,8 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem); final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep(); final FullImportBlockStep importBlockStep = - new FullImportBlockStep(protocolSchedule, protocolContext, ethContext); + new FullImportBlockStep( + protocolSchedule, protocolContext, ethContext, fullSyncTerminationCondition); return PipelineBuilder.createPipelineFrom( "fetchCheckpoints", @@ -115,18 +120,19 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory private boolean shouldContinueDownloadingFromPeer( final EthPeer peer, final BlockHeader lastCheckpointHeader) { + final boolean shouldTerminate = fullSyncTerminationCondition.shouldStopDownload(); final boolean caughtUpToPeer = peer.chainState().getEstimatedHeight() <= lastCheckpointHeader.getNumber(); final boolean isDisconnected = peer.isDisconnected(); final boolean shouldSwitchSyncTarget = betterSyncTargetEvaluator.shouldSwitchSyncTarget(peer); - LOG.debug( - "shouldContinueDownloadingFromPeer? {}, disconnected {}, caughtUp {}, shouldSwitchSyncTarget {}", + "shouldTerminate {}, shouldContinueDownloadingFromPeer? {}, disconnected {}, caughtUp {}, shouldSwitchSyncTarget {}", + shouldTerminate, peer, isDisconnected, caughtUpToPeer, shouldSwitchSyncTarget); - return !isDisconnected && !caughtUpToPeer && !shouldSwitchSyncTarget; + return !shouldTerminate && !isDisconnected && !caughtUpToPeer && !shouldSwitchSyncTarget; } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java index cdcbf861ae..4484c19446 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java @@ -23,6 +23,8 @@ import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; +import java.util.concurrent.CompletableFuture; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,19 +42,26 @@ public class FullSyncDownloader { final ProtocolContext protocolContext, final EthContext ethContext, final SyncState syncState, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final SyncTerminationCondition terminationCondition) { this.syncConfig = syncConfig; this.protocolContext = protocolContext; this.syncState = syncState; this.chainDownloader = FullSyncChainDownloader.create( - syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem); + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + syncState, + metricsSystem, + terminationCondition); } - public void start() { + public CompletableFuture start() { LOG.info("Starting full sync."); - chainDownloader.start(); + return chainDownloader.start(); } public void stop() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManager.java index 404a595de8..5c61caa17e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManager.java @@ -39,16 +39,19 @@ class FullSyncTargetManager extends SyncTargetManager { private static final Logger LOG = LoggerFactory.getLogger(FullSyncTargetManager.class); private final ProtocolContext protocolContext; private final EthContext ethContext; + private final SyncTerminationCondition terminationCondition; FullSyncTargetManager( final SynchronizerConfiguration config, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final SyncTerminationCondition terminationCondition) { super(config, protocolSchedule, protocolContext, ethContext, metricsSystem); this.protocolContext = protocolContext; this.ethContext = ethContext; + this.terminationCondition = terminationCondition; } @Override @@ -105,6 +108,6 @@ class FullSyncTargetManager extends SyncTargetManager { @Override public boolean shouldContinueDownloading() { - return true; + return terminationCondition.shouldContinueDownload(); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/SyncTerminationCondition.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/SyncTerminationCondition.java new file mode 100644 index 0000000000..ae1184fc14 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/SyncTerminationCondition.java @@ -0,0 +1,83 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.fullsync; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.chain.Blockchain; +import org.hyperledger.besu.ethereum.core.Difficulty; + +import java.util.function.BooleanSupplier; + +import org.apache.tuweni.units.bigints.UInt256; + +/** return true when termination condition is fullfilled and the full sync should stop */ +public interface SyncTerminationCondition extends BooleanSupplier { + + default boolean shouldContinueDownload() { + return !shouldStopDownload(); + } + + default boolean shouldStopDownload() { + return getAsBoolean(); + } + + /** + * When we want full sync to continue forever (for instance when we don't want to merge) + * + * @return always false therefore continues forever * + */ + static SyncTerminationCondition never() { + return () -> false; + } + + /** + * When we want full sync to finish after reaching a difficulty. For instance when we merge on + * total terminal difficulty. + * + * @param targetDifficulty target difficulty to reach + * @param blockchain blockchain to reach the difficulty on + * @return true when blockchain reaches difficulty + */ + static SyncTerminationCondition difficulty( + final UInt256 targetDifficulty, final Blockchain blockchain) { + return difficulty(Difficulty.of(targetDifficulty), blockchain); + } + + /** + * When we want full sync to finish after reaching a difficulty. For instance when we merge on + * total terminal difficulty. + * + * @param targetDifficulty target difficulty to reach + * @param blockchain blockchain to reach the difficulty on* + * @return true when blockchain reaches difficulty + */ + static SyncTerminationCondition difficulty( + final Difficulty targetDifficulty, final Blockchain blockchain) { + return () -> blockchain.getChainHead().getTotalDifficulty().greaterThan(targetDifficulty); + } + + /** + * When we want the full sync to finish on a target hash. For instance when we reach a merge + * checkpoint. + * + * @param blockHash target hash to look for + * @param blockchain blockchain to reach the difficulty on + * @return true when blockchain contains target hash (target hash can be changed) + */ + static FlexibleBlockHashTerminalCondition blockHash( + final Hash blockHash, final Blockchain blockchain) { + return new FlexibleBlockHashTerminalCondition(blockHash, blockchain); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSourceTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSourceTest.java index 3493a94885..ea1d85dd1c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSourceTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointRangeSourceTest.java @@ -32,6 +32,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; +import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition; import java.time.Duration; import java.util.List; @@ -60,7 +61,8 @@ public class CheckpointRangeSourceTest { peer, commonAncestor, CHECKPOINT_TIMEOUTS_PERMITTED, - Duration.ofMillis(1)); + Duration.ofMillis(1), + SyncTerminationCondition.never()); @Test public void shouldHaveNextWhenNoCheckpointsLoadedButSyncTargetCheckerSaysToContinue() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloaderTest.java index 60297938bb..b06b9f8ec6 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloaderTest.java @@ -46,6 +46,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -91,6 +92,7 @@ public class PipelineChainDownloaderTest { public void shouldStartChainDownloadWhenTargetSelected() { final CompletableFuture selectTargetFuture = new CompletableFuture<>(); when(syncTargetManager.findSyncTarget()).thenReturn(selectTargetFuture); + when(syncTargetManager.shouldContinueDownloading()).thenReturn(true); expectPipelineCreation(syncTarget, downloadPipeline); when(scheduler.startPipeline(downloadPipeline)).thenReturn(new CompletableFuture<>()); chainDownloader.start(); @@ -106,11 +108,11 @@ public class PipelineChainDownloaderTest { public void shouldUpdateSyncStateWhenTargetSelected() { final CompletableFuture selectTargetFuture = new CompletableFuture<>(); when(syncTargetManager.findSyncTarget()).thenReturn(selectTargetFuture); + when(syncTargetManager.shouldContinueDownloading()).thenReturn(true); expectPipelineCreation(syncTarget, downloadPipeline); when(scheduler.startPipeline(downloadPipeline)).thenReturn(new CompletableFuture<>()); chainDownloader.start(); verifyNoInteractions(downloadPipelineFactory); - selectTargetFuture.complete(syncTarget); verify(syncState).setSyncTarget(peer1, commonAncestor); @@ -156,10 +158,9 @@ public class PipelineChainDownloaderTest { verify(syncTargetManager).findSyncTarget(); - when(syncTargetManager.shouldContinueDownloading()).thenReturn(false); pipelineFuture.complete(null); - verify(syncTargetManager).shouldContinueDownloading(); + verify(syncTargetManager, Mockito.times(2)).shouldContinueDownloading(); verify(syncState).clearSyncTarget(); verifyNoMoreInteractions(syncTargetManager); assertThat(result).isCompleted(); @@ -187,6 +188,7 @@ public class PipelineChainDownloaderTest { @Test public void shouldNotNestExceptionHandling() { when(syncTargetManager.shouldContinueDownloading()) + .thenReturn(true) .thenReturn(true) // Allow continuing after first successful download .thenReturn(false); // But not after finding the second sync target fails @@ -213,7 +215,7 @@ public class PipelineChainDownloaderTest { // Should only need to check if it should continue twice. // We'll wind up doing this check more than necessary if we keep wrapping additional exception // handlers when restarting the sequence which wastes memory. - verify(syncTargetManager, times(2)).shouldContinueDownloading(); + verify(syncTargetManager, times(3)).shouldContinueDownloading(); } @Test diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStepTest.java index 91e2898484..eac021d0df 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullImportBlockStepTest.java @@ -50,7 +50,9 @@ public class FullImportBlockStepTest { when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec); when(protocolSpec.getBlockImporter()).thenReturn(blockImporter); - importBlocksStep = new FullImportBlockStep(protocolSchedule, protocolContext, null); + importBlocksStep = + new FullImportBlockStep( + protocolSchedule, protocolContext, null, SyncTerminationCondition.never()); } @Test diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java index b87bea3569..3c877bd230 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java @@ -80,7 +80,13 @@ public class FullSyncChainDownloaderForkTest { private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) { return FullSyncChainDownloader.create( - syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem); + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + syncState, + metricsSystem, + SyncTerminationCondition.never()); } private ChainDownloader downloader() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java index 0fd530673a..24d9fa2e30 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java @@ -117,7 +117,13 @@ public class FullSyncChainDownloaderTest { private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) { return FullSyncChainDownloader.create( - syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem); + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + syncState, + metricsSystem, + SyncTerminationCondition.never()); } private ChainDownloader downloader() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java new file mode 100644 index 0000000000..b80e94450d --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java @@ -0,0 +1,182 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.fullsync; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.hyperledger.besu.ethereum.ProtocolContext; +import org.hyperledger.besu.ethereum.chain.Blockchain; +import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; +import org.hyperledger.besu.ethereum.core.Difficulty; +import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; +import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; +import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import org.hyperledger.besu.plugin.services.MetricsSystem; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class FullSyncChainDownloaderTotalTerminalDifficultyTest { + + protected ProtocolSchedule protocolSchedule; + protected EthProtocolManager ethProtocolManager; + protected EthContext ethContext; + protected ProtocolContext protocolContext; + private SyncState syncState; + + private BlockchainSetupUtil localBlockchainSetup; + protected MutableBlockchain localBlockchain; + private BlockchainSetupUtil otherBlockchainSetup; + protected Blockchain otherBlockchain; + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); + private static final Difficulty TARGET_TERMINAL_DIFFICULTY = Difficulty.of(1_000_000L); + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] {{DataStorageFormat.BONSAI}, {DataStorageFormat.FOREST}}); + } + + private final DataStorageFormat storageFormat; + + public FullSyncChainDownloaderTotalTerminalDifficultyTest(final DataStorageFormat storageFormat) { + this.storageFormat = storageFormat; + } + + @Before + public void setupTest() { + localBlockchainSetup = BlockchainSetupUtil.forTesting(storageFormat); + localBlockchain = localBlockchainSetup.getBlockchain(); + otherBlockchainSetup = BlockchainSetupUtil.forTesting(storageFormat); + otherBlockchain = otherBlockchainSetup.getBlockchain(); + + protocolSchedule = localBlockchainSetup.getProtocolSchedule(); + protocolContext = localBlockchainSetup.getProtocolContext(); + ethProtocolManager = + EthProtocolManagerTestUtil.create( + localBlockchain, + new EthScheduler(1, 1, 1, 1, new NoOpMetricsSystem()), + localBlockchainSetup.getWorldArchive(), + localBlockchainSetup.getTransactionPool(), + EthProtocolConfiguration.defaultConfig()); + ethContext = ethProtocolManager.ethContext(); + syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); + } + + @After + public void tearDown() { + ethProtocolManager.stop(); + } + + private ChainDownloader downloader( + final SynchronizerConfiguration syncConfig, + final SyncTerminationCondition terminalCondition) { + return FullSyncChainDownloader.create( + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + syncState, + metricsSystem, + terminalCondition); + } + + private SynchronizerConfiguration.Builder syncConfigBuilder() { + return SynchronizerConfiguration.builder(); + } + + @Test + public void syncsFullyAndStopsWhenTTDReached() { + otherBlockchainSetup.importFirstBlocks(30); + final long targetBlock = otherBlockchain.getChainHeadBlockNumber(); + // Sanity check + assertThat(targetBlock).isGreaterThan(localBlockchain.getChainHeadBlockNumber()); + + final RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); + final RespondingEthPeer.Responder responder = + RespondingEthPeer.blockchainResponder(otherBlockchain); + + final SynchronizerConfiguration syncConfig = + syncConfigBuilder().downloaderChainSegmentSize(1).downloaderParallelism(1).build(); + final ChainDownloader downloader = + downloader( + syncConfig, + SyncTerminationCondition.difficulty(TARGET_TERMINAL_DIFFICULTY, localBlockchain)); + final CompletableFuture future = downloader.start(); + + assertThat(future.isDone()).isFalse(); + + peer.respondWhileOtherThreadsWork(responder, () -> syncState.syncTarget().isEmpty()); + assertThat(syncState.syncTarget()).isPresent(); + assertThat(syncState.syncTarget().get().peer()).isEqualTo(peer.getEthPeer()); + + peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); + + assertThat(localBlockchain.getChainHead().getTotalDifficulty()) + .isGreaterThan(TARGET_TERMINAL_DIFFICULTY); + + assertThat(future.isDone()).isTrue(); + } + + @Test + public void syncsFullyAndContinuesWhenTTDNotSpecified() { + otherBlockchainSetup.importFirstBlocks(30); + final long targetBlock = otherBlockchain.getChainHeadBlockNumber(); + // Sanity check + assertThat(targetBlock).isGreaterThan(localBlockchain.getChainHeadBlockNumber()); + + final RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); + final RespondingEthPeer.Responder responder = + RespondingEthPeer.blockchainResponder(otherBlockchain); + + final SynchronizerConfiguration syncConfig = + syncConfigBuilder().downloaderChainSegmentSize(1).downloaderParallelism(1).build(); + final ChainDownloader downloader = downloader(syncConfig, SyncTerminationCondition.never()); + final CompletableFuture future = downloader.start(); + + assertThat(future.isDone()).isFalse(); + + peer.respondWhileOtherThreadsWork(responder, () -> !syncState.syncTarget().isPresent()); + assertThat(syncState.syncTarget()).isPresent(); + assertThat(syncState.syncTarget().get().peer()).isEqualTo(peer.getEthPeer()); + + peer.respondWhileOtherThreadsWork( + responder, () -> localBlockchain.getChainHeadBlockNumber() < targetBlock); + + assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(targetBlock); + + assertThat(future.isDone()).isFalse(); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java index bab67345bd..1214c5da1e 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java @@ -92,7 +92,13 @@ public class FullSyncDownloaderTest { private FullSyncDownloader downloader(final SynchronizerConfiguration syncConfig) { return new FullSyncDownloader( - syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem); + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + syncState, + metricsSystem, + SyncTerminationCondition.never()); } @Test diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java index e99fff3a1f..4a766c5d28 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java @@ -97,7 +97,8 @@ public class FullSyncTargetManagerTest { protocolSchedule, protocolContext, ethContext, - new NoOpMetricsSystem()); + new NoOpMetricsSystem(), + SyncTerminationCondition.never()); } @After diff --git a/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/DummySynchronizer.java b/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/DummySynchronizer.java index 857df96d15..1f94d49e24 100644 --- a/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/DummySynchronizer.java +++ b/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/DummySynchronizer.java @@ -20,6 +20,7 @@ import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.services.BesuEvents; import java.util.Optional; +import java.util.concurrent.CompletableFuture; /** * Naive implementation of Synchronizer used by retesteth. Because retesteth is not implemented in @@ -28,7 +29,9 @@ import java.util.Optional; */ public class DummySynchronizer implements Synchronizer { @Override - public void start() {} + public CompletableFuture start() { + return CompletableFuture.completedFuture(null); + } @Override public void stop() {}