Optimize pivot block selector on PoS networks (#4488)

* Refactor to optimize pivot block selector on PoS networks

On PoS network we use a pivot block sent by the Consensus Layer, so we do
not need peers, and so all the logic for selecting the pivot block from peers
has been moved from FastSyncActions to PivotSelectorFromPeers.
We do not need anymore the TransictionPeerSelector, and the --fast-sync-min-peers
applies only to PoW networks.

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/4492/head
Fabio Di Fabio 2 years ago committed by GitHub
parent 84eab6977e
commit 839f39d68d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 20
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  3. 1
      besu/src/main/java/org/hyperledger/besu/cli/DefaultCommandValues.java
  4. 13
      besu/src/main/java/org/hyperledger/besu/cli/config/NetworkName.java
  5. 20
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  6. 44
      besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
  7. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PivotBlockSelector.java
  8. 169
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java
  9. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java
  10. 27
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromFinalizedBlock.java
  11. 88
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromPeers.java
  12. 95
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/TransitionPivotSelector.java
  13. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManager.java
  14. 85
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java
  15. 55
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java
  16. 36
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockManagerTest.java

@ -5,6 +5,7 @@
### Additions and Improvements
- Bring GraphQL into compliance with execution-api specs [#4112](https://github.com/hyperledger/besu/pull/4112)
- Refactor unverified forkchoice event [#4487](https://github.com/hyperledger/besu/pull/4487)
- Optimize pivot block selector on PoS networks [#4488](https://github.com/hyperledger/besu/pull/4488)
- Improve UX of initial sync logs, pushing not relevant logs to debug level [#4486](https://github.com/hyperledger/besu/pull/4486)
### Bug Fixes

@ -20,7 +20,6 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.hyperledger.besu.cli.DefaultCommandValues.getDefaultBesuDataPath;
import static org.hyperledger.besu.cli.config.NetworkName.MAINNET;
import static org.hyperledger.besu.cli.config.NetworkName.isMergedNetwork;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPENDENCY_WARNING_MSG;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATED_AND_USELESS_WARNING_MSG;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATION_WARNING_MSG;
@ -508,12 +507,8 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
names = {"--fast-sync-min-peers"},
paramLabel = MANDATORY_INTEGER_FORMAT_HELP,
description =
"Minimum number of peers required before starting fast sync. (default pre-merge: "
+ FAST_SYNC_MIN_PEER_COUNT
+ " and post-merge: "
+ FAST_SYNC_MIN_PEER_COUNT_POST_MERGE
+ ")")
private final Integer fastSyncMinPeerCount = null;
"Minimum number of peers required before starting fast sync. Has only effect on PoW networks. (default: ${DEFAULT-VALUE})")
private final Integer fastSyncMinPeerCount = FAST_SYNC_MIN_PEER_COUNT;
@Option(
names = {"--network"},
@ -2795,19 +2790,10 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
}
private SynchronizerConfiguration buildSyncConfig() {
Integer fastSyncMinPeers = fastSyncMinPeerCount;
if (fastSyncMinPeers == null) {
if (isMergedNetwork(network)) {
fastSyncMinPeers = FAST_SYNC_MIN_PEER_COUNT_POST_MERGE;
} else {
fastSyncMinPeers = FAST_SYNC_MIN_PEER_COUNT;
}
}
return unstableSynchronizerOptions
.toDomainObject()
.syncMode(syncMode)
.fastSyncMinimumPeerCount(fastSyncMinPeers)
.fastSyncMinimumPeerCount(fastSyncMinPeerCount)
.build();
}

@ -57,7 +57,6 @@ public interface DefaultCommandValues {
NatMethod DEFAULT_NAT_METHOD = NatMethod.AUTO;
JwtAlgorithm DEFAULT_JWT_ALGORITHM = JwtAlgorithm.RS256;
int FAST_SYNC_MIN_PEER_COUNT = 5;
int FAST_SYNC_MIN_PEER_COUNT_POST_MERGE = 1;
int DEFAULT_MAX_PEERS = 25;
int DEFAULT_P2P_PEER_LOWER_BOUND = 25;
int DEFAULT_HTTP_MAX_CONNECTIONS = 80;

@ -86,17 +86,4 @@ public enum NetworkName {
public Optional<String> getDeprecationDate() {
return Optional.ofNullable(deprecationDate);
}
public static boolean isMergedNetwork(final NetworkName networkName) {
switch (networkName) {
case MAINNET:
case GOERLI:
case ROPSTEN:
case SEPOLIA:
case KILN:
return true;
default:
return false;
}
}
}

@ -63,7 +63,6 @@ import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromFinalizedBlock;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromPeers;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.TransitionPivotSelector;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.ImmutableCheckpoint;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
@ -408,7 +407,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final Optional<SnapProtocolManager> maybeSnapProtocolManager =
createSnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive);
final PivotBlockSelector pivotBlockSelector = createPivotSelector(protocolContext);
final PivotBlockSelector pivotBlockSelector =
createPivotSelector(protocolContext, ethContext, syncState);
final Synchronizer synchronizer =
createSynchronizer(
@ -493,9 +493,11 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
return toUse;
}
private PivotBlockSelector createPivotSelector(final ProtocolContext protocolContext) {
private PivotBlockSelector createPivotSelector(
final ProtocolContext protocolContext,
final EthContext ethContext,
final SyncState syncState) {
final PivotSelectorFromPeers pivotSelectorFromPeers = new PivotSelectorFromPeers(syncConfig);
final GenesisConfigOptions genesisConfigOptions = configOptionsSupplier.get();
if (genesisConfigOptions.getTerminalTotalDifficulty().isPresent()) {
@ -514,15 +516,11 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
LOG.info("Initial sync done, unsubscribe forkchoice supplier");
};
return new TransitionPivotSelector(
genesisConfigOptions,
unverifiedForkchoiceSupplier,
pivotSelectorFromPeers,
new PivotSelectorFromFinalizedBlock(
genesisConfigOptions, unverifiedForkchoiceSupplier, unsubscribeForkchoiceListener));
return new PivotSelectorFromFinalizedBlock(
genesisConfigOptions, unverifiedForkchoiceSupplier, unsubscribeForkchoiceListener);
} else {
LOG.info("TTD difficulty is not present, creating initial sync phase for PoW");
return pivotSelectorFromPeers;
return new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem);
}
}

@ -847,7 +847,7 @@ public class BesuCommandTest extends CommandTestAbstract {
}
@Test
public void noOverrideDefaultValuesIfKeyIsNotPresentInConfigFile() throws IOException {
public void noOverrideDefaultValuesIfKeyIsNotPresentInConfigFile() {
final String configFile = this.getClass().getResource("/partial_config.toml").getFile();
parseCommand("--config-file", configFile);
@ -882,7 +882,7 @@ public class BesuCommandTest extends CommandTestAbstract {
final SynchronizerConfiguration syncConfig = syncConfigurationCaptor.getValue();
assertThat(syncConfig.getSyncMode()).isEqualTo(SyncMode.FAST);
assertThat(syncConfig.getFastSyncMinimumPeerCount()).isEqualTo(1);
assertThat(syncConfig.getFastSyncMinimumPeerCount()).isEqualTo(5);
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
@ -1709,20 +1709,8 @@ public class BesuCommandTest extends CommandTestAbstract {
}
@Test
public void checkValidDefaultFastSyncMinPeersPoS() {
parseCommand("--sync-mode", "FAST", "--network", "MAINNET");
verify(mockControllerBuilder).synchronizerConfiguration(syncConfigurationCaptor.capture());
final SynchronizerConfiguration syncConfig = syncConfigurationCaptor.getValue();
assertThat(syncConfig.getSyncMode()).isEqualTo(SyncMode.FAST);
assertThat(syncConfig.getFastSyncMinimumPeerCount()).isEqualTo(1);
assertThat(commandOutput.toString(UTF_8)).isEmpty();
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}
@Test
public void checkValidDefaultFastSyncMinPeersPoW() {
parseCommand("--sync-mode", "FAST", "--network", "CLASSIC");
public void checkValidDefaultFastSyncMinPeers() {
parseCommand("--sync-mode", "FAST");
verify(mockControllerBuilder).synchronizerConfiguration(syncConfigurationCaptor.capture());
final SynchronizerConfiguration syncConfig = syncConfigurationCaptor.getValue();
@ -1744,30 +1732,6 @@ public class BesuCommandTest extends CommandTestAbstract {
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}
@Test
public void parsesValidFastSyncMinPeersOptionPreMerge() {
parseCommand("--sync-mode", "FAST", "--network", "CLASSIC", "--fast-sync-min-peers", "11");
verify(mockControllerBuilder).synchronizerConfiguration(syncConfigurationCaptor.capture());
final SynchronizerConfiguration syncConfig = syncConfigurationCaptor.getValue();
assertThat(syncConfig.getSyncMode()).isEqualTo(SyncMode.FAST);
assertThat(syncConfig.getFastSyncMinimumPeerCount()).isEqualTo(11);
assertThat(commandOutput.toString(UTF_8)).isEmpty();
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}
@Test
public void parsesValidFastSyncMinPeersOptionPostMerge() {
parseCommand("--sync-mode", "FAST", "--network", "GOERLI", "--fast-sync-min-peers", "11");
verify(mockControllerBuilder).synchronizerConfiguration(syncConfigurationCaptor.capture());
final SynchronizerConfiguration syncConfig = syncConfigurationCaptor.getValue();
assertThat(syncConfig.getSyncMode()).isEqualTo(SyncMode.FAST);
assertThat(syncConfig.getFastSyncMinimumPeerCount()).isEqualTo(11);
assertThat(commandOutput.toString(UTF_8)).isEmpty();
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}
@Test
public void parsesInvalidFastSyncMinPeersOptionWrongFormatShouldFail() {

@ -14,14 +14,16 @@
*/
package org.hyperledger.besu.ethereum.eth.sync;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public interface PivotBlockSelector {
Optional<FastSyncState> selectNewPivotBlock(EthPeer peer);
Optional<FastSyncState> selectNewPivotBlock();
CompletableFuture<Void> prepareRetry();
default void close() {
// do nothing by default

@ -15,18 +15,15 @@
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerLimiter;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByHashTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@ -34,12 +31,9 @@ import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.util.ExceptionUtils;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
@ -94,40 +88,6 @@ public class FastSyncActions {
return syncState;
}
public CompletableFuture<FastSyncState> waitForSuitablePeers(final FastSyncState fastSyncState) {
if (fastSyncState.hasPivotBlockHeader()) {
return waitForAnyPeer().thenApply(ignore -> fastSyncState);
}
LOG.debug("Waiting for at least {} peers.", syncConfig.getFastSyncMinimumPeerCount());
return waitForPeers(syncConfig.getFastSyncMinimumPeerCount())
.thenApply(successfulWaitResult -> fastSyncState);
}
public <T> CompletableFuture<T> scheduleFutureTask(
final Supplier<CompletableFuture<T>> future, final Duration duration) {
return ethContext.getScheduler().scheduleFutureTask(future, duration);
}
private CompletableFuture<Void> waitForAnyPeer() {
final CompletableFuture<Void> waitForPeerResult =
ethContext.getScheduler().timeout(WaitForPeersTask.create(ethContext, 1, metricsSystem));
return exceptionallyCompose(
waitForPeerResult,
throwable -> {
if (ExceptionUtils.rootCause(throwable) instanceof TimeoutException) {
return waitForAnyPeer();
}
return CompletableFuture.failedFuture(throwable);
});
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(ethContext, count, metricsSystem);
return waitForPeersTask.run();
}
public CompletableFuture<FastSyncState> selectPivotBlock(final FastSyncState fastSyncState) {
return fastSyncState.hasPivotBlockHeader()
? completedFuture(fastSyncState)
@ -136,77 +96,21 @@ public class FastSyncActions {
private CompletableFuture<FastSyncState> selectNewPivotBlock() {
return selectBestPeer()
.map(
bestPeer ->
pivotBlockSelector
.selectNewPivotBlock(bestPeer)
.map(CompletableFuture::completedFuture)
.orElse(null))
return pivotBlockSelector
.selectNewPivotBlock()
.map(CompletableFuture::completedFuture)
.orElseGet(this::retrySelectPivotBlockAfterDelay);
}
private Optional<EthPeer> selectBestPeer() {
return ethContext
.getEthPeers()
.bestPeerMatchingCriteria(this::canPeerDeterminePivotBlock)
// Only select a pivot block number when we have a minimum number of height estimates
.filter(unused -> enoughFastSyncPeersArePresent());
}
private boolean enoughFastSyncPeersArePresent() {
final long peerCount = countPeersThatCanDeterminePivotBlock();
final int minPeerCount = syncConfig.getFastSyncMinimumPeerCount();
if (peerCount < minPeerCount) {
LOG.info(
"Waiting for valid peers with chain height information. {} / {} required peers currently available.",
peerCount,
minPeerCount);
return false;
}
return true;
}
private long countPeersThatCanDeterminePivotBlock() {
return ethContext
.getEthPeers()
.streamAvailablePeers()
.filter(this::canPeerDeterminePivotBlock)
.count();
}
private boolean canPeerDeterminePivotBlock(final EthPeer peer) {
LOG.debug(
"peer {} hasEstimatedHeight {} isFullyValidated? {}",
peer.getShortNodeId(),
peer.chainState().hasEstimatedHeight(),
peer.isFullyValidated());
return peer.chainState().hasEstimatedHeight() && peer.isFullyValidated();
<T> CompletableFuture<T> scheduleFutureTask(
final Supplier<CompletableFuture<T>> future, final Duration duration) {
return ethContext.getScheduler().scheduleFutureTask(future, duration);
}
private CompletableFuture<FastSyncState> retrySelectPivotBlockAfterDelay() {
return ethContext
.getScheduler()
.scheduleFutureTask(
this::limitTrailingPeersAndRetrySelectPivotBlock, Duration.ofSeconds(5));
}
private long conservativelyEstimatedPivotBlock() {
long estimatedNextPivot =
syncState.getLocalChainHeight() + syncConfig.getFastSyncPivotDistance();
return Math.min(syncState.bestChainHeight(), estimatedNextPivot);
}
private CompletableFuture<FastSyncState> limitTrailingPeersAndRetrySelectPivotBlock() {
final TrailingPeerLimiter trailingPeerLimiter =
new TrailingPeerLimiter(
ethContext.getEthPeers(),
() ->
new TrailingPeerRequirements(
conservativelyEstimatedPivotBlock(), syncConfig.getMaxTrailingPeers()));
trailingPeerLimiter.enforceTrailingPeerLimit();
return waitForPeers(syncConfig.getFastSyncMinimumPeerCount())
.scheduleFutureTask(pivotBlockSelector::prepareRetry, Duration.ofSeconds(5))
.thenCompose(ignore -> selectNewPivotBlock());
}
@ -218,22 +122,26 @@ public class FastSyncActions {
private CompletableFuture<FastSyncState> internalDownloadPivotBlockHeader(
final FastSyncState currentState) {
if (currentState.hasPivotBlockHeader()) {
LOG.debug("Initial sync state {} already contains the block header", currentState);
return completedFuture(currentState);
}
return currentState
.getPivotBlockHash()
.map(this::downloadPivotBlockHeader)
.orElseGet(
() ->
new PivotBlockRetriever(
protocolSchedule,
ethContext,
metricsSystem,
currentState.getPivotBlockNumber().getAsLong(),
syncConfig.getFastSyncMinimumPeerCount(),
syncConfig.getFastSyncPivotDistance())
.downloadPivotBlockHeader());
return waitForPeers(1)
.thenCompose(
unused ->
currentState
.getPivotBlockHash()
.map(this::downloadPivotBlockHeader)
.orElseGet(
() ->
new PivotBlockRetriever(
protocolSchedule,
ethContext,
metricsSystem,
currentState.getPivotBlockNumber().getAsLong(),
syncConfig.getFastSyncMinimumPeerCount(),
syncConfig.getFastSyncPivotDistance())
.downloadPivotBlockHeader()));
}
private FastSyncState updateStats(final FastSyncState fastSyncState) {
@ -257,19 +165,32 @@ public class FastSyncActions {
}
private CompletableFuture<FastSyncState> downloadPivotBlockHeader(final Hash hash) {
LOG.debug("Downloading pivot block header by hash {}", hash);
return RetryingGetHeaderFromPeerByHashTask.byHash(
protocolSchedule, ethContext, hash, metricsSystem)
.getHeader()
.thenApply(
blockHeader -> {
LOG.trace(
"Successfully downloaded pivot block header by hash: {}",
blockHeader.toLogString());
return new FastSyncState(blockHeader);
});
.whenComplete(
(blockHeader, throwable) -> {
if (throwable != null) {
LOG.debug("Error downloading block header by hash {}", hash);
} else {
debugLambda(
LOG,
"Successfully downloaded pivot block header by hash {}",
blockHeader::toLogString);
}
})
.thenApply(FastSyncState::new);
}
public boolean isBlockchainBehind(final long blockNumber) {
return protocolContext.getBlockchain().getChainHeadHeader().getNumber() < blockNumber;
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(ethContext, count, metricsSystem);
return waitForPeersTask.run();
}
}

@ -93,8 +93,7 @@ public class FastSyncDownloader<REQUEST> {
final FastSyncState fastSyncState,
final Function<FastSyncState, CompletableFuture<FastSyncState>> onNewPivotBlock) {
return exceptionallyCompose(
fastSyncActions
.waitForSuitablePeers(fastSyncState)
CompletableFuture.completedFuture(fastSyncState)
.thenCompose(fastSyncActions::selectPivotBlock)
.thenCompose(fastSyncActions::downloadPivotBlockHeader)
.thenApply(this::updateMaxTrailingPeers)

@ -17,10 +17,10 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.consensus.merge.ForkchoiceEvent;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.slf4j.Logger;
@ -29,33 +29,38 @@ import org.slf4j.LoggerFactory;
public class PivotSelectorFromFinalizedBlock implements PivotBlockSelector {
private static final Logger LOG = LoggerFactory.getLogger(PivotSelectorFromFinalizedBlock.class);
private final GenesisConfigOptions genesisConfig;
private final Supplier<Optional<ForkchoiceEvent>> forkchoiceSupplier;
private final Supplier<Optional<ForkchoiceEvent>> forkchoiceStateSupplier;
private final Runnable cleanupAction;
public PivotSelectorFromFinalizedBlock(
final GenesisConfigOptions genesisConfig,
final Supplier<Optional<ForkchoiceEvent>> forkchoiceSupplier,
final Supplier<Optional<ForkchoiceEvent>> forkchoiceStateSupplier,
final Runnable cleanupAction) {
this.genesisConfig = genesisConfig;
this.forkchoiceSupplier = forkchoiceSupplier;
this.forkchoiceStateSupplier = forkchoiceStateSupplier;
this.cleanupAction = cleanupAction;
}
@Override
public Optional<FastSyncState> selectNewPivotBlock(final EthPeer peer) {
final Optional<ForkchoiceEvent> maybeForkchoiceEvent = forkchoiceSupplier.get();
if (maybeForkchoiceEvent.isPresent()) {
public Optional<FastSyncState> selectNewPivotBlock() {
final Optional<ForkchoiceEvent> maybeForkchoice = forkchoiceStateSupplier.get();
if (maybeForkchoice.isPresent() && maybeForkchoice.get().hasValidFinalizedBlockHash()) {
return Optional.of(
selectLastFinalizedBlockAsPivot(maybeForkchoiceEvent.get().getFinalizedBlockHash()));
selectLastFinalizedBlockAsPivot(maybeForkchoice.get().getFinalizedBlockHash()));
}
LOG.trace("No finalized block hash announced yet");
LOG.debug("No finalized block hash announced yet");
return Optional.empty();
}
@Override
public CompletableFuture<Void> prepareRetry() {
// nothing to do
return CompletableFuture.completedFuture(null);
}
private FastSyncState selectLastFinalizedBlockAsPivot(final Hash finalizedHash) {
LOG.trace("Returning finalized block hash as pivot: {}", finalizedHash);
LOG.debug("Returning finalized block hash {} as pivot", finalizedHash);
return new FastSyncState(finalizedHash);
}

@ -15,11 +15,18 @@
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerLimiter;
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -28,26 +35,99 @@ public class PivotSelectorFromPeers implements PivotBlockSelector {
private static final Logger LOG = LoggerFactory.getLogger(PivotSelectorFromPeers.class);
private final EthContext ethContext;
private final SynchronizerConfiguration syncConfig;
private final SyncState syncState;
private final MetricsSystem metricsSystem;
public PivotSelectorFromPeers(final SynchronizerConfiguration syncConfig) {
public PivotSelectorFromPeers(
final EthContext ethContext,
final SynchronizerConfiguration syncConfig,
final SyncState syncState,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.syncConfig = syncConfig;
this.syncState = syncState;
this.metricsSystem = metricsSystem;
}
@Override
public Optional<FastSyncState> selectNewPivotBlock(final EthPeer peer) {
return fromBestPeer(peer);
public Optional<FastSyncState> selectNewPivotBlock() {
return selectBestPeer().flatMap(this::fromBestPeer);
}
@Override
public CompletableFuture<Void> prepareRetry() {
final TrailingPeerLimiter trailingPeerLimiter =
new TrailingPeerLimiter(
ethContext.getEthPeers(),
() ->
new TrailingPeerRequirements(
conservativelyEstimatedPivotBlock(), syncConfig.getMaxTrailingPeers()));
trailingPeerLimiter.enforceTrailingPeerLimit();
return waitForPeers(syncConfig.getFastSyncMinimumPeerCount());
}
private Optional<FastSyncState> fromBestPeer(final EthPeer peer) {
final long pivotBlockNumber =
peer.chainState().getEstimatedHeight() - syncConfig.getFastSyncPivotDistance();
if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) {
// Peer's chain isn't long enough, return an empty value so we can try again.
// Peer's chain isn't long enough, return an empty value, so we can try again.
LOG.info("Waiting for peers with sufficient chain height");
return Optional.empty();
}
LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber);
return Optional.of(new FastSyncState(pivotBlockNumber));
}
private Optional<EthPeer> selectBestPeer() {
return ethContext
.getEthPeers()
.bestPeerMatchingCriteria(this::canPeerDeterminePivotBlock)
// Only select a pivot block number when we have a minimum number of height estimates
.filter(unused -> enoughFastSyncPeersArePresent());
}
private boolean enoughFastSyncPeersArePresent() {
final long peerCount = countPeersThatCanDeterminePivotBlock();
final int minPeerCount = syncConfig.getFastSyncMinimumPeerCount();
if (peerCount < minPeerCount) {
LOG.info(
"Waiting for valid peers with chain height information. {} / {} required peers currently available.",
peerCount,
minPeerCount);
return false;
}
return true;
}
private long countPeersThatCanDeterminePivotBlock() {
return ethContext
.getEthPeers()
.streamAvailablePeers()
.filter(this::canPeerDeterminePivotBlock)
.count();
}
private boolean canPeerDeterminePivotBlock(final EthPeer peer) {
LOG.debug(
"peer {} hasEstimatedHeight {} isFullyValidated? {}",
peer.getShortNodeId(),
peer.chainState().hasEstimatedHeight(),
peer.isFullyValidated());
return peer.chainState().hasEstimatedHeight() && peer.isFullyValidated();
}
private long conservativelyEstimatedPivotBlock() {
long estimatedNextPivot =
syncState.getLocalChainHeight() + syncConfig.getFastSyncPivotDistance();
return Math.min(syncState.bestChainHeight(), estimatedNextPivot);
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(ethContext, count, metricsSystem);
return waitForPeersTask.run();
}
}

@ -1,95 +0,0 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.consensus.merge.ForkchoiceEvent;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransitionPivotSelector implements PivotBlockSelector {
private static final Logger LOG = LoggerFactory.getLogger(TransitionPivotSelector.class);
private final Difficulty totalTerminalDifficulty;
private final Supplier<Optional<ForkchoiceEvent>> forkchoiceSupplier;
private final PivotBlockSelector pivotSelectorFromPeers;
private final PivotBlockSelector pivotSelectorFromFinalizedBlock;
public TransitionPivotSelector(
final GenesisConfigOptions genesisConfig,
final Supplier<Optional<ForkchoiceEvent>> forkchoiceSupplier,
final PivotBlockSelector pivotSelectorFromPeers,
final PivotBlockSelector pivotSelectorFromFinalizedBlock) {
this.totalTerminalDifficulty =
genesisConfig
.getTerminalTotalDifficulty()
.map(Difficulty::of)
.orElseThrow(
() ->
new IllegalArgumentException(
"This class can only be used when TTD is present"));
this.forkchoiceSupplier = forkchoiceSupplier;
this.pivotSelectorFromPeers = pivotSelectorFromPeers;
this.pivotSelectorFromFinalizedBlock = pivotSelectorFromFinalizedBlock;
}
@Override
public Optional<FastSyncState> selectNewPivotBlock(final EthPeer peer) {
return routeDependingOnTotalTerminalDifficulty(peer);
}
private Optional<FastSyncState> routeDependingOnTotalTerminalDifficulty(final EthPeer peer) {
Difficulty bestPeerEstDifficulty = peer.chainState().getEstimatedTotalDifficulty();
if (forkchoiceSupplier.get().isPresent()) {
LOG.trace("A finalized block is present, use it as pivot");
return pivotSelectorFromFinalizedBlock.selectNewPivotBlock(peer);
}
if (bestPeerEstDifficulty.greaterOrEqualThan(totalTerminalDifficulty)) {
LOG.debug(
"Chain has reached TTD, best peer has estimated difficulty {},"
+ " select pivot from finalized block",
bestPeerEstDifficulty);
return pivotSelectorFromFinalizedBlock.selectNewPivotBlock(peer);
}
LOG.info(
"Chain has not yet reached TTD, best peer has estimated difficulty {},"
+ " select pivot from peers",
bestPeerEstDifficulty);
return pivotSelectorFromPeers.selectNewPivotBlock(peer);
}
@Override
public void close() {
pivotSelectorFromFinalizedBlock.close();
pivotSelectorFromPeers.close();
}
@Override
public long getMinRequiredBlockNumber() {
return pivotSelectorFromFinalizedBlock.getMinRequiredBlockNumber();
}
}

@ -20,6 +20,7 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
@ -68,8 +69,7 @@ public class DynamicPivotBlockManager {
.orElse(currentPivotBlockNumber);
if (distanceNextPivotBlock > pivotBlockDistanceBeforeCaching
&& isSearchingPivotBlock.compareAndSet(false, true)) {
syncActions
.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)
CompletableFuture.completedFuture(FastSyncState.EMPTY_SYNC_STATE)
.thenCompose(syncActions::selectPivotBlock)
.thenCompose(syncActions::downloadPivotBlockHeader)
.thenAccept(fss -> lastPivotBlockFound = fss.getPivotBlockHeader())

@ -21,7 +21,6 @@ import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.consensus.merge.ForkchoiceEvent;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
@ -32,6 +31,7 @@ import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
@ -44,6 +44,7 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.ArrayList;
import java.util.Arrays;
@ -71,8 +72,12 @@ public class FastSyncActionsTest {
private SynchronizerConfiguration syncConfig = syncConfigBuilder.build();
private FastSyncActions fastSyncActions;
private EthProtocolManager ethProtocolManager;
private EthContext ethContext;
private EthPeers ethPeers;
private MutableBlockchain blockchain;
private BlockchainSetupUtil blockchainSetupUtil;
private SyncState syncState;
private MetricsSystem metricsSystem;
@Parameterized.Parameters
public static Collection<Object[]> data() {
@ -97,28 +102,33 @@ public class FastSyncActionsTest {
blockchainSetupUtil.getWorldArchive(),
blockchainSetupUtil.getTransactionPool(),
EthProtocolConfiguration.defaultConfig());
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
ethContext = ethProtocolManager.ethContext();
ethPeers = ethContext.getEthPeers();
syncState = new SyncState(blockchain, ethPeers);
metricsSystem = new NoOpMetricsSystem();
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
}
@Test
public void waitForPeersShouldSucceedIfEnoughPeersAreFound() {
for (int i = 0; i < syncConfig.getFastSyncMinimumPeerCount(); i++) {
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
EthProtocolManagerTestUtil.createPeer(
ethProtocolManager, syncConfig.getFastSyncPivotDistance() + i + 1);
}
final CompletableFuture<FastSyncState> result =
fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
assertThat(result).isCompletedWithValue(FastSyncState.EMPTY_SYNC_STATE);
fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
assertThat(result).isCompletedWithValue(new FastSyncState(5));
}
@Test
public void waitForPeersShouldOnlyRequireOnePeerWhenPivotBlockIsAlreadySelected() {
public void returnTheSamePivotBlockIfAlreadySelected() {
final BlockHeader pivotHeader = new BlockHeaderTestFixture().number(1024).buildHeader();
final FastSyncState fastSyncState = new FastSyncState(pivotHeader);
final CompletableFuture<FastSyncState> result =
fastSyncActions.waitForSuitablePeers(fastSyncState);
assertThat(result).isNotDone();
EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
final CompletableFuture<FastSyncState> result = fastSyncActions.selectPivotBlock(fastSyncState);
assertThat(result).isDone();
assertThat(result).isCompletedWithValue(fastSyncState);
}
@ -140,7 +150,10 @@ public class FastSyncActionsTest {
final int minPeers = 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 5000);
@ -155,7 +168,10 @@ public class FastSyncActionsTest {
final int minPeers = 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(1000), 5500);
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(2000), 4000);
@ -172,7 +188,10 @@ public class FastSyncActionsTest {
final int minPeers = 2;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
final CompletableFuture<FastSyncState> result =
fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
@ -197,7 +216,10 @@ public class FastSyncActionsTest {
final int minPeers = 3;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
@ -242,7 +264,10 @@ public class FastSyncActionsTest {
final PeerValidator validator = mock(PeerValidator.class);
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
@ -267,7 +292,7 @@ public class FastSyncActionsTest {
// Validate a subset of peers
peers.subList(0, minPeers - 1).forEach(p -> p.getEthPeer().markValidated(validator));
// No pivot should be selected while only a subset of peers have height estimates
// No pivot should be selected while only a subset of peers has height estimates
EthProtocolManagerTestUtil.runPendingFutures(ethProtocolManager);
assertThat(result).isNotDone();
@ -303,7 +328,10 @@ public class FastSyncActionsTest {
final int peerCount = minPeers + 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
final long minPivotHeight = syncConfig.getFastSyncPivotDistance() + 1L;
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
@ -349,7 +377,10 @@ public class FastSyncActionsTest {
final int minPeers = 1;
syncConfigBuilder.fastSyncMinimumPeerCount(minPeers);
syncConfig = syncConfigBuilder.build();
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
final long pivotDistance = syncConfig.getFastSyncPivotDistance();
EthProtocolManagerTestUtil.disableEthSchedulerAutoRun(ethProtocolManager);
@ -400,7 +431,10 @@ public class FastSyncActionsTest {
@Test
public void downloadPivotBlockHeaderShouldRetrievePivotBlockHeader() {
syncConfig = SynchronizerConfiguration.builder().fastSyncMinimumPeerCount(1).build();
fastSyncActions = createFastSyncActions(syncConfig, new PivotSelectorFromPeers(syncConfig));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem));
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001);
final CompletableFuture<FastSyncState> result =
@ -419,19 +453,18 @@ public class FastSyncActionsTest {
GenesisConfigOptions genesisConfig = mock(GenesisConfigOptions.class);
when(genesisConfig.getTerminalBlockNumber()).thenReturn(OptionalLong.of(10L));
final Hash finalizedHash = blockchain.getBlockHashByNumber(2L).get();
final Optional<ForkchoiceEvent> finalizedEvent =
Optional.of(new ForkchoiceEvent(null, null, blockchain.getBlockHashByNumber(2L).get()));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromFinalizedBlock(
genesisConfig,
() -> Optional.of(new ForkchoiceEvent(finalizedHash, finalizedHash, finalizedHash)),
() -> {}));
new PivotSelectorFromFinalizedBlock(genesisConfig, () -> finalizedEvent, () -> {}));
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001);
final CompletableFuture<FastSyncState> result =
fastSyncActions.downloadPivotBlockHeader(new FastSyncState(finalizedHash));
fastSyncActions.downloadPivotBlockHeader(
new FastSyncState(finalizedEvent.get().getFinalizedBlockHash()));
assertThat(result).isNotCompleted();
final RespondingEthPeer.Responder responder = RespondingEthPeer.blockchainResponder(blockchain);

@ -50,9 +50,6 @@ import org.junit.Test;
public class FastSyncDownloaderTest {
private static final CompletableFuture<FastSyncState> COMPLETE =
completedFuture(FastSyncState.EMPTY_SYNC_STATE);
@SuppressWarnings("unchecked")
private final FastSyncActions fastSyncActions = mock(FastSyncActions.class);
@ -88,7 +85,6 @@ public class FastSyncDownloaderTest {
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
@ -102,7 +98,6 @@ public class FastSyncDownloaderTest {
final CompletableFuture<FastSyncState> result = downloader.start();
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(storage).storeState(downloadPivotBlockHeaderState);
@ -119,7 +114,6 @@ public class FastSyncDownloaderTest {
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState fastSyncState = new FastSyncState(pivotBlockHeader);
final CompletableFuture<FastSyncState> complete = completedFuture(fastSyncState);
when(fastSyncActions.waitForSuitablePeers(fastSyncState)).thenReturn(complete);
when(fastSyncActions.selectPivotBlock(fastSyncState)).thenReturn(complete);
when(fastSyncActions.downloadPivotBlockHeader(fastSyncState)).thenReturn(complete);
when(fastSyncActions.createChainDownloader(fastSyncState)).thenReturn(chainDownloader);
@ -140,7 +134,6 @@ public class FastSyncDownloaderTest {
final CompletableFuture<FastSyncState> result = resumedDownloader.start();
verify(fastSyncActions).waitForSuitablePeers(fastSyncState);
verify(fastSyncActions).selectPivotBlock(fastSyncState);
verify(fastSyncActions).downloadPivotBlockHeader(fastSyncState);
verify(storage).storeState(fastSyncState);
@ -152,23 +145,8 @@ public class FastSyncDownloaderTest {
assertThat(result).isCompletedWithValue(fastSyncState);
}
@Test
public void shouldAbortIfWaitForSuitablePeersFails() {
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(
CompletableFuture.failedFuture(new FastSyncException(FastSyncError.UNEXPECTED_ERROR)));
final CompletableFuture<FastSyncState> result = downloader.start();
assertCompletedExceptionally(result, FastSyncError.UNEXPECTED_ERROR);
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verifyNoMoreInteractions(fastSyncActions);
}
@Test
public void shouldAbortIfSelectPivotBlockFails() {
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenThrow(new FastSyncException(FastSyncError.UNEXPECTED_ERROR));
@ -176,7 +154,6 @@ public class FastSyncDownloaderTest {
assertCompletedExceptionally(result, FastSyncError.UNEXPECTED_ERROR);
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verifyNoMoreInteractions(fastSyncActions);
}
@ -188,7 +165,7 @@ public class FastSyncDownloaderTest {
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
@ -202,7 +179,6 @@ public class FastSyncDownloaderTest {
final CompletableFuture<FastSyncState> result = downloader.start();
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(storage).storeState(downloadPivotBlockHeaderState);
@ -227,7 +203,7 @@ public class FastSyncDownloaderTest {
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
@ -241,7 +217,6 @@ public class FastSyncDownloaderTest {
final CompletableFuture<FastSyncState> result = downloader.start();
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState);
@ -262,7 +237,7 @@ public class FastSyncDownloaderTest {
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
doAnswer(
@ -284,7 +259,6 @@ public class FastSyncDownloaderTest {
Throwable thrown = catchThrowable(() -> result.get());
assertThat(thrown).hasCauseExactlyInstanceOf(CancellationException.class);
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(storage).storeState(downloadPivotBlockHeaderState);
@ -299,7 +273,7 @@ public class FastSyncDownloaderTest {
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
@ -313,7 +287,6 @@ public class FastSyncDownloaderTest {
final CompletableFuture<FastSyncState> result = downloader.start();
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState);
@ -335,7 +308,7 @@ public class FastSyncDownloaderTest {
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
@ -349,7 +322,6 @@ public class FastSyncDownloaderTest {
final CompletableFuture<FastSyncState> result = downloader.start();
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState);
@ -379,8 +351,8 @@ public class FastSyncDownloaderTest {
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
final FastSyncState secondDownloadPivotBlockHeaderState =
new FastSyncState(secondPivotBlockHeader);
// First attempt
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(
completedFuture(selectPivotBlockState), completedFuture(secondSelectPivotBlockState));
@ -406,7 +378,6 @@ public class FastSyncDownloaderTest {
final CompletableFuture<FastSyncState> result = downloader.start();
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(storage).storeState(downloadPivotBlockHeaderState);
@ -423,7 +394,6 @@ public class FastSyncDownloaderTest {
// A real chain downloader would cause the chainFuture to complete when cancel is called.
chainFuture.completeExceptionally(new CancellationException());
verify(fastSyncActions, times(2)).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions, times(2)).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(secondSelectPivotBlockState);
verify(storage).storeState(secondDownloadPivotBlockHeaderState);
@ -452,8 +422,8 @@ public class FastSyncDownloaderTest {
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
final FastSyncState secondDownloadPivotBlockHeaderState =
new FastSyncState(secondPivotBlockHeader);
// First attempt
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(
completedFuture(selectPivotBlockState), completedFuture(secondSelectPivotBlockState));
@ -481,7 +451,6 @@ public class FastSyncDownloaderTest {
final CompletableFuture<FastSyncState> result = downloader.start();
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(storage).storeState(downloadPivotBlockHeaderState);
@ -500,7 +469,6 @@ public class FastSyncDownloaderTest {
chainFuture.completeExceptionally(new CancellationException());
verify(fastSyncActions).scheduleFutureTask(any(), any());
verify(fastSyncActions, times(2)).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions, times(2)).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE);
verify(fastSyncActions).downloadPivotBlockHeader(secondSelectPivotBlockState);
verify(storage).storeState(secondDownloadPivotBlockHeaderState);
@ -516,12 +484,7 @@ public class FastSyncDownloaderTest {
@Test
public void shouldNotHaveTrailingPeerRequirementsBeforePivotBlockSelected() {
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(new CompletableFuture<>());
downloader.start();
verify(fastSyncActions).waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE);
Assertions.assertThat(downloader.calculateTrailingPeerRequirements()).isEmpty();
}
@ -530,7 +493,7 @@ public class FastSyncDownloaderTest {
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
@ -552,7 +515,7 @@ public class FastSyncDownloaderTest {
final FastSyncState selectPivotBlockState = new FastSyncState(50);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(50).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))

@ -16,20 +16,19 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.DeterministicEthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import org.junit.Before;
import org.junit.Test;
@ -39,17 +38,19 @@ public class DynamicPivotBlockManagerTest {
private final SnapSyncState snapSyncState = mock(SnapSyncState.class);
private final FastSyncActions fastSyncActions = mock(FastSyncActions.class);
private final SyncState syncState = mock(SyncState.class);
private final DynamicPivotBlockManager dynamicPivotBlockManager =
new DynamicPivotBlockManager(
fastSyncActions,
snapSyncState,
SnapSyncConfiguration.DEFAULT_PIVOT_BLOCK_WINDOW_VALIDITY,
SnapSyncConfiguration.DEFAULT_PIVOT_BLOCK_DISTANCE_BEFORE_CACHING);
private final EthContext ethContext = mock(EthContext.class);
private DynamicPivotBlockManager dynamicPivotBlockManager;
@Before
public void setup() {
when(fastSyncActions.getSyncState()).thenReturn(syncState);
when(ethContext.getScheduler()).thenReturn(new DeterministicEthScheduler());
dynamicPivotBlockManager =
new DynamicPivotBlockManager(
fastSyncActions,
snapSyncState,
SnapSyncConfiguration.DEFAULT_PIVOT_BLOCK_WINDOW_VALIDITY,
SnapSyncConfiguration.DEFAULT_PIVOT_BLOCK_DISTANCE_BEFORE_CACHING);
}
@Test
@ -60,18 +61,13 @@ public class DynamicPivotBlockManagerTest {
when(snapSyncState.getPivotBlockNumber()).thenReturn(OptionalLong.of(999));
dynamicPivotBlockManager.check(
(blockHeader, newBlockFound) -> assertThat(newBlockFound).isFalse());
verify(fastSyncActions, never()).waitForSuitablePeers(any());
}
@Test
public void shouldSearchNewPivotBlockWhenNotCloseToTheHead() {
final CompletableFuture<FastSyncState> COMPLETE =
completedFuture(FastSyncState.EMPTY_SYNC_STATE);
final FastSyncState selectPivotBlockState = new FastSyncState(1090);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(1090).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
@ -82,18 +78,13 @@ public class DynamicPivotBlockManagerTest {
when(snapSyncState.getPivotBlockNumber()).thenReturn(OptionalLong.of(939));
dynamicPivotBlockManager.check(
(blockHeader, newBlockFound) -> assertThat(newBlockFound).isFalse());
verify(fastSyncActions).waitForSuitablePeers(any());
}
@Test
public void shouldSwitchToNewPivotBlockWhenNeeded() {
final CompletableFuture<FastSyncState> COMPLETE =
completedFuture(FastSyncState.EMPTY_SYNC_STATE);
final FastSyncState selectPivotBlockState = new FastSyncState(1060);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(1060).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
@ -117,18 +108,13 @@ public class DynamicPivotBlockManagerTest {
});
verify(snapSyncState).setCurrentHeader(pivotBlockHeader);
verify(fastSyncActions).waitForSuitablePeers(any());
}
@Test
public void shouldSwitchToNewPivotOnlyOnce() {
final CompletableFuture<FastSyncState> COMPLETE =
completedFuture(FastSyncState.EMPTY_SYNC_STATE);
final FastSyncState selectPivotBlockState = new FastSyncState(1060);
final BlockHeader pivotBlockHeader = new BlockHeaderTestFixture().number(1060).buildHeader();
final FastSyncState downloadPivotBlockHeaderState = new FastSyncState(pivotBlockHeader);
when(fastSyncActions.waitForSuitablePeers(FastSyncState.EMPTY_SYNC_STATE)).thenReturn(COMPLETE);
when(fastSyncActions.selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE))
.thenReturn(completedFuture(selectPivotBlockState));
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))

Loading…
Cancel
Save