Fix worldstate halt with snap sync during initial sync (#6981)

Signed-off-by: Karim Taam <karim.t2am@gmail.com>
Signed-off-by: Jason Frame <jason.frame@consensys.net>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
pull/6990/head
Jason Frame 7 months ago committed by GitHub
parent 5ac225442f
commit e4e9f670fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 8
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/CompleteTaskStep.java
  3. 15
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java
  4. 19
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldDownloadState.java
  5. 51
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java
  6. 1
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloader.java

@ -53,6 +53,7 @@
- Fix to avoid broadcasting full blob txs, instead of only the tx announcement, to a subset of nodes [#6835](https://github.com/hyperledger/besu/pull/6835)
- Snap client fixes discovered during snap server testing [#6847](https://github.com/hyperledger/besu/pull/6847)
- Correctly initialize the txpool as disabled on creation [#6890](https://github.com/hyperledger/besu/pull/6890)
- Fix worldstate download halt when using snap sync during initial sync [#6981](https://github.com/hyperledger/besu/pull/6981)
### Download Links

@ -43,9 +43,11 @@ public class CompleteTaskStep {
public synchronized void markAsCompleteOrFailed(
final SnapWorldDownloadState downloadState, final Task<SnapDataRequest> task) {
if (task.getData().isResponseReceived()
|| (task.getData() instanceof TrieNodeHealingRequest
&& task.getData().isExpired(snapSyncState))) {
final boolean isResponseReceived = task.getData().isResponseReceived();
final boolean isExpiredRequest =
task.getData() instanceof TrieNodeHealingRequest && task.getData().isExpired(snapSyncState);
// if pivot block has changed, the request is expired and we mark this one completed
if (isResponseReceived || isExpiredRequest) {
completedRequestsCounter.inc();
task.markCompleted();
downloadState.checkCompletion(snapSyncState.getPivotBlockHeader().orElseThrow());

@ -44,6 +44,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
@ -95,10 +96,11 @@ public class RequestDataStep {
downloadState.addOutstandingTask(getAccountTask);
return getAccountTask
.run()
.orTimeout(10, TimeUnit.SECONDS)
.handle(
(response, error) -> {
downloadState.removeOutstandingTask(getAccountTask);
if (response != null) {
downloadState.removeOutstandingTask(getAccountTask);
accountDataRequest.setRootHash(blockHeader.getStateRoot());
accountDataRequest.addResponse(
worldStateProofProvider, response.accounts(), response.proofs());
@ -130,13 +132,12 @@ public class RequestDataStep {
downloadState.addOutstandingTask(getStorageRangeTask);
return getStorageRangeTask
.run()
.orTimeout(10, TimeUnit.SECONDS)
.handle(
(response, error) -> {
downloadState.removeOutstandingTask(getStorageRangeTask);
if (response != null) {
downloadState.removeOutstandingTask(getStorageRangeTask);
final ArrayDeque<NavigableMap<Bytes32, Bytes>> slots = new ArrayDeque<>();
// Check if we have an empty range
/*
* Checks if the response represents an "empty range".
*
@ -186,10 +187,11 @@ public class RequestDataStep {
downloadState.addOutstandingTask(getByteCodeTask);
return getByteCodeTask
.run()
.orTimeout(10, TimeUnit.SECONDS)
.handle(
(response, error) -> {
downloadState.removeOutstandingTask(getByteCodeTask);
if (response != null) {
downloadState.removeOutstandingTask(getByteCodeTask);
for (Task<SnapDataRequest> requestTask : requestTasks) {
final BytecodeRequest request = (BytecodeRequest) requestTask.getData();
request.setRootHash(blockHeader.getStateRoot());
@ -225,10 +227,11 @@ public class RequestDataStep {
downloadState.addOutstandingTask(getTrieNodeFromPeerTask);
return getTrieNodeFromPeerTask
.run()
.orTimeout(10, TimeUnit.SECONDS)
.handle(
(response, error) -> {
downloadState.removeOutstandingTask(getTrieNodeFromPeerTask);
if (response != null) {
downloadState.removeOutstandingTask(getTrieNodeFromPeerTask);
for (final Task<SnapDataRequest> task : requestTasks) {
final TrieNodeHealingRequest request = (TrieNodeHealingRequest) task.getData();
final Bytes matchingData = response.get(request.getPathId());

@ -46,7 +46,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@ -86,7 +85,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
// blockchain
private final Blockchain blockchain;
private OptionalLong blockObserverId;
private final Long blockObserverId;
// metrics around the snapsync
private final SnapSyncMetricsManager metricsManager;
@ -111,7 +110,8 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
this.blockchain = blockchain;
this.snapSyncState = snapSyncState;
this.metricsManager = metricsManager;
this.blockObserverId = OptionalLong.empty();
this.blockObserverId = blockchain.observeBlockAdded(createBlockchainObserver());
metricsManager
.getMetricsSystem()
.createLongGauge(
@ -174,11 +174,6 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
// if all snapsync tasks are completed and the healing process was not running
if (!snapSyncState.isHealTrieInProgress()) {
// Register blockchain observer if not already registered
blockObserverId =
blockObserverId.isEmpty()
? OptionalLong.of(blockchain.observeBlockAdded(createBlockchainObserver()))
: blockObserverId;
// Start the healing process
startTrieHeal();
}
@ -192,8 +187,6 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
// if all snapsync tasks are completed and the healing was running and the blockchain is not
// behind the pivot block
else {
// Remove the blockchain observer
blockObserverId.ifPresent(blockchain::removeObserver);
// If the flat database healing process is not in progress and the flat database mode is
// FULL
if (!snapSyncState.isHealFlatDatabaseInProgress()
@ -213,6 +206,8 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
});
updater.commit();
// Remove the blockchain observer
blockchain.removeObserver(blockObserverId);
// Notify that the snap sync has completed
metricsManager.notifySnapSyncCompleted();
// Clear the snap context
@ -441,9 +436,7 @@ public class SnapWorldDownloadState extends WorldDownloadState<SnapDataRequest>
final boolean isBlockchainCaughtUp =
snapSyncState.isWaitingBlockchain() && !pivotBlockSelector.isBlockchainBehind();
if (isNewPivotBlockFound
|| isBlockchainCaughtUp) { // restart heal if we found a new pivot block or if close to
// head again
if (snapSyncState.isHealTrieInProgress() && (isNewPivotBlockFound || isBlockchainCaughtUp)) {
snapSyncState.setWaitingBlockchain(false);
reloadTrieHeal();
}

@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.hyperledger.besu.ethereum.eth.sync.snapsync.DynamicPivotBlockSelector.doNothingOnPivotChange;
import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
@ -159,19 +158,12 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
private SnapSyncProcessState snapSyncState;
private PersistDataStep persistDataStep;
private CompleteTaskStep completeTaskStep;
private DynamicPivotBlockSelector pivotBlockManager;
public Builder configuration(final SnapSyncConfiguration snapSyncConfiguration) {
this.snapSyncConfiguration = snapSyncConfiguration;
return this;
}
public Builder dynamicPivotBlockSelector(
final DynamicPivotBlockSelector dynamicPivotBlockSelector) {
this.pivotBlockManager = dynamicPivotBlockSelector;
return this;
}
public Builder maxOutstandingRequests(final int maxOutstandingRequests) {
this.maxOutstandingRequests = maxOutstandingRequests;
return this;
@ -265,12 +257,6 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
outputCounter,
true,
"world_state_download")
.thenProcess(
"checkNewPivotBlock-Account",
tasks -> {
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsync(
"batchDownloadAccountData",
requestTask -> requestDataStep.requestAccount(requestTask),
@ -288,12 +274,6 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
true,
"world_state_download")
.inBatches(snapSyncConfiguration.getStorageCountPerRequest())
.thenProcess(
"checkNewPivotBlock-Storage",
tasks -> {
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsyncOrdered(
"batchDownloadStorageData",
requestTask -> requestDataStep.requestStorage(requestTask),
@ -314,12 +294,6 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
outputCounter,
true,
"world_state_download")
.thenProcess(
"checkNewPivotBlock-LargeStorage",
tasks -> {
pivotBlockManager.check(doNothingOnPivotChange);
return tasks;
})
.thenProcessAsyncOrdered(
"batchDownloadLargeStorageData",
requestTask -> requestDataStep.requestStorage(List.of(requestTask)),
@ -354,14 +328,6 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
.map(BytecodeRequest::getCodeHash)
.distinct()
.count())
.thenProcess(
"checkNewPivotBlock-Code",
tasks -> {
pivotBlockManager.check(
(blockHeader, newBlockFound) ->
reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound));
return tasks;
})
.thenProcessAsyncOrdered(
"batchDownloadCodeData",
tasks -> requestDataStep.requestCode(tasks),
@ -390,14 +356,6 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
3,
bufferCapacity)
.inBatches(snapSyncConfiguration.getTrienodeCountPerRequest())
.thenProcess(
"checkNewPivotBlock-TrieNode",
tasks -> {
pivotBlockManager.check(
(blockHeader, newBlockFound) ->
reloadHealWhenNeeded(snapSyncState, downloadState, newBlockFound));
return tasks;
})
.thenProcessAsync(
"batchDownloadTrieNodeData",
tasks -> requestDataStep.requestTrieNodeByPath(tasks),
@ -461,13 +419,4 @@ public class SnapWorldStateDownloadProcess implements WorldStateDownloadProcess
requestsToComplete);
}
}
private static void reloadHealWhenNeeded(
final SnapSyncProcessState snapSyncState,
final SnapWorldDownloadState downloadState,
final boolean newBlockFound) {
if (snapSyncState.isHealTrieInProgress() && newBlockFound) {
downloadState.reloadTrieHeal();
}
}
}

@ -214,7 +214,6 @@ public class SnapWorldStateDownloader implements WorldStateDownloader {
SnapWorldStateDownloadProcess.builder()
.configuration(snapSyncConfiguration)
.maxOutstandingRequests(maxOutstandingRequests)
.dynamicPivotBlockSelector(dynamicPivotBlockManager)
.loadLocalDataStep(
new LoadLocalDataStep(
worldStateStorageCoordinator,

Loading…
Cancel
Save