World state halt and chain halt fixes (#7027)

Signed-off-by: Jason Frame <jason.frame@consensys.net>
Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
pull/7043/head
Jason Frame 7 months ago committed by GitHub
parent f63282c3b2
commit bbbfc4c822
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 13
      besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java
  3. 2
      besu/src/test/java/org/hyperledger/besu/cli/options/SynchronizerOptionsTest.java
  4. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SynchronizerConfiguration.java
  5. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java
  6. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromSafeBlock.java
  7. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java
  8. 37
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/SyncTargetRangeSource.java
  9. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/DynamicPivotBlockSelector.java
  10. 48
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java
  11. 51
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java
  12. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java
  13. 17
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadStateTest.java

@ -55,6 +55,7 @@
- Snap client fixes discovered during snap server testing [#6847](https://github.com/hyperledger/besu/pull/6847) - Snap client fixes discovered during snap server testing [#6847](https://github.com/hyperledger/besu/pull/6847)
- Correctly initialize the txpool as disabled on creation [#6890](https://github.com/hyperledger/besu/pull/6890) - Correctly initialize the txpool as disabled on creation [#6890](https://github.com/hyperledger/besu/pull/6890)
- Fix worldstate download halt when using snap sync during initial sync [#6981](https://github.com/hyperledger/besu/pull/6981) - Fix worldstate download halt when using snap sync during initial sync [#6981](https://github.com/hyperledger/besu/pull/6981)
- Fix chain halt due to peers only partially responding with headers. And worldstate halts caused by a halt in the chain sync [#7027](https://github.com/hyperledger/besu/pull/7027)
### Download Links ### Download Links

@ -39,6 +39,8 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
"--Xsynchronizer-downloader-header-request-size"; "--Xsynchronizer-downloader-header-request-size";
private static final String DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED_FLAG = private static final String DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED_FLAG =
"--Xsynchronizer-downloader-checkpoint-timeouts-permitted"; "--Xsynchronizer-downloader-checkpoint-timeouts-permitted";
private static final String DOWNLOADER_CHECKPOINT_RETRIES_FLAG =
"--Xsynchronizer-downloader-checkpoint-RETRIES";
private static final String DOWNLOADER_CHAIN_SEGMENT_SIZE_FLAG = private static final String DOWNLOADER_CHAIN_SEGMENT_SIZE_FLAG =
"--Xsynchronizer-downloader-chain-segment-size"; "--Xsynchronizer-downloader-chain-segment-size";
private static final String DOWNLOADER_PARALLELISM_FLAG = private static final String DOWNLOADER_PARALLELISM_FLAG =
@ -132,12 +134,12 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
SynchronizerConfiguration.DEFAULT_DOWNLOADER_HEADER_REQUEST_SIZE; SynchronizerConfiguration.DEFAULT_DOWNLOADER_HEADER_REQUEST_SIZE;
@CommandLine.Option( @CommandLine.Option(
names = DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED_FLAG, names = {DOWNLOADER_CHECKPOINT_RETRIES_FLAG, DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED_FLAG},
hidden = true, hidden = true,
paramLabel = "<INTEGER>", paramLabel = "<INTEGER>",
description = description =
"Number of tries to attempt to download checkpoints before stopping (default: ${DEFAULT-VALUE})") "Number of tries to attempt to download checkpoints before stopping (default: ${DEFAULT-VALUE})")
private int downloaderCheckpointTimeoutsPermitted = private int downloaderCheckpointRetries =
SynchronizerConfiguration.DEFAULT_DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED; SynchronizerConfiguration.DEFAULT_DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED;
@CommandLine.Option( @CommandLine.Option(
@ -354,8 +356,7 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
config.getDownloaderChangeTargetThresholdByHeight(); config.getDownloaderChangeTargetThresholdByHeight();
options.downloaderChangeTargetThresholdByTd = config.getDownloaderChangeTargetThresholdByTd(); options.downloaderChangeTargetThresholdByTd = config.getDownloaderChangeTargetThresholdByTd();
options.downloaderHeaderRequestSize = config.getDownloaderHeaderRequestSize(); options.downloaderHeaderRequestSize = config.getDownloaderHeaderRequestSize();
options.downloaderCheckpointTimeoutsPermitted = options.downloaderCheckpointRetries = config.getDownloaderCheckpointRetries();
config.getDownloaderCheckpointTimeoutsPermitted();
options.downloaderChainSegmentSize = config.getDownloaderChainSegmentSize(); options.downloaderChainSegmentSize = config.getDownloaderChainSegmentSize();
options.downloaderParallelism = config.getDownloaderParallelism(); options.downloaderParallelism = config.getDownloaderParallelism();
options.transactionsParallelism = config.getTransactionsParallelism(); options.transactionsParallelism = config.getTransactionsParallelism();
@ -394,7 +395,7 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
builder.downloaderChangeTargetThresholdByHeight(downloaderChangeTargetThresholdByHeight); builder.downloaderChangeTargetThresholdByHeight(downloaderChangeTargetThresholdByHeight);
builder.downloaderChangeTargetThresholdByTd(downloaderChangeTargetThresholdByTd); builder.downloaderChangeTargetThresholdByTd(downloaderChangeTargetThresholdByTd);
builder.downloaderHeadersRequestSize(downloaderHeaderRequestSize); builder.downloaderHeadersRequestSize(downloaderHeaderRequestSize);
builder.downloaderCheckpointTimeoutsPermitted(downloaderCheckpointTimeoutsPermitted); builder.downloaderCheckpointRetries(downloaderCheckpointRetries);
builder.downloaderChainSegmentSize(downloaderChainSegmentSize); builder.downloaderChainSegmentSize(downloaderChainSegmentSize);
builder.downloaderParallelism(downloaderParallelism); builder.downloaderParallelism(downloaderParallelism);
builder.transactionsParallelism(transactionsParallelism); builder.transactionsParallelism(transactionsParallelism);
@ -436,7 +437,7 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
DOWNLOADER_HEADER_REQUEST_SIZE_FLAG, DOWNLOADER_HEADER_REQUEST_SIZE_FLAG,
OptionParser.format(downloaderHeaderRequestSize), OptionParser.format(downloaderHeaderRequestSize),
DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED_FLAG, DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED_FLAG,
OptionParser.format(downloaderCheckpointTimeoutsPermitted), OptionParser.format(downloaderCheckpointRetries),
DOWNLOADER_CHAIN_SEGMENT_SIZE_FLAG, DOWNLOADER_CHAIN_SEGMENT_SIZE_FLAG,
OptionParser.format(downloaderChainSegmentSize), OptionParser.format(downloaderChainSegmentSize),
DOWNLOADER_PARALLELISM_FLAG, DOWNLOADER_PARALLELISM_FLAG,

@ -60,7 +60,7 @@ public class SynchronizerOptionsTest
SynchronizerConfiguration.DEFAULT_DOWNLOADER_CHANGE_TARGET_THRESHOLD_BY_TD.add(2L)) SynchronizerConfiguration.DEFAULT_DOWNLOADER_CHANGE_TARGET_THRESHOLD_BY_TD.add(2L))
.downloaderHeadersRequestSize( .downloaderHeadersRequestSize(
SynchronizerConfiguration.DEFAULT_DOWNLOADER_HEADER_REQUEST_SIZE + 2) SynchronizerConfiguration.DEFAULT_DOWNLOADER_HEADER_REQUEST_SIZE + 2)
.downloaderCheckpointTimeoutsPermitted( .downloaderCheckpointRetries(
SynchronizerConfiguration.DEFAULT_DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED + 2) SynchronizerConfiguration.DEFAULT_DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED + 2)
.downloaderChainSegmentSize( .downloaderChainSegmentSize(
SynchronizerConfiguration.DEFAULT_DOWNLOADER_CHAIN_SEGMENT_SIZE + 2) SynchronizerConfiguration.DEFAULT_DOWNLOADER_CHAIN_SEGMENT_SIZE + 2)

@ -77,7 +77,7 @@ public class SynchronizerConfiguration {
private final long downloaderChangeTargetThresholdByHeight; private final long downloaderChangeTargetThresholdByHeight;
private final UInt256 downloaderChangeTargetThresholdByTd; private final UInt256 downloaderChangeTargetThresholdByTd;
private final int downloaderHeaderRequestSize; private final int downloaderHeaderRequestSize;
private final int downloaderCheckpointTimeoutsPermitted; private final int downloaderCheckpointRetries;
private final int downloaderChainSegmentSize; private final int downloaderChainSegmentSize;
private final int downloaderParallelism; private final int downloaderParallelism;
private final int transactionsParallelism; private final int transactionsParallelism;
@ -101,7 +101,7 @@ public class SynchronizerConfiguration {
final long downloaderChangeTargetThresholdByHeight, final long downloaderChangeTargetThresholdByHeight,
final UInt256 downloaderChangeTargetThresholdByTd, final UInt256 downloaderChangeTargetThresholdByTd,
final int downloaderHeaderRequestSize, final int downloaderHeaderRequestSize,
final int downloaderCheckpointTimeoutsPermitted, final int downloaderCheckpointRetries,
final int downloaderChainSegmentSize, final int downloaderChainSegmentSize,
final int downloaderParallelism, final int downloaderParallelism,
final int transactionsParallelism, final int transactionsParallelism,
@ -123,7 +123,7 @@ public class SynchronizerConfiguration {
this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight; this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight;
this.downloaderChangeTargetThresholdByTd = downloaderChangeTargetThresholdByTd; this.downloaderChangeTargetThresholdByTd = downloaderChangeTargetThresholdByTd;
this.downloaderHeaderRequestSize = downloaderHeaderRequestSize; this.downloaderHeaderRequestSize = downloaderHeaderRequestSize;
this.downloaderCheckpointTimeoutsPermitted = downloaderCheckpointTimeoutsPermitted; this.downloaderCheckpointRetries = downloaderCheckpointRetries;
this.downloaderChainSegmentSize = downloaderChainSegmentSize; this.downloaderChainSegmentSize = downloaderChainSegmentSize;
this.downloaderParallelism = downloaderParallelism; this.downloaderParallelism = downloaderParallelism;
this.transactionsParallelism = transactionsParallelism; this.transactionsParallelism = transactionsParallelism;
@ -191,8 +191,8 @@ public class SynchronizerConfiguration {
return downloaderHeaderRequestSize; return downloaderHeaderRequestSize;
} }
public int getDownloaderCheckpointTimeoutsPermitted() { public int getDownloaderCheckpointRetries() {
return downloaderCheckpointTimeoutsPermitted; return downloaderCheckpointRetries;
} }
public int getDownloaderChainSegmentSize() { public int getDownloaderChainSegmentSize() {
@ -264,8 +264,7 @@ public class SynchronizerConfiguration {
private UInt256 downloaderChangeTargetThresholdByTd = private UInt256 downloaderChangeTargetThresholdByTd =
DEFAULT_DOWNLOADER_CHANGE_TARGET_THRESHOLD_BY_TD; DEFAULT_DOWNLOADER_CHANGE_TARGET_THRESHOLD_BY_TD;
private int downloaderHeaderRequestSize = DEFAULT_DOWNLOADER_HEADER_REQUEST_SIZE; private int downloaderHeaderRequestSize = DEFAULT_DOWNLOADER_HEADER_REQUEST_SIZE;
private int downloaderCheckpointTimeoutsPermitted = private int downloaderCheckpointRetries = DEFAULT_DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED;
DEFAULT_DOWNLOADER_CHECKPOINT_TIMEOUTS_PERMITTED;
private SnapSyncConfiguration snapSyncConfiguration = SnapSyncConfiguration.getDefault(); private SnapSyncConfiguration snapSyncConfiguration = SnapSyncConfiguration.getDefault();
private int downloaderChainSegmentSize = DEFAULT_DOWNLOADER_CHAIN_SEGMENT_SIZE; private int downloaderChainSegmentSize = DEFAULT_DOWNLOADER_CHAIN_SEGMENT_SIZE;
private int downloaderParallelism = DEFAULT_DOWNLOADER_PARALLELISM; private int downloaderParallelism = DEFAULT_DOWNLOADER_PARALLELISM;
@ -327,9 +326,8 @@ public class SynchronizerConfiguration {
return this; return this;
} }
public Builder downloaderCheckpointTimeoutsPermitted( public Builder downloaderCheckpointRetries(final int downloaderCheckpointRetries) {
final int downloaderCheckpointTimeoutsPermitted) { this.downloaderCheckpointRetries = downloaderCheckpointRetries;
this.downloaderCheckpointTimeoutsPermitted = downloaderCheckpointTimeoutsPermitted;
return this; return this;
} }
@ -422,7 +420,7 @@ public class SynchronizerConfiguration {
downloaderChangeTargetThresholdByHeight, downloaderChangeTargetThresholdByHeight,
downloaderChangeTargetThresholdByTd, downloaderChangeTargetThresholdByTd,
downloaderHeaderRequestSize, downloaderHeaderRequestSize,
downloaderCheckpointTimeoutsPermitted, downloaderCheckpointRetries,
downloaderChainSegmentSize, downloaderChainSegmentSize,
downloaderParallelism, downloaderParallelism,
transactionsParallelism, transactionsParallelism,

@ -125,7 +125,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
ethContext.getScheduler(), ethContext.getScheduler(),
target.peer(), target.peer(),
getCommonAncestor(target), getCommonAncestor(target),
syncConfig.getDownloaderCheckpointTimeoutsPermitted(), syncConfig.getDownloaderCheckpointRetries(),
SyncTerminationCondition.never()); SyncTerminationCondition.never());
final DownloadHeadersStep downloadHeadersStep = final DownloadHeadersStep downloadHeadersStep =
new DownloadHeadersStep( new DownloadHeadersStep(

@ -28,6 +28,7 @@ import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -127,7 +128,7 @@ public class PivotSelectorFromSafeBlock implements PivotBlockSelector {
maybeCachedHeadBlockHeader = Optional.of(blockHeader); maybeCachedHeadBlockHeader = Optional.of(blockHeader);
return blockHeader.getNumber(); return blockHeader.getNumber();
}) })
.get(); .get(20, TimeUnit.SECONDS);
} catch (Throwable t) { } catch (Throwable t) {
LOG.debug( LOG.debug(
"Error trying to download chain head block header by hash {}", "Error trying to download chain head block header by hash {}",

@ -91,7 +91,7 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
ethContext.getScheduler(), ethContext.getScheduler(),
target.peer(), target.peer(),
target.commonAncestor(), target.commonAncestor(),
syncConfig.getDownloaderCheckpointTimeoutsPermitted(), syncConfig.getDownloaderCheckpointRetries(),
fullSyncTerminationCondition); fullSyncTerminationCondition);
final DownloadHeadersStep downloadHeadersStep = final DownloadHeadersStep downloadHeadersStep =
new DownloadHeadersStep( new DownloadHeadersStep(

@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition; import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -44,7 +45,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
private final SyncTargetChecker syncTargetChecker; private final SyncTargetChecker syncTargetChecker;
private final EthPeer peer; private final EthPeer peer;
private final EthScheduler ethScheduler; private final EthScheduler ethScheduler;
private final int rangeTimeoutsPermitted; private final int retriesPermitted;
private final Duration newHeaderWaitDuration; private final Duration newHeaderWaitDuration;
private final SyncTerminationCondition terminationCondition; private final SyncTerminationCondition terminationCondition;
@ -52,7 +53,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
private BlockHeader lastRangeEnd; private BlockHeader lastRangeEnd;
private boolean reachedEndOfRanges = false; private boolean reachedEndOfRanges = false;
private Optional<CompletableFuture<List<BlockHeader>>> pendingRequests = Optional.empty(); private Optional<CompletableFuture<List<BlockHeader>>> pendingRequests = Optional.empty();
private int requestFailureCount = 0; private int retryCount = 0;
public SyncTargetRangeSource( public SyncTargetRangeSource(
final RangeHeadersFetcher fetcher, final RangeHeadersFetcher fetcher,
@ -60,7 +61,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
final EthScheduler ethScheduler, final EthScheduler ethScheduler,
final EthPeer peer, final EthPeer peer,
final BlockHeader commonAncestor, final BlockHeader commonAncestor,
final int rangeTimeoutsPermitted, final int retriesPermitted,
final SyncTerminationCondition terminationCondition) { final SyncTerminationCondition terminationCondition) {
this( this(
fetcher, fetcher,
@ -68,7 +69,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
ethScheduler, ethScheduler,
peer, peer,
commonAncestor, commonAncestor,
rangeTimeoutsPermitted, retriesPermitted,
Duration.ofSeconds(5), Duration.ofSeconds(5),
terminationCondition); terminationCondition);
} }
@ -79,7 +80,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
final EthScheduler ethScheduler, final EthScheduler ethScheduler,
final EthPeer peer, final EthPeer peer,
final BlockHeader commonAncestor, final BlockHeader commonAncestor,
final int rangeTimeoutsPermitted, final int retriesPermitted,
final Duration newHeaderWaitDuration, final Duration newHeaderWaitDuration,
final SyncTerminationCondition terminationCondition) { final SyncTerminationCondition terminationCondition) {
this.fetcher = fetcher; this.fetcher = fetcher;
@ -87,7 +88,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
this.ethScheduler = ethScheduler; this.ethScheduler = ethScheduler;
this.peer = peer; this.peer = peer;
this.lastRangeEnd = commonAncestor; this.lastRangeEnd = commonAncestor;
this.rangeTimeoutsPermitted = rangeTimeoutsPermitted; this.retriesPermitted = retriesPermitted;
this.newHeaderWaitDuration = newHeaderWaitDuration; this.newHeaderWaitDuration = newHeaderWaitDuration;
this.terminationCondition = terminationCondition; this.terminationCondition = terminationCondition;
} }
@ -96,7 +97,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
public boolean hasNext() { public boolean hasNext() {
return terminationCondition.shouldContinueDownload() return terminationCondition.shouldContinueDownload()
&& (!retrievedRanges.isEmpty() && (!retrievedRanges.isEmpty()
|| (requestFailureCount < rangeTimeoutsPermitted || (retryCount < retriesPermitted
&& syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd) && syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd)
&& !reachedEndOfRanges)); && !reachedEndOfRanges));
} }
@ -148,13 +149,21 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
pendingRequest.get(newHeaderWaitDuration.toMillis(), MILLISECONDS); pendingRequest.get(newHeaderWaitDuration.toMillis(), MILLISECONDS);
this.pendingRequests = Optional.empty(); this.pendingRequests = Optional.empty();
if (newHeaders.isEmpty()) { if (newHeaders.isEmpty()) {
requestFailureCount++; retryCount++;
if (retryCount >= retriesPermitted) {
LOG.atDebug()
.setMessage(
"Disconnecting target peer for providing useless or empty range header: {}.")
.addArgument(peer)
.log();
peer.disconnect(DisconnectMessage.DisconnectReason.USELESS_PEER_USELESS_RESPONSES);
}
} else { } else {
requestFailureCount = 0; retryCount = 0;
} for (final BlockHeader header : newHeaders) {
for (final BlockHeader header : newHeaders) { retrievedRanges.add(new SyncTargetRange(peer, lastRangeEnd, header));
retrievedRanges.add(new SyncTargetRange(peer, lastRangeEnd, header)); lastRangeEnd = header;
lastRangeEnd = header; }
} }
return retrievedRanges.poll(); return retrievedRanges.poll();
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
@ -163,7 +172,7 @@ public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
} catch (final ExecutionException e) { } catch (final ExecutionException e) {
LOG.debug("Failed to retrieve new range headers", e); LOG.debug("Failed to retrieve new range headers", e);
this.pendingRequests = Optional.empty(); this.pendingRequests = Optional.empty();
requestFailureCount++; retryCount++;
return null; return null;
} catch (final TimeoutException e) { } catch (final TimeoutException e) {
return null; return null;

@ -161,7 +161,7 @@ public class DynamicPivotBlockSelector {
.addArgument(this::logLastPivotBlockFound) .addArgument(this::logLastPivotBlockFound)
.log(); .log();
}) })
.orTimeout(5, TimeUnit.MINUTES); .orTimeout(20, TimeUnit.SECONDS);
} }
private boolean isSamePivotBlock(final FastSyncState fss) { private boolean isSamePivotBlock(final FastSyncState fss) {

@ -21,6 +21,7 @@ import static org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordina
import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; import org.hyperledger.besu.ethereum.chain.BlockAddedObserver;
import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.context.SnapSyncStatePersistenceManager; import org.hyperledger.besu.ethereum.eth.sync.snapsync.context.SnapSyncStatePersistenceManager;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.AccountRangeDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest;
@ -86,6 +87,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
// blockchain // blockchain
private final Blockchain blockchain; private final Blockchain blockchain;
private final Long blockObserverId; private final Long blockObserverId;
private final EthContext ethContext;
// metrics around the snapsync // metrics around the snapsync
private final SnapSyncMetricsManager metricsManager; private final SnapSyncMetricsManager metricsManager;
@ -99,7 +101,8 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
final int maxRequestsWithoutProgress, final int maxRequestsWithoutProgress,
final long minMillisBeforeStalling, final long minMillisBeforeStalling,
final SnapSyncMetricsManager metricsManager, final SnapSyncMetricsManager metricsManager,
final Clock clock) { final Clock clock,
final EthContext ethContext) {
super( super(
worldStateStorageCoordinator, worldStateStorageCoordinator,
pendingRequests, pendingRequests,
@ -111,6 +114,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
this.snapSyncState = snapSyncState; this.snapSyncState = snapSyncState;
this.metricsManager = metricsManager; this.metricsManager = metricsManager;
this.blockObserverId = blockchain.observeBlockAdded(createBlockchainObserver()); this.blockObserverId = blockchain.observeBlockAdded(createBlockchainObserver());
this.ethContext = ethContext;
metricsManager metricsManager
.getMetricsSystem() .getMetricsSystem()
@ -423,23 +427,29 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
} }
public BlockAddedObserver createBlockchainObserver() { public BlockAddedObserver createBlockchainObserver() {
return addedBlockContext -> { return addedBlockContext ->
final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false); ethContext
pivotBlockSelector.check( .getScheduler()
(____, isNewPivotBlock) -> { .executeServiceTask(
if (isNewPivotBlock) { () -> {
foundNewPivotBlock.set(true); final AtomicBoolean foundNewPivotBlock = new AtomicBoolean(false);
} pivotBlockSelector.check(
}); (____, isNewPivotBlock) -> {
if (isNewPivotBlock) {
final boolean isNewPivotBlockFound = foundNewPivotBlock.get(); foundNewPivotBlock.set(true);
final boolean isBlockchainCaughtUp = }
snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind(); });
if (snapSyncState.isHealTrieInProgress() && (isNewPivotBlockFound || isBlockchainCaughtUp)) { final boolean isNewPivotBlockFound = foundNewPivotBlock.get();
snapSyncState.setWaitingBlockchain(false); final boolean isBlockchainCaughtUp =
reloadTrieHeal(); snapSyncState.isWaitingBlockchain()
} && !pivotBlockSelector.isBlockchainBehind();
};
if (snapSyncState.isHealTrieInProgress()
&& (isNewPivotBlockFound || isBlockchainCaughtUp)) {
snapSyncState.setWaitingBlockchain(false);
reloadTrieHeal();
}
});
} }
} }

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.sync.snapsync; package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.DynamicPivotBlockSelector.doNothingOnPivotChange;
import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom; import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
@ -158,12 +159,19 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
private SnapSyncProcessState snapSyncState; private SnapSyncProcessState snapSyncState;
private PersistDataStep persistDataStep; private PersistDataStep persistDataStep;
private CompleteTaskStep completeTaskStep; private CompleteTaskStep completeTaskStep;
private DynamicPivotBlockSelector pivotBlockManager;
public Builder configuration(final SnapSyncConfiguration snapSyncConfiguration) { public Builder configuration(final SnapSyncConfiguration snapSyncConfiguration) {
this.snapSyncConfiguration = snapSyncConfiguration; this.snapSyncConfiguration = snapSyncConfiguration;
return this; return this;
} }
public Builder dynamicPivotBlockSelector(
final DynamicPivotBlockSelector dynamicPivotBlockSelector) {
this.pivotBlockManager = dynamicPivotBlockSelector;
return this;
}
public Builder maxOutstandingRequests(final int maxOutstandingRequests) { public Builder maxOutstandingRequests(final int maxOutstandingRequests) {
this.maxOutstandingRequests = maxOutstandingRequests; this.maxOutstandingRequests = maxOutstandingRequests;
return this; return this;
@ -257,6 +265,12 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
outputCounter, outputCounter,
true, true,
"world_state_download") "world_state_download")
.thenProcess(
"checkNewPivotBlock-Account",
tasks -> {
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsync( .thenProcessAsync(
"batchDownloadAccountData", "batchDownloadAccountData",
requestTask -> requestDataStep.requestAccount(requestTask), requestTask -> requestDataStep.requestAccount(requestTask),
@ -274,6 +288,12 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
true, true,
"world_state_download") "world_state_download")
.inBatches(snapSyncConfiguration.getStorageCountPerRequest()) .inBatches(snapSyncConfiguration.getStorageCountPerRequest())
.thenProcess(
"checkNewPivotBlock-Storage",
tasks -> {
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsyncOrdered( .thenProcessAsyncOrdered(
"batchDownloadStorageData", "batchDownloadStorageData",
requestTask -> requestDataStep.requestStorage(requestTask), requestTask -> requestDataStep.requestStorage(requestTask),
@ -294,6 +314,12 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
outputCounter, outputCounter,
true, true,
"world_state_download") "world_state_download")
.thenProcess(
"checkNewPivotBlock-LargeStorage",
tasks -> {
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsyncOrdered( .thenProcessAsyncOrdered(
"batchDownloadLargeStorageData", "batchDownloadLargeStorageData",
requestTask -> requestDataStep.requestStorage(List.of(requestTask)), requestTask -> requestDataStep.requestStorage(List.of(requestTask)),
@ -328,6 +354,14 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
.map(BytecodeRequest::getCodeHash) .map(BytecodeRequest::getCodeHash)
.distinct() .distinct()
.count()) .count())
.thenProcess(
"checkNewPivotBlock-Code",
tasks -> {
pivotBlockManager.check(
(blockHeader, newBlockFound) ->
reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound));
return tasks;
})
.thenProcessAsyncOrdered( .thenProcessAsyncOrdered(
"batchDownloadCodeData", "batchDownloadCodeData",
tasks -> requestDataStep.requestCode(tasks), tasks -> requestDataStep.requestCode(tasks),
@ -356,6 +390,14 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
3, 3,
bufferCapacity) bufferCapacity)
.inBatches(snapSyncConfiguration.getTrienodeCountPerRequest()) .inBatches(snapSyncConfiguration.getTrienodeCountPerRequest())
.thenProcess(
"checkNewPivotBlock-TrieNode",
tasks -> {
pivotBlockManager.check(
(blockHeader, newBlockFound) ->
reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound));
return tasks;
})
.thenProcessAsync( .thenProcessAsync(
"batchDownloadTrieNodeData", "batchDownloadTrieNodeData",
tasks -> requestDataStep.requestTrieNodeByPath(tasks), tasks -> requestDataStep.requestTrieNodeByPath(tasks),
@ -419,4 +461,13 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
requestsToComplete); requestsToComplete);
} }
} }
private static void reloadHealWhenNeeded(
final SnapSyncProcessState snapSyncState,
final SnapWorldDownloadState downloadState,
final boolean newBlockFound) {
if (snapSyncState.isHealTrieInProgress() && newBlockFound) {
downloadState.reloadTrieHeal();
}
}
} }

@ -147,7 +147,8 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
maxNodeRequestsWithoutProgress, maxNodeRequestsWithoutProgress,
minMillisBeforeStalling, minMillisBeforeStalling,
snapsyncMetricsManager, snapsyncMetricsManager,
clock); clock,
ethContext);
final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16); final Map<Bytes32, Bytes32> ranges = RangeManager.generateAllRanges(16);
snapsyncMetricsManager.initRange(ranges); snapsyncMetricsManager.initRange(ranges);
@ -214,6 +215,7 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
SnapWorldStateDownloadProcess.builder() SnapWorldStateDownloadProcess.builder()
.configuration(snapSyncConfiguration) .configuration(snapSyncConfiguration)
.maxOutstandingRequests(maxOutstandingRequests) .maxOutstandingRequests(maxOutstandingRequests)
.dynamicPivotBlockSelector(dynamicPivotBlockManager)
.loadLocalDataStep( .loadLocalDataStep(
new LoadLocalDataStep( new LoadLocalDataStep(
worldStateStorageCoordinator, worldStateStorageCoordinator,

@ -33,6 +33,8 @@ import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.context.SnapSyncStatePersistenceManager; import org.hyperledger.besu.ethereum.eth.sync.snapsync.context.SnapSyncStatePersistenceManager;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.BytecodeRequest;
@ -90,6 +92,7 @@ public class SnapWorldDownloadStateTest {
private final Blockchain blockchain = mock(Blockchain.class); private final Blockchain blockchain = mock(Blockchain.class);
private final DynamicPivotBlockSelector dynamicPivotBlockManager = private final DynamicPivotBlockSelector dynamicPivotBlockManager =
mock(DynamicPivotBlockSelector.class); mock(DynamicPivotBlockSelector.class);
private final EthContext ethContext = mock(EthContext.class);
private final TestClock clock = new TestClock(); private final TestClock clock = new TestClock();
private SnapWorldDownloadState downloadState; private SnapWorldDownloadState downloadState;
@ -132,7 +135,8 @@ public class SnapWorldDownloadStateTest {
MAX_REQUESTS_WITHOUT_PROGRESS, MAX_REQUESTS_WITHOUT_PROGRESS,
MIN_MILLIS_BEFORE_STALLING, MIN_MILLIS_BEFORE_STALLING,
metricsManager, metricsManager,
clock); clock,
ethContext);
final DynamicPivotBlockSelector dynamicPivotBlockManager = final DynamicPivotBlockSelector dynamicPivotBlockManager =
mock(DynamicPivotBlockSelector.class); mock(DynamicPivotBlockSelector.class);
doAnswer( doAnswer(
@ -147,6 +151,17 @@ public class SnapWorldDownloadStateTest {
downloadState.setRootNodeData(ROOT_NODE_DATA); downloadState.setRootNodeData(ROOT_NODE_DATA);
future = downloadState.getDownloadFuture(); future = downloadState.getDownloadFuture();
assertThat(downloadState.isDownloading()).isTrue(); assertThat(downloadState.isDownloading()).isTrue();
final EthScheduler ethScheduler = mock(EthScheduler.class);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
doAnswer(
invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
})
.when(ethScheduler)
.executeServiceTask(any(Runnable.class));
} }
@ParameterizedTest @ParameterizedTest

Loading…
Cancel
Save