From 110052c9f3990dce7e3035579a692ab9bb75daec Mon Sep 17 00:00:00 2001 From: matkt Date: Tue, 21 Dec 2021 11:21:49 +0100 Subject: [PATCH] fastsync refactor (#3179) Light refactor of the fastsync code to facilitate the implementation of the snapsync Signed-off-by: Karim TAAM --- .../worldstate/StateTrieAccountValue.java | 20 +- .../WorldStateDownloaderBenchmark.java | 8 +- .../eth/sync/CheckpointHeaderFetcher.java | 17 +- .../sync/fastsync/FastDownloaderFactory.java | 5 +- .../eth/sync/fastsync/FastSyncActions.java | 2 +- .../fastsync/FastSyncChainDownloader.java | 7 +- .../FastSyncDownloadPipelineFactory.java | 15 +- .../eth/sync/fastsync/FastSyncDownloader.java | 8 +- .../eth/sync/fastsync/FastSyncState.java | 38 +-- .../sync/fastsync/FastSyncTargetManager.java | 10 +- .../AccountTrieNodeDataRequest.java | 2 +- .../worldstate/CodeNodeDataRequest.java | 2 +- .../worldstate/CompleteTaskStep.java | 7 +- .../worldstate/FastWorldDownloadState.java | 65 +++++ .../FastWorldStateDownloadProcess.java | 210 ++++++++++++++++ .../worldstate/FastWorldStateDownloader.java | 177 ++++++++++++++ .../worldstate/LoadLocalDataStep.java | 2 +- .../worldstate/NodeDataRequest.java | 2 +- .../worldstate/PersistDataStep.java | 5 +- .../worldstate/RequestDataStep.java | 7 +- .../worldstate/RequestType.java | 2 +- .../StorageTrieNodeDataRequest.java | 2 +- .../worldstate/TrieNodeDataRequest.java | 2 +- .../FullSyncDownloadPipelineFactory.java | 5 +- .../sync/worldstate/TaskQueueIterator.java | 8 +- .../sync/worldstate/WorldDownloadState.java | 96 ++++---- .../worldstate/WorldStateDownloadProcess.java | 226 +----------------- .../worldstate/WorldStateDownloadStatus.java | 4 +- .../sync/worldstate/WorldStateDownloader.java | 154 +----------- .../eth/sync/CheckpointHeaderFetcherTest.java | 40 ++-- .../fastsync/FastSyncChainDownloaderTest.java | 2 +- .../sync/fastsync/FastSyncDownloaderTest.java | 84 +++++-- .../worldstate/CompleteTaskStepTest.java | 5 +- .../FastWorldDownloadStateTest.java} | 14 +- .../FastWorldStateDownloaderTest.java} | 62 +++-- .../worldstate/LoadLocalDataStepTest.java | 3 +- .../worldstate/NodeDataRequestTest.java | 2 +- .../worldstate/PersistDataStepTest.java | 5 +- .../worldstate/RequestDataStepTest.java | 5 +- .../eth/sync/worldstate/StubTask.java | 1 + 40 files changed, 767 insertions(+), 564 deletions(-) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/AccountTrieNodeDataRequest.java (98%) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/CodeNodeDataRequest.java (96%) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/CompleteTaskStep.java (93%) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldDownloadState.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloadProcess.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloader.java rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/LoadLocalDataStep.java (96%) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/NodeDataRequest.java (98%) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/PersistDataStep.java (89%) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/RequestDataStep.java (93%) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/RequestType.java (94%) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/StorageTrieNodeDataRequest.java (98%) rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/TrieNodeDataRequest.java (97%) rename ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/CompleteTaskStepTest.java (94%) rename ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/{worldstate/WorldDownloadStateTest.java => fastsync/worldstate/FastWorldDownloadStateTest.java} (94%) rename ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/{worldstate/WorldStateDownloaderTest.java => fastsync/worldstate/FastWorldStateDownloaderTest.java} (95%) rename ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/LoadLocalDataStepTest.java (95%) rename ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/NodeDataRequestTest.java (98%) rename ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/PersistDataStepTest.java (95%) rename ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/{ => fastsync}/worldstate/RequestDataStepTest.java (95%) diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/StateTrieAccountValue.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/StateTrieAccountValue.java index b0dd3005cf..a99675a800 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/StateTrieAccountValue.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/StateTrieAccountValue.java @@ -23,6 +23,8 @@ import org.hyperledger.besu.ethereum.rlp.RLPOutput; import java.util.Objects; +import org.apache.tuweni.bytes.Bytes32; + /** Represents the raw values associated with an account in the world state trie. */ public class StateTrieAccountValue { @@ -110,11 +112,23 @@ public class StateTrieAccountValue { final long nonce = in.readLongScalar(); final Wei balance = Wei.of(in.readUInt256Scalar()); - final Hash storageRoot = Hash.wrap(in.readBytes32()); - final Hash codeHash = Hash.wrap(in.readBytes32()); + Bytes32 storageRoot; + Bytes32 codeHash; + if (in.nextIsNull()) { + storageRoot = Hash.EMPTY_TRIE_HASH; + in.skipNext(); + } else { + storageRoot = in.readBytes32(); + } + if (in.nextIsNull()) { + codeHash = Hash.EMPTY; + in.skipNext(); + } else { + codeHash = in.readBytes32(); + } in.leaveList(); - return new StateTrieAccountValue(nonce, balance, storageRoot, codeHash); + return new StateTrieAccountValue(nonce, balance, Hash.wrap(storageRoot), Hash.wrap(codeHash)); } } diff --git a/ethereum/eth/src/jmh/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java b/ethereum/eth/src/jmh/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java index 0bdd81ebce..8ea6df2f53 100644 --- a/ethereum/eth/src/jmh/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java +++ b/ethereum/eth/src/jmh/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java @@ -32,6 +32,9 @@ import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer.Responder; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastWorldStateDownloader; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.NodeDataRequest; import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier; import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStorageProviderBuilder; @@ -113,7 +116,7 @@ public class WorldStateDownloaderBenchmark { NodeDataRequest::deserialize), 0); worldStateDownloader = - new WorldStateDownloader( + new FastWorldStateDownloader( ethContext, worldStateStorage, pendingRequests, @@ -147,7 +150,8 @@ public class WorldStateDownloaderBenchmark { @Benchmark public Optional downloadWorldState() { - final CompletableFuture result = worldStateDownloader.run(blockHeader); + final CompletableFuture result = + worldStateDownloader.run(null, new FastSyncState(blockHeader)); if (result.isDone()) { throw new IllegalStateException("World state download was already complete"); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointHeaderFetcher.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointHeaderFetcher.java index dbb3215b60..e3c8f596fb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointHeaderFetcher.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointHeaderFetcher.java @@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -40,19 +41,27 @@ public class CheckpointHeaderFetcher { private final ProtocolSchedule protocolSchedule; private final EthContext ethContext; // The checkpoint we're aiming to reach at the end of this sync. - private final Optional finalCheckpointHeader; + private final FastSyncState fastSyncState; private final MetricsSystem metricsSystem; public CheckpointHeaderFetcher( final SynchronizerConfiguration syncConfig, final ProtocolSchedule protocolSchedule, final EthContext ethContext, - final Optional finalCheckpointHeader, + final MetricsSystem metricsSystem) { + this(syncConfig, protocolSchedule, ethContext, new FastSyncState(), metricsSystem); + } + + public CheckpointHeaderFetcher( + final SynchronizerConfiguration syncConfig, + final ProtocolSchedule protocolSchedule, + final EthContext ethContext, + final FastSyncState fastSyncState, final MetricsSystem metricsSystem) { this.syncConfig = syncConfig; this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; - this.finalCheckpointHeader = finalCheckpointHeader; + this.fastSyncState = fastSyncState; this.metricsSystem = metricsSystem; } @@ -63,6 +72,7 @@ public class CheckpointHeaderFetcher { final long previousCheckpointNumber = previousCheckpointHeader.getNumber(); final int additionalHeaderCount; + final Optional finalCheckpointHeader = fastSyncState.getPivotBlockHeader(); if (finalCheckpointHeader.isPresent()) { final BlockHeader targetHeader = finalCheckpointHeader.get(); final long blocksUntilTarget = targetHeader.getNumber() - previousCheckpointNumber; @@ -116,6 +126,7 @@ public class CheckpointHeaderFetcher { public boolean nextCheckpointEndsAtChainHead( final EthPeer peer, final BlockHeader previousCheckpointHeader) { + final Optional finalCheckpointHeader = fastSyncState.getPivotBlockHeader(); if (finalCheckpointHeader.isPresent()) { return false; } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactory.java index 7491f8f39e..71c3c0013f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactory.java @@ -20,8 +20,9 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastWorldStateDownloader; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.NodeDataRequest; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; -import org.hyperledger.besu.ethereum.eth.sync.worldstate.NodeDataRequest; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; @@ -89,7 +90,7 @@ public class FastDownloaderFactory { metricsSystem, syncConfig.getWorldStateTaskCacheSize()); final WorldStateDownloader worldStateDownloader = - new WorldStateDownloader( + new FastWorldStateDownloader( ethContext, worldStateStorage, taskCollection, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java index e0cfa6c439..8c2c1c0870 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java @@ -198,6 +198,6 @@ public class FastSyncActions { ethContext, syncState, metricsSystem, - currentState.getPivotBlockHeader().get()); + currentState); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java index 19ab1d6c11..bc6f9f1c72 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java @@ -15,7 +15,6 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync; import org.hyperledger.besu.ethereum.ProtocolContext; -import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader; @@ -35,17 +34,17 @@ public class FastSyncChainDownloader { final EthContext ethContext, final SyncState syncState, final MetricsSystem metricsSystem, - final BlockHeader pivotBlockHeader) { + final FastSyncState fastSyncState) { final FastSyncTargetManager syncTargetManager = new FastSyncTargetManager( - config, protocolSchedule, protocolContext, ethContext, metricsSystem, pivotBlockHeader); + config, protocolSchedule, protocolContext, ethContext, metricsSystem, fastSyncState); return new PipelineChainDownloader( syncState, syncTargetManager, new FastSyncDownloadPipelineFactory( - config, protocolSchedule, protocolContext, ethContext, pivotBlockHeader, metricsSystem), + config, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem), ethContext.getScheduler(), metricsSystem); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java index 26723804f6..11bd55c6f5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java @@ -42,14 +42,12 @@ import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import org.hyperledger.besu.services.pipeline.Pipeline; import org.hyperledger.besu.services.pipeline.PipelineBuilder; -import java.util.Optional; - public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory { private final SynchronizerConfiguration syncConfig; private final ProtocolSchedule protocolSchedule; private final ProtocolContext protocolContext; private final EthContext ethContext; - private final BlockHeader pivotBlockHeader; + private final FastSyncState fastSyncState; private final MetricsSystem metricsSystem; private final FastSyncValidationPolicy attachedValidationPolicy; private final FastSyncValidationPolicy detachedValidationPolicy; @@ -60,13 +58,13 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, - final BlockHeader pivotBlockHeader, + final FastSyncState fastSyncState, final MetricsSystem metricsSystem) { this.syncConfig = syncConfig; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; - this.pivotBlockHeader = pivotBlockHeader; + this.fastSyncState = fastSyncState; this.metricsSystem = metricsSystem; final LabelledMetric fastSyncValidationCounter = metricsSystem.createLabelledCounter( @@ -102,11 +100,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory final CheckpointRangeSource checkpointRangeSource = new CheckpointRangeSource( new CheckpointHeaderFetcher( - syncConfig, - protocolSchedule, - ethContext, - Optional.of(pivotBlockHeader), - metricsSystem), + syncConfig, protocolSchedule, ethContext, fastSyncState, metricsSystem), this::shouldContinueDownloadingFromPeer, ethContext.getScheduler(), target.peer(), @@ -157,6 +151,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory private boolean shouldContinueDownloadingFromPeer( final EthPeer peer, final BlockHeader lastCheckpointHeader) { + final BlockHeader pivotBlockHeader = fastSyncState.getPivotBlockHeader().get(); return !peer.isDisconnected() && lastCheckpointHeader.getNumber() < pivotBlockHeader.getNumber(); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java index 67a1676111..499f5c0550 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java @@ -19,7 +19,7 @@ import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose; import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements; -import org.hyperledger.besu.ethereum.eth.sync.worldstate.NodeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.NodeDataRequest; import org.hyperledger.besu.ethereum.eth.sync.worldstate.StalledDownloadException; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; @@ -90,7 +90,7 @@ public class FastSyncDownloader { .thenCompose(fastSyncActions::downloadPivotBlockHeader) .thenApply(this::updateMaxTrailingPeers) .thenApply(this::storeState) - .thenCompose(this::downloadChainAndWorldState), + .thenCompose(fss -> downloadChainAndWorldState(fastSyncActions, fss)), this::handleFailure); } @@ -155,7 +155,7 @@ public class FastSyncDownloader { } private CompletableFuture downloadChainAndWorldState( - final FastSyncState currentState) { + final FastSyncActions fastSyncActions, final FastSyncState currentState) { // Synchronized ensures that stop isn't called while we're in the process of starting a // world state and chain download. If it did we might wind up starting a new download // after the stop method had called cancel. @@ -165,7 +165,7 @@ public class FastSyncDownloader { new CancellationException("FastSyncDownloader stopped")); } final CompletableFuture worldStateFuture = - worldStateDownloader.run(currentState.getPivotBlockHeader().get()); + worldStateDownloader.run(fastSyncActions, currentState); final ChainDownloader chainDownloader = fastSyncActions.createChainDownloader(currentState); final CompletableFuture chainFuture = chainDownloader.start(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncState.java index 342cf74a09..688c6fb738 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncState.java @@ -20,15 +20,18 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; -import com.google.common.base.MoreObjects; - public class FastSyncState { public static FastSyncState EMPTY_SYNC_STATE = new FastSyncState(OptionalLong.empty(), Optional.empty()); - private final OptionalLong pivotBlockNumber; - private final Optional pivotBlockHeader; + private OptionalLong pivotBlockNumber; + private Optional pivotBlockHeader; + + public FastSyncState() { + pivotBlockNumber = OptionalLong.empty(); + pivotBlockHeader = Optional.empty(); + } public FastSyncState(final long pivotBlockNumber) { this(OptionalLong.of(pivotBlockNumber), Optional.empty()); @@ -38,7 +41,7 @@ public class FastSyncState { this(OptionalLong.of(pivotBlockHeader.getNumber()), Optional.of(pivotBlockHeader)); } - private FastSyncState( + protected FastSyncState( final OptionalLong pivotBlockNumber, final Optional pivotBlockHeader) { this.pivotBlockNumber = pivotBlockNumber; this.pivotBlockHeader = pivotBlockHeader; @@ -56,15 +59,16 @@ public class FastSyncState { return pivotBlockHeader.isPresent(); } + public void setCurrentHeader(final BlockHeader header) { + pivotBlockNumber = OptionalLong.of(header.getNumber()); + pivotBlockHeader = Optional.of(header); + } + @Override public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final FastSyncState that = (FastSyncState) o; + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FastSyncState that = (FastSyncState) o; return Objects.equals(pivotBlockNumber, that.pivotBlockNumber) && Objects.equals(pivotBlockHeader, that.pivotBlockHeader); } @@ -76,9 +80,11 @@ public class FastSyncState { @Override public String toString() { - return MoreObjects.toStringHelper(this) - .add("pivotBlockNumber", pivotBlockNumber) - .add("pivotBlockHeader", pivotBlockHeader) - .toString(); + return "FastSyncState{" + + "pivotBlockNumber=" + + pivotBlockNumber + + ", pivotBlockHeader=" + + pivotBlockHeader + + '}'; } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncTargetManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncTargetManager.java index a3245aa61e..ec6d1ce662 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncTargetManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncTargetManager.java @@ -42,7 +42,7 @@ class FastSyncTargetManager extends SyncTargetManager { private final ProtocolContext protocolContext; private final EthContext ethContext; private final MetricsSystem metricsSystem; - private final BlockHeader pivotBlockHeader; + private final FastSyncState fastSyncState; public FastSyncTargetManager( final SynchronizerConfiguration config, @@ -50,17 +50,18 @@ class FastSyncTargetManager extends SyncTargetManager { final ProtocolContext protocolContext, final EthContext ethContext, final MetricsSystem metricsSystem, - final BlockHeader pivotBlockHeader) { + final FastSyncState fastSyncState) { super(config, protocolSchedule, protocolContext, ethContext, metricsSystem); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; this.metricsSystem = metricsSystem; - this.pivotBlockHeader = pivotBlockHeader; + this.fastSyncState = fastSyncState; } @Override protected CompletableFuture> selectBestAvailableSyncTarget() { + final BlockHeader pivotBlockHeader = fastSyncState.getPivotBlockHeader().get(); final Optional maybeBestPeer = ethContext.getEthPeers().bestPeerWithHeightEstimate(); if (!maybeBestPeer.isPresent()) { LOG.info("No sync target, waiting for peers: {}", ethContext.getEthPeers().peerCount()); @@ -79,6 +80,7 @@ class FastSyncTargetManager extends SyncTargetManager { } private CompletableFuture> confirmPivotBlockHeader(final EthPeer bestPeer) { + final BlockHeader pivotBlockHeader = fastSyncState.getPivotBlockHeader().get(); final RetryingGetHeaderFromPeerByNumberTask task = RetryingGetHeaderFromPeerByNumberTask.forSingleNumber( protocolSchedule, @@ -113,11 +115,13 @@ class FastSyncTargetManager extends SyncTargetManager { } private boolean peerHasDifferentPivotBlock(final List result) { + final BlockHeader pivotBlockHeader = fastSyncState.getPivotBlockHeader().get(); return result.size() != 1 || !result.get(0).equals(pivotBlockHeader); } @Override public boolean shouldContinueDownloading() { + final BlockHeader pivotBlockHeader = fastSyncState.getPivotBlockHeader().get(); return !protocolContext.getBlockchain().getChainHeadHash().equals(pivotBlockHeader.getHash()); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/AccountTrieNodeDataRequest.java similarity index 98% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/AccountTrieNodeDataRequest.java index f0364698e0..6ee9e4dbf8 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/AccountTrieNodeDataRequest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CodeNodeDataRequest.java similarity index 96% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CodeNodeDataRequest.java index 1286561ef1..03add976fb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CodeNodeDataRequest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.rlp.RLPOutput; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/CompleteTaskStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStep.java similarity index 93% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/CompleteTaskStep.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStep.java index 381b51f063..7312d6b105 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/CompleteTaskStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStep.java @@ -12,9 +12,10 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.RunnableCounter; @@ -58,7 +59,7 @@ public class CompleteTaskStep { public void markAsCompleteOrFailed( final BlockHeader header, - final WorldDownloadState downloadState, + final WorldDownloadState downloadState, final Task task) { if (task.getData().getData() != null) { enqueueChildren(task, header, downloadState); @@ -92,7 +93,7 @@ public class CompleteTaskStep { private void enqueueChildren( final Task task, final BlockHeader blockHeader, - final WorldDownloadState downloadState) { + final WorldDownloadState downloadState) { final NodeDataRequest request = task.getData(); // Only queue rootnode children if we started from scratch if (!downloadState.downloadWasResumed() || !isRootState(blockHeader, request)) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldDownloadState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldDownloadState.java new file mode 100644 index 0000000000..5bf6055029 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldDownloadState.java @@ -0,0 +1,65 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; + +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater; +import org.hyperledger.besu.services.tasks.CachingTaskCollection; + +import java.time.Clock; +import java.util.Optional; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes; + +public class FastWorldDownloadState extends WorldDownloadState { + private static final Logger LOG = LogManager.getLogger(); + + public FastWorldDownloadState( + final CachingTaskCollection pendingRequests, + final int maxRequestsWithoutProgress, + final long minMillisBeforeStalling, + final Clock clock) { + super(pendingRequests, maxRequestsWithoutProgress, minMillisBeforeStalling, clock); + } + + @Override + public synchronized boolean checkCompletion( + final WorldStateStorage worldStateStorage, final BlockHeader header) { + if (!internalFuture.isDone() && pendingRequests.allTasksCompleted()) { + if (rootNodeData == null) { + enqueueRequest( + NodeDataRequest.createAccountDataRequest( + header.getStateRoot(), Optional.of(Bytes.EMPTY))); + return false; + } + final Updater updater = worldStateStorage.updater(); + updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData); + updater.commit(); + + internalFuture.complete(null); + // THere are no more inputs to process so make sure we wake up any threads waiting to dequeue + // so they can give up waiting. + notifyAll(); + LOG.info("Finished downloading world state from peers"); + return true; + } else { + return false; + } + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloadProcess.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloadProcess.java new file mode 100644 index 0000000000..bd4390235d --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloadProcess.java @@ -0,0 +1,210 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom; + +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.TaskQueueIterator; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadProcess; +import org.hyperledger.besu.metrics.BesuMetricCategory; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.metrics.Counter; +import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; +import org.hyperledger.besu.services.pipeline.Pipe; +import org.hyperledger.besu.services.pipeline.Pipeline; +import org.hyperledger.besu.services.pipeline.PipelineBuilder; +import org.hyperledger.besu.services.pipeline.WritePipe; +import org.hyperledger.besu.services.tasks.Task; +import org.hyperledger.besu.util.ExceptionUtils; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class FastWorldStateDownloadProcess implements WorldStateDownloadProcess { + private static final Logger LOG = LogManager.getLogger(); + private final Pipeline> fetchDataPipeline; + private final Pipeline> completionPipeline; + private final WritePipe> requestsToComplete; + + private FastWorldStateDownloadProcess( + final Pipeline> fetchDataPipeline, + final Pipeline> completionPipeline, + final WritePipe> requestsToComplete) { + this.fetchDataPipeline = fetchDataPipeline; + this.completionPipeline = completionPipeline; + this.requestsToComplete = requestsToComplete; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public CompletableFuture start(final EthScheduler ethScheduler) { + final CompletableFuture fetchDataFuture = ethScheduler.startPipeline(fetchDataPipeline); + final CompletableFuture completionFuture = ethScheduler.startPipeline(completionPipeline); + + fetchDataFuture.whenComplete( + (result, error) -> { + if (error != null) { + if (!(ExceptionUtils.rootCause(error) instanceof CancellationException)) { + LOG.error("Pipeline failed", error); + } + completionPipeline.abort(); + } else { + // No more data to fetch, so propagate the pipe closure onto the completion pipe. + requestsToComplete.close(); + } + }); + + completionFuture.exceptionally( + error -> { + if (!(ExceptionUtils.rootCause(error) instanceof CancellationException)) { + LOG.error("Pipeline failed", error); + } + fetchDataPipeline.abort(); + return null; + }); + return completionFuture; + } + + @Override + public void abort() { + fetchDataPipeline.abort(); + completionPipeline.abort(); + } + + public static class Builder { + + private int hashCountPerRequest; + private int maxOutstandingRequests; + private LoadLocalDataStep loadLocalDataStep; + private FastWorldDownloadState downloadState; + private MetricsSystem metricsSystem; + private RequestDataStep requestDataStep; + private BlockHeader pivotBlockHeader; + private PersistDataStep persistDataStep; + private CompleteTaskStep completeTaskStep; + + public Builder hashCountPerRequest(final int hashCountPerRequest) { + this.hashCountPerRequest = hashCountPerRequest; + return this; + } + + public Builder maxOutstandingRequests(final int maxOutstandingRequests) { + this.maxOutstandingRequests = maxOutstandingRequests; + return this; + } + + public Builder loadLocalDataStep(final LoadLocalDataStep loadLocalDataStep) { + this.loadLocalDataStep = loadLocalDataStep; + return this; + } + + public Builder requestDataStep(final RequestDataStep requestDataStep) { + this.requestDataStep = requestDataStep; + return this; + } + + public Builder persistDataStep(final PersistDataStep persistDataStep) { + this.persistDataStep = persistDataStep; + return this; + } + + public Builder completeTaskStep(final CompleteTaskStep completeTaskStep) { + this.completeTaskStep = completeTaskStep; + return this; + } + + public Builder downloadState(final FastWorldDownloadState downloadState) { + this.downloadState = downloadState; + return this; + } + + public Builder pivotBlockHeader(final BlockHeader pivotBlockHeader) { + this.pivotBlockHeader = pivotBlockHeader; + return this; + } + + public Builder metricsSystem(final MetricsSystem metricsSystem) { + this.metricsSystem = metricsSystem; + return this; + } + + public FastWorldStateDownloadProcess build() { + checkNotNull(loadLocalDataStep); + checkNotNull(requestDataStep); + checkNotNull(persistDataStep); + checkNotNull(completeTaskStep); + checkNotNull(downloadState); + checkNotNull(pivotBlockHeader); + checkNotNull(metricsSystem); + + // Room for the requests we expect to do in parallel plus some buffer but not unlimited. + final int bufferCapacity = hashCountPerRequest * 2; + final LabelledMetric outputCounter = + metricsSystem.createLabelledCounter( + BesuMetricCategory.SYNCHRONIZER, + "world_state_pipeline_processed_total", + "Number of entries processed by each world state download pipeline stage", + "step", + "action"); + + final Pipeline> completionPipeline = + PipelineBuilder.>createPipeline( + "requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request") + .andFinishWith( + "requestCompleteTask", + task -> + completeTaskStep.markAsCompleteOrFailed( + pivotBlockHeader, downloadState, task)); + + final Pipe> requestsToComplete = completionPipeline.getInputPipe(); + final Pipeline> fetchDataPipeline = + createPipelineFrom( + "requestDequeued", + new TaskQueueIterator<>(downloadState), + bufferCapacity, + outputCounter, + true, + "world_state_download") + .thenFlatMapInParallel( + "requestLoadLocalData", + task -> loadLocalDataStep.loadLocalData(task, requestsToComplete), + 3, + bufferCapacity) + .inBatches(hashCountPerRequest) + .thenProcessAsync( + "batchDownloadData", + requestTasks -> + requestDataStep.requestData(requestTasks, pivotBlockHeader, downloadState), + maxOutstandingRequests) + .thenProcess( + "batchPersistData", + tasks -> persistDataStep.persist(tasks, pivotBlockHeader, downloadState)) + .andFinishWith( + "batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put)); + + return new FastWorldStateDownloadProcess( + fetchDataPipeline, completionPipeline, requestsToComplete); + } + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloader.java new file mode 100644 index 0000000000..4b38150ece --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloader.java @@ -0,0 +1,177 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.metrics.BesuMetricCategory; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.services.tasks.CachingTaskCollection; + +import java.time.Clock; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.IntSupplier; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes; + +public class FastWorldStateDownloader implements WorldStateDownloader { + private static final Logger LOG = LogManager.getLogger(); + + private final long minMillisBeforeStalling; + private final Clock clock; + private final MetricsSystem metricsSystem; + + private final EthContext ethContext; + private final CachingTaskCollection taskCollection; + private final int hashCountPerRequest; + private final int maxOutstandingRequests; + private final int maxNodeRequestsWithoutProgress; + private final WorldStateStorage worldStateStorage; + + private final AtomicReference downloadState = new AtomicReference<>(); + + private Optional maybeCompleteTask = Optional.empty(); + + public FastWorldStateDownloader( + final EthContext ethContext, + final WorldStateStorage worldStateStorage, + final CachingTaskCollection taskCollection, + final int hashCountPerRequest, + final int maxOutstandingRequests, + final int maxNodeRequestsWithoutProgress, + final long minMillisBeforeStalling, + final Clock clock, + final MetricsSystem metricsSystem) { + this.ethContext = ethContext; + this.worldStateStorage = worldStateStorage; + this.taskCollection = taskCollection; + this.hashCountPerRequest = hashCountPerRequest; + this.maxOutstandingRequests = maxOutstandingRequests; + this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress; + this.minMillisBeforeStalling = minMillisBeforeStalling; + this.clock = clock; + this.metricsSystem = metricsSystem; + + metricsSystem.createIntegerGauge( + BesuMetricCategory.SYNCHRONIZER, + "world_state_node_requests_since_last_progress_current", + "Number of world state requests made since the last time new data was returned", + downloadStateValue(FastWorldDownloadState::getRequestsSinceLastProgress)); + + metricsSystem.createIntegerGauge( + BesuMetricCategory.SYNCHRONIZER, + "world_state_inflight_requests_current", + "Number of in progress requests for world state data", + downloadStateValue(FastWorldDownloadState::getOutstandingTaskCount)); + } + + private IntSupplier downloadStateValue(final Function getter) { + return () -> { + final FastWorldDownloadState state = this.downloadState.get(); + return state != null ? getter.apply(state) : 0; + }; + } + + @Override + public CompletableFuture run( + final FastSyncActions fastSyncActions, final FastSyncState fastSyncState) { + synchronized (this) { + final FastWorldDownloadState oldDownloadState = this.downloadState.get(); + if (oldDownloadState != null && oldDownloadState.isDownloading()) { + final CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally( + new IllegalStateException( + "Cannot run an already running " + this.getClass().getSimpleName())); + return failed; + } + + final BlockHeader header = fastSyncState.getPivotBlockHeader().get(); + final Hash stateRoot = header.getStateRoot(); + if (worldStateStorage.isWorldStateAvailable(stateRoot, header.getHash())) { + LOG.info( + "World state already available for block {} ({}). State root {}", + header.getNumber(), + header.getHash(), + stateRoot); + return CompletableFuture.completedFuture(null); + } + LOG.info( + "Begin downloading world state from peers for block {} ({}). State root {}", + header.getNumber(), + header.getHash(), + stateRoot); + + final FastWorldDownloadState newDownloadState = + new FastWorldDownloadState( + taskCollection, maxNodeRequestsWithoutProgress, minMillisBeforeStalling, clock); + this.downloadState.set(newDownloadState); + + if (!newDownloadState.downloadWasResumed()) { + // Only queue the root node if we're starting a new download from scratch + newDownloadState.enqueueRequest( + NodeDataRequest.createAccountDataRequest(stateRoot, Optional.of(Bytes.EMPTY))); + } + + maybeCompleteTask = + Optional.of(new CompleteTaskStep(worldStateStorage, metricsSystem, taskCollection::size)); + final FastWorldStateDownloadProcess downloadProcess = + FastWorldStateDownloadProcess.builder() + .hashCountPerRequest(hashCountPerRequest) + .maxOutstandingRequests(maxOutstandingRequests) + .loadLocalDataStep(new LoadLocalDataStep(worldStateStorage, metricsSystem)) + .requestDataStep(new RequestDataStep(ethContext, metricsSystem)) + .persistDataStep(new PersistDataStep(worldStateStorage)) + .completeTaskStep(maybeCompleteTask.get()) + .downloadState(newDownloadState) + .pivotBlockHeader(header) + .metricsSystem(metricsSystem) + .build(); + + newDownloadState.setWorldStateDownloadProcess(downloadProcess); + + return newDownloadState.startDownload(downloadProcess, ethContext.getScheduler()); + } + } + + @Override + public void cancel() { + synchronized (this) { + final FastWorldDownloadState downloadState = this.downloadState.get(); + if (downloadState != null) { + downloadState.getDownloadFuture().cancel(true); + } + } + } + + @Override + public Optional getPulledStates() { + return maybeCompleteTask.map(CompleteTaskStep::getCompletedRequests); + } + + @Override + public Optional getKnownStates() { + return maybeCompleteTask.map(task -> task.getCompletedRequests() + task.getPendingRequests()); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/LoadLocalDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStep.java similarity index 96% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/LoadLocalDataStep.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStep.java index 98e9715db4..920fd5f487 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/LoadLocalDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStep.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.metrics.BesuMetricCategory; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/NodeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/NodeDataRequest.java similarity index 98% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/NodeDataRequest.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/NodeDataRequest.java index ec8c29aa33..af406e5bbe 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/NodeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/NodeDataRequest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import static com.google.common.base.Preconditions.checkNotNull; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/PersistDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java similarity index 89% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/PersistDataStep.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java index 7c7b2a0c86..aefd08fc89 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/PersistDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java @@ -12,9 +12,10 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater; import org.hyperledger.besu.services.tasks.Task; @@ -31,7 +32,7 @@ public class PersistDataStep { public List> persist( final List> tasks, final BlockHeader blockHeader, - final WorldDownloadState downloadState) { + final WorldDownloadState downloadState) { final Updater updater = worldStateStorage.updater(); tasks.stream() .map(Task::getData) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/RequestDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStep.java similarity index 93% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/RequestDataStep.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStep.java index 2f8c52562b..e4e85793be 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/RequestDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStep.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -20,6 +20,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetNodeDataFromPeerTask; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.services.tasks.Task; import org.hyperledger.besu.util.ExceptionUtils; @@ -56,7 +57,7 @@ public class RequestDataStep { public CompletableFuture>> requestData( final List> requestTasks, final BlockHeader blockHeader, - final WorldDownloadState downloadState) { + final WorldDownloadState downloadState) { final List hashes = requestTasks.stream() .map(Task::getData) @@ -80,7 +81,7 @@ public class RequestDataStep { private CompletableFuture> sendRequest( final BlockHeader blockHeader, final List hashes, - final WorldDownloadState downloadState) { + final WorldDownloadState downloadState) { final EthTask> task = getNodeDataTaskFactory.apply(hashes, blockHeader.getNumber()); downloadState.addOutstandingTask(task); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/RequestType.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestType.java similarity index 94% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/RequestType.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestType.java index e0b9005eb4..3629e0a615 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/RequestType.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestType.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; public enum RequestType { ACCOUNT_TRIE_NODE((byte) 1), diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/StorageTrieNodeDataRequest.java similarity index 98% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/StorageTrieNodeDataRequest.java index a054311bc1..d3cbf51bcb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/StorageTrieNodeDataRequest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/TrieNodeDataRequest.java similarity index 97% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/TrieNodeDataRequest.java index bc99ca1d4f..f286e8ec13 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/TrieNodeDataRequest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.trie.Node; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java index 34c5d1f5de..d7d9cf2a3a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java @@ -34,8 +34,6 @@ import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.services.pipeline.Pipeline; import org.hyperledger.besu.services.pipeline.PipelineBuilder; -import java.util.Optional; - public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory { private final SynchronizerConfiguration syncConfig; @@ -68,8 +66,7 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory final int singleHeaderBufferSize = headerRequestSize * downloaderParallelism; final CheckpointRangeSource checkpointRangeSource = new CheckpointRangeSource( - new CheckpointHeaderFetcher( - syncConfig, protocolSchedule, ethContext, Optional.empty(), metricsSystem), + new CheckpointHeaderFetcher(syncConfig, protocolSchedule, ethContext, metricsSystem), this::shouldContinueDownloadingFromPeer, ethContext.getScheduler(), target.peer(), diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TaskQueueIterator.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TaskQueueIterator.java index fb8753b30f..377fe21d3c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TaskQueueIterator.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/TaskQueueIterator.java @@ -18,11 +18,11 @@ import org.hyperledger.besu.services.tasks.Task; import java.util.Iterator; -class TaskQueueIterator implements Iterator> { +public class TaskQueueIterator implements Iterator> { - private final WorldDownloadState downloadState; + private final WorldDownloadState downloadState; - public TaskQueueIterator(final WorldDownloadState downloadState) { + public TaskQueueIterator(final WorldDownloadState downloadState) { this.downloadState = downloadState; } @@ -32,7 +32,7 @@ class TaskQueueIterator implements Iterator> { } @Override - public Task next() { + public Task next() { return downloadState.dequeueRequestBlocking(); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldDownloadState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldDownloadState.java index 4faa84da35..95fb2e6522 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldDownloadState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldDownloadState.java @@ -18,14 +18,12 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; -import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater; -import org.hyperledger.besu.services.tasks.CachingTaskCollection; import org.hyperledger.besu.services.tasks.Task; +import org.hyperledger.besu.services.tasks.TaskCollection; import org.hyperledger.besu.util.ExceptionUtils; import java.time.Clock; import java.util.Collections; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -36,26 +34,27 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes; -class WorldDownloadState { +public abstract class WorldDownloadState { private static final Logger LOG = LogManager.getLogger(); - private final boolean downloadWasResumed; - private final CachingTaskCollection pendingRequests; - private final int maxRequestsWithoutProgress; + private boolean downloadWasResumed; + protected final TaskCollection pendingRequests; + + protected final int maxRequestsWithoutProgress; private final Clock clock; private final Set> outstandingRequests = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final CompletableFuture internalFuture; - private final CompletableFuture downloadFuture; + protected CompletableFuture internalFuture; + private CompletableFuture downloadFuture; // Volatile so monitoring can access it without having to synchronize. - private volatile int requestsSinceLastProgress = 0; + protected volatile int requestsSinceLastProgress = 0; private final long minMillisBeforeStalling; private volatile long timestampOfLastProgress; - private Bytes rootNodeData; - private WorldStateDownloadProcess worldStateDownloadProcess; + protected Bytes rootNodeData; + protected WorldStateDownloadProcess worldStateDownloadProcess; public WorldDownloadState( - final CachingTaskCollection pendingRequests, + final TaskCollection pendingRequests, final int maxRequestsWithoutProgress, final long minMillisBeforeStalling, final Clock clock) { @@ -78,6 +77,23 @@ class WorldDownloadState { }); } + public void reset() { + this.timestampOfLastProgress = clock.millis(); + this.requestsSinceLastProgress = 0; + this.downloadWasResumed = true; + this.internalFuture = new CompletableFuture<>(); + this.downloadFuture = new CompletableFuture<>(); + this.internalFuture.whenComplete(this::cleanup); + this.downloadFuture.exceptionally( + error -> { + // Propagate cancellation back to our internal future. + if (error instanceof CancellationException) { + this.internalFuture.cancel(true); + } + return null; + }); + } + private synchronized void cleanup(final Void result, final Throwable error) { // Handle cancellations if (internalFuture.isCancelled()) { @@ -122,23 +138,23 @@ class WorldDownloadState { return downloadFuture; } - public synchronized void enqueueRequest(final NodeDataRequest request) { + public synchronized void enqueueRequest(final REQUEST request) { if (!internalFuture.isDone()) { pendingRequests.add(request); notifyAll(); } } - public synchronized void enqueueRequests(final Stream requests) { + public synchronized void enqueueRequests(final Stream requests) { if (!internalFuture.isDone()) { requests.forEach(pendingRequests::add); notifyAll(); } } - public synchronized Task dequeueRequestBlocking() { + public synchronized Task dequeueRequestBlocking() { while (!internalFuture.isDone()) { - final Task task = pendingRequests.remove(); + Task task = pendingRequests.remove(); if (task != null) { return task; } @@ -156,7 +172,8 @@ class WorldDownloadState { this.rootNodeData = rootNodeData; } - public synchronized void requestComplete(final boolean madeProgress) { + public synchronized void requestComplete( + final boolean madeProgress, final long minMillisBeforeStalling) { if (madeProgress) { requestsSinceLastProgress = 0; timestampOfLastProgress = clock.millis(); @@ -169,51 +186,28 @@ class WorldDownloadState { } } + public synchronized void requestComplete(final boolean madeProgress) { + requestComplete(madeProgress, minMillisBeforeStalling); + } + public int getRequestsSinceLastProgress() { return requestsSinceLastProgress; } - private synchronized void markAsStalled(final int maxNodeRequestRetries) { + protected synchronized void markAsStalled(final int maxNodeRequestRetries) { final String message = "Download stalled due to too many failures to retrieve node data (>" + maxNodeRequestRetries + " requests without making progress)"; + final WorldStateDownloaderException e = new StalledDownloadException(message); internalFuture.completeExceptionally(e); } - public synchronized boolean checkCompletion( - final WorldStateStorage worldStateStorage, final BlockHeader header) { - if (!internalFuture.isDone() && pendingRequests.allTasksCompleted()) { - if (rootNodeData == null) { - enqueueRequest( - NodeDataRequest.createAccountDataRequest( - header.getStateRoot(), Optional.of(Bytes.EMPTY))); - return false; - } - final Updater updater = worldStateStorage.updater(); - updater.saveWorldState(header.getHash(), header.getStateRoot(), rootNodeData); - updater.commit(); - internalFuture.complete(null); - // THere are no more inputs to process so make sure we wake up any threads waiting to dequeue - // so they can give up waiting. - notifyAll(); - LOG.info("Finished downloading world state from peers"); - return true; - } else { - return false; - } - } - public synchronized boolean isDownloading() { return !internalFuture.isDone(); } - public synchronized void setWorldStateDownloadProcess( - final WorldStateDownloadProcess worldStateDownloadProcess) { - this.worldStateDownloadProcess = worldStateDownloadProcess; - } - public synchronized void notifyTaskAvailable() { notifyAll(); } @@ -236,4 +230,12 @@ class WorldDownloadState { }); return downloadFuture; } + + public void setWorldStateDownloadProcess( + final WorldStateDownloadProcess worldStateDownloadProcess) { + this.worldStateDownloadProcess = worldStateDownloadProcess; + } + + public abstract boolean checkCompletion( + final WorldStateStorage worldStateStorage, final BlockHeader header); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java index ec06e789b6..9c35ccd24d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java @@ -14,233 +14,13 @@ */ package org.hyperledger.besu.ethereum.eth.sync.worldstate; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.hyperledger.besu.services.pipeline.PipelineBuilder.createPipelineFrom; - -import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; -import org.hyperledger.besu.metrics.BesuMetricCategory; -import org.hyperledger.besu.plugin.services.MetricsSystem; -import org.hyperledger.besu.plugin.services.metrics.Counter; -import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; -import org.hyperledger.besu.services.pipeline.Pipe; -import org.hyperledger.besu.services.pipeline.Pipeline; -import org.hyperledger.besu.services.pipeline.PipelineBuilder; -import org.hyperledger.besu.services.pipeline.WritePipe; -import org.hyperledger.besu.services.tasks.Task; -import org.hyperledger.besu.util.ExceptionUtils; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * Establishes the pipelines required for downloading world state. This involves: - * - *
- *                                ---------------------------
- *                                | Load from Pending Queue |
- *                                ---------------------------
- *                                             |
- *                                             |
- *                                ---------------------------
- *                                |  Is Available Locally?  |
- *                                ---------------------------
- *                             Yes             |             No
- *                    ---------------------------------------------------
- *                    |                                                 |
- *                    |                                                 |
- *                    |                                         ------------------
- *                    |                                         | Create Batches |
- *                    |                                         ------------------
- *                    |                                                 |
- *                    |                                                 |
- *                    |                                       ------------------------
- *                    |                                       |  Request From Peers  |
- *                    |                                       ------------------------
- *                    |                                                 |
- *                    |                                                 |
- *                    |                                         ------------------
- *                    |                                         |  Persist Data  |
- *                    |                                         ------------------
- *                    |                                                 |
- *                    |                                                 |
- *                    ---------------------------------------------------
- *                                             |
- *                                             |
- *                                   ----------------------
- *                                   |  Enqueue Children  |
- *                                   | and Mark Complete  |
- *                                   ----------------------
- * 
- */ -public class WorldStateDownloadProcess { - private static final Logger LOG = LogManager.getLogger(); - private final Pipeline> fetchDataPipeline; - private final Pipeline> completionPipeline; - private final WritePipe> requestsToComplete; - - private WorldStateDownloadProcess( - final Pipeline> fetchDataPipeline, - final Pipeline> completionPipeline, - final WritePipe> requestsToComplete) { - this.fetchDataPipeline = fetchDataPipeline; - this.completionPipeline = completionPipeline; - this.requestsToComplete = requestsToComplete; - } - - public static Builder builder() { - return new Builder(); - } - - public CompletableFuture start(final EthScheduler ethScheduler) { - final CompletableFuture fetchDataFuture = ethScheduler.startPipeline(fetchDataPipeline); - final CompletableFuture completionFuture = ethScheduler.startPipeline(completionPipeline); - - fetchDataFuture.whenComplete( - (result, error) -> { - if (error != null) { - if (!(ExceptionUtils.rootCause(error) instanceof CancellationException)) { - LOG.error("Pipeline failed", error); - } - completionPipeline.abort(); - } else { - // No more data to fetch, so propagate the pipe closure onto the completion pipe. - requestsToComplete.close(); - } - }); - - completionFuture.exceptionally( - error -> { - if (!(ExceptionUtils.rootCause(error) instanceof CancellationException)) { - LOG.error("Pipeline failed", error); - } - fetchDataPipeline.abort(); - return null; - }); - return completionFuture; - } - - public void abort() { - fetchDataPipeline.abort(); - completionPipeline.abort(); - } - - public static class Builder { - - private int hashCountPerRequest; - private int maxOutstandingRequests; - private LoadLocalDataStep loadLocalDataStep; - private WorldDownloadState downloadState; - private MetricsSystem metricsSystem; - private RequestDataStep requestDataStep; - private BlockHeader pivotBlockHeader; - private PersistDataStep persistDataStep; - private CompleteTaskStep completeTaskStep; - - public Builder hashCountPerRequest(final int hashCountPerRequest) { - this.hashCountPerRequest = hashCountPerRequest; - return this; - } - - public Builder maxOutstandingRequests(final int maxOutstandingRequests) { - this.maxOutstandingRequests = maxOutstandingRequests; - return this; - } - - public Builder loadLocalDataStep(final LoadLocalDataStep loadLocalDataStep) { - this.loadLocalDataStep = loadLocalDataStep; - return this; - } - - public Builder requestDataStep(final RequestDataStep requestDataStep) { - this.requestDataStep = requestDataStep; - return this; - } - - public Builder persistDataStep(final PersistDataStep persistDataStep) { - this.persistDataStep = persistDataStep; - return this; - } - - public Builder completeTaskStep(final CompleteTaskStep completeTaskStep) { - this.completeTaskStep = completeTaskStep; - return this; - } - - public Builder downloadState(final WorldDownloadState downloadState) { - this.downloadState = downloadState; - return this; - } - - public Builder pivotBlockHeader(final BlockHeader pivotBlockHeader) { - this.pivotBlockHeader = pivotBlockHeader; - return this; - } - - public Builder metricsSystem(final MetricsSystem metricsSystem) { - this.metricsSystem = metricsSystem; - return this; - } - - public WorldStateDownloadProcess build() { - checkNotNull(loadLocalDataStep); - checkNotNull(requestDataStep); - checkNotNull(persistDataStep); - checkNotNull(completeTaskStep); - checkNotNull(downloadState); - checkNotNull(pivotBlockHeader); - checkNotNull(metricsSystem); - - // Room for the requests we expect to do in parallel plus some buffer but not unlimited. - final int bufferCapacity = hashCountPerRequest * 2; - final LabelledMetric outputCounter = - metricsSystem.createLabelledCounter( - BesuMetricCategory.SYNCHRONIZER, - "world_state_pipeline_processed_total", - "Number of entries processed by each world state download pipeline stage", - "step", - "action"); - - final Pipeline> completionPipeline = - PipelineBuilder.>createPipeline( - "requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request") - .andFinishWith( - "requestCompleteTask", - task -> - completeTaskStep.markAsCompleteOrFailed( - pivotBlockHeader, downloadState, task)); +public interface WorldStateDownloadProcess { - final Pipe> requestsToComplete = completionPipeline.getInputPipe(); - final Pipeline> fetchDataPipeline = - createPipelineFrom( - "requestDequeued", - new TaskQueueIterator(downloadState), - bufferCapacity, - outputCounter, - true, - "world_state_download") - .thenFlatMapInParallel( - "requestLoadLocalData", - task -> loadLocalDataStep.loadLocalData(task, requestsToComplete), - 3, - bufferCapacity) - .inBatches(hashCountPerRequest) - .thenProcessAsync( - "batchDownloadData", - requestTasks -> - requestDataStep.requestData(requestTasks, pivotBlockHeader, downloadState), - maxOutstandingRequests) - .thenProcess( - "batchPersistData", - tasks -> persistDataStep.persist(tasks, pivotBlockHeader, downloadState)) - .andFinishWith( - "batchDataDownloaded", tasks -> tasks.forEach(requestsToComplete::put)); + CompletableFuture start(final EthScheduler ethScheduler); - return new WorldStateDownloadProcess( - fetchDataPipeline, completionPipeline, requestsToComplete); - } - } + void abort(); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadStatus.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadStatus.java index 2e88f46c65..9c38b0e18b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadStatus.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadStatus.java @@ -20,7 +20,7 @@ import java.util.Optional; public interface WorldStateDownloadStatus { - public Optional getPulledStates(); + Optional getPulledStates(); - public Optional getKnownStates(); + Optional getKnownStates(); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloader.java index 299bd665f8..1a4f888a61 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -14,157 +14,15 @@ */ package org.hyperledger.besu.ethereum.eth.sync.worldstate; -import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.core.BlockHeader; -import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; -import org.hyperledger.besu.metrics.BesuMetricCategory; -import org.hyperledger.besu.plugin.services.MetricsSystem; -import org.hyperledger.besu.services.tasks.CachingTaskCollection; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; -import java.time.Clock; -import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import java.util.function.IntSupplier; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.tuweni.bytes.Bytes; +public interface WorldStateDownloader extends WorldStateDownloadStatus { -public class WorldStateDownloader implements WorldStateDownloadStatus { - private static final Logger LOG = LogManager.getLogger(); + CompletableFuture run( + final FastSyncActions fastSyncActions, final FastSyncState fastSyncState); - private final long minMillisBeforeStalling; - private final Clock clock; - private final MetricsSystem metricsSystem; - - private final EthContext ethContext; - private final CachingTaskCollection taskCollection; - private final int hashCountPerRequest; - private final int maxOutstandingRequests; - private final int maxNodeRequestsWithoutProgress; - private final WorldStateStorage worldStateStorage; - - private final AtomicReference downloadState = new AtomicReference<>(); - - private Optional maybeCompleteTask = Optional.empty(); - - public WorldStateDownloader( - final EthContext ethContext, - final WorldStateStorage worldStateStorage, - final CachingTaskCollection taskCollection, - final int hashCountPerRequest, - final int maxOutstandingRequests, - final int maxNodeRequestsWithoutProgress, - final long minMillisBeforeStalling, - final Clock clock, - final MetricsSystem metricsSystem) { - this.ethContext = ethContext; - this.worldStateStorage = worldStateStorage; - this.taskCollection = taskCollection; - this.hashCountPerRequest = hashCountPerRequest; - this.maxOutstandingRequests = maxOutstandingRequests; - this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress; - this.minMillisBeforeStalling = minMillisBeforeStalling; - this.clock = clock; - this.metricsSystem = metricsSystem; - - metricsSystem.createIntegerGauge( - BesuMetricCategory.SYNCHRONIZER, - "world_state_node_requests_since_last_progress_current", - "Number of world state requests made since the last time new data was returned", - downloadStateValue(WorldDownloadState::getRequestsSinceLastProgress)); - - metricsSystem.createIntegerGauge( - BesuMetricCategory.SYNCHRONIZER, - "world_state_inflight_requests_current", - "Number of in progress requests for world state data", - downloadStateValue(WorldDownloadState::getOutstandingTaskCount)); - } - - private IntSupplier downloadStateValue(final Function getter) { - return () -> { - final WorldDownloadState state = this.downloadState.get(); - return state != null ? getter.apply(state) : 0; - }; - } - - public CompletableFuture run(final BlockHeader header) { - synchronized (this) { - final WorldDownloadState oldDownloadState = this.downloadState.get(); - if (oldDownloadState != null && oldDownloadState.isDownloading()) { - final CompletableFuture failed = new CompletableFuture<>(); - failed.completeExceptionally( - new IllegalStateException( - "Cannot run an already running " + this.getClass().getSimpleName())); - return failed; - } - - final Hash stateRoot = header.getStateRoot(); - if (worldStateStorage.isWorldStateAvailable(stateRoot, header.getHash())) { - LOG.info( - "World state already available for block {} ({}). State root {}", - header.getNumber(), - header.getHash(), - stateRoot); - return CompletableFuture.completedFuture(null); - } - LOG.info( - "Begin downloading world state from peers for block {} ({}). State root {}", - header.getNumber(), - header.getHash(), - stateRoot); - - final WorldDownloadState newDownloadState = - new WorldDownloadState( - taskCollection, maxNodeRequestsWithoutProgress, minMillisBeforeStalling, clock); - this.downloadState.set(newDownloadState); - - if (!newDownloadState.downloadWasResumed()) { - // Only queue the root node if we're starting a new download from scratch - newDownloadState.enqueueRequest( - NodeDataRequest.createAccountDataRequest(stateRoot, Optional.of(Bytes.EMPTY))); - } - - maybeCompleteTask = - Optional.of(new CompleteTaskStep(worldStateStorage, metricsSystem, taskCollection::size)); - final WorldStateDownloadProcess downloadProcess = - WorldStateDownloadProcess.builder() - .hashCountPerRequest(hashCountPerRequest) - .maxOutstandingRequests(maxOutstandingRequests) - .loadLocalDataStep(new LoadLocalDataStep(worldStateStorage, metricsSystem)) - .requestDataStep(new RequestDataStep(ethContext, metricsSystem)) - .persistDataStep(new PersistDataStep(worldStateStorage)) - .completeTaskStep(maybeCompleteTask.get()) - .downloadState(newDownloadState) - .pivotBlockHeader(header) - .metricsSystem(metricsSystem) - .build(); - - newDownloadState.setWorldStateDownloadProcess(downloadProcess); - - return newDownloadState.startDownload(downloadProcess, ethContext.getScheduler()); - } - } - - public void cancel() { - synchronized (this) { - final WorldDownloadState downloadState = this.downloadState.get(); - if (downloadState != null) { - downloadState.getDownloadFuture().cancel(true); - } - } - } - - @Override - public Optional getPulledStates() { - return maybeCompleteTask.map(CompleteTaskStep::getCompletedRequests); - } - - @Override - public Optional getKnownStates() { - return maybeCompleteTask.map(task -> task.getCompletedRequests() + task.getPendingRequests()); - } + void cancel(); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointHeaderFetcherTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointHeaderFetcherTest.java index a14660e39e..e5cad674a4 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointHeaderFetcherTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/CheckpointHeaderFetcherTest.java @@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer.Responder; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; @@ -36,7 +37,6 @@ import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.junit.Before; @@ -88,8 +88,7 @@ public class CheckpointHeaderFetcherTest { @Test public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() { - final CheckpointHeaderFetcher checkpointHeaderFetcher = - createCheckpointHeaderFetcher(Optional.empty()); + final CheckpointHeaderFetcher checkpointHeaderFetcher = createCheckpointHeaderFetcher(); final CompletableFuture> result = checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1)); @@ -104,7 +103,7 @@ public class CheckpointHeaderFetcherTest { @Test public void shouldNotRequestHeadersBeyondTargetWhenTargetIsMultipleOfSegmentSize() { final CheckpointHeaderFetcher checkpointHeaderFetcher = - createCheckpointHeaderFetcher(Optional.of(header(11))); + createCheckpointHeaderFetcher(header(11)); final CompletableFuture> result = checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1)); @@ -117,7 +116,7 @@ public class CheckpointHeaderFetcherTest { @Test public void shouldNotRequestHeadersBeyondTargetWhenTargetIsNotAMultipleOfSegmentSize() { final CheckpointHeaderFetcher checkpointHeaderFetcher = - createCheckpointHeaderFetcher(Optional.of(header(15))); + createCheckpointHeaderFetcher(header(15)); final CompletableFuture> result = checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1)); @@ -130,7 +129,7 @@ public class CheckpointHeaderFetcherTest { @Test public void shouldReturnOnlyTargetHeaderWhenLastHeaderIsTheCheckpointBeforeTarget() { final CheckpointHeaderFetcher checkpointHeaderFetcher = - createCheckpointHeaderFetcher(Optional.of(header(15))); + createCheckpointHeaderFetcher(header(15)); final CompletableFuture> result = checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(11)); @@ -141,7 +140,7 @@ public class CheckpointHeaderFetcherTest { @Test public void shouldReturnEmptyListWhenLastHeaderIsTarget() { final CheckpointHeaderFetcher checkpointHeaderFetcher = - createCheckpointHeaderFetcher(Optional.of(header(15))); + createCheckpointHeaderFetcher(header(15)); final CompletableFuture> result = checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(15)); @@ -151,7 +150,7 @@ public class CheckpointHeaderFetcherTest { @Test public void shouldReturnEmptyListWhenLastHeaderIsAfterTarget() { final CheckpointHeaderFetcher checkpointHeaderFetcher = - createCheckpointHeaderFetcher(Optional.of(header(15))); + createCheckpointHeaderFetcher(header(15)); final CompletableFuture> result = checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(16)); @@ -161,8 +160,7 @@ public class CheckpointHeaderFetcherTest { @Test public void nextCheckpointShouldEndAtChainHeadWhenNextCheckpointHeaderIsAfterHead() { final long remoteChainHeight = blockchain.getChainHeadBlockNumber(); - final CheckpointHeaderFetcher checkpointHeaderFetcher = - createCheckpointHeaderFetcher(Optional.empty()); + final CheckpointHeaderFetcher checkpointHeaderFetcher = createCheckpointHeaderFetcher(); assertThat( checkpointHeaderFetcher.nextCheckpointEndsAtChainHead( @@ -174,7 +172,7 @@ public class CheckpointHeaderFetcherTest { public void nextCheckpointShouldNotEndAtChainHeadWhenAFinalCheckpointHeaderIsSpecified() { final long remoteChainHeight = blockchain.getChainHeadBlockNumber(); final CheckpointHeaderFetcher checkpointHeaderFetcher = - createCheckpointHeaderFetcher(Optional.of(header(remoteChainHeight))); + createCheckpointHeaderFetcher(header(remoteChainHeight)); assertThat( checkpointHeaderFetcher.nextCheckpointEndsAtChainHead( @@ -185,8 +183,7 @@ public class CheckpointHeaderFetcherTest { @Test public void shouldReturnRemoteChainHeadWhenNextCheckpointHeaderIsTheRemoteHead() { final long remoteChainHeight = blockchain.getChainHeadBlockNumber(); - final CheckpointHeaderFetcher checkpointHeaderFetcher = - createCheckpointHeaderFetcher(Optional.empty()); + final CheckpointHeaderFetcher checkpointHeaderFetcher = createCheckpointHeaderFetcher(); assertThat( checkpointHeaderFetcher.nextCheckpointEndsAtChainHead( @@ -202,8 +199,19 @@ public class CheckpointHeaderFetcherTest { assertThat(result).isCompletedWithValue(singletonList(header(remoteChainHeight))); } - private CheckpointHeaderFetcher createCheckpointHeaderFetcher( - final Optional targetHeader) { + private CheckpointHeaderFetcher createCheckpointHeaderFetcher() { + final EthContext ethContext = ethProtocolManager.ethContext(); + return new CheckpointHeaderFetcher( + SynchronizerConfiguration.builder() + .downloaderChainSegmentSize(SEGMENT_SIZE) + .downloaderHeadersRequestSize(3) + .build(), + protocolSchedule, + ethContext, + metricsSystem); + } + + private CheckpointHeaderFetcher createCheckpointHeaderFetcher(final BlockHeader targetHeader) { final EthContext ethContext = ethProtocolManager.ethContext(); return new CheckpointHeaderFetcher( SynchronizerConfiguration.builder() @@ -212,7 +220,7 @@ public class CheckpointHeaderFetcherTest { .build(), protocolSchedule, ethContext, - targetHeader, + new FastSyncState(targetHeader), metricsSystem); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java index a51d19c977..fb28b45f6d 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java @@ -108,7 +108,7 @@ public class FastSyncChainDownloaderTest { ethContext, syncState, new NoOpMetricsSystem(), - otherBlockchain.getBlockHeader(pivotBlockNumber).get()); + new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get())); } @Test diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java index 0053f79d20..2128de0592 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloaderTest.java @@ -18,6 +18,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowable; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -29,7 +30,8 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements; -import org.hyperledger.besu.ethereum.eth.sync.worldstate.NodeDataRequest; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastWorldStateDownloader; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.NodeDataRequest; import org.hyperledger.besu.ethereum.eth.sync.worldstate.StalledDownloadException; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; @@ -55,7 +57,7 @@ public class FastSyncDownloaderTest { private final WorldStateStorage worldStateStorage = mock(WorldStateStorage.class); - private final WorldStateDownloader worldStateDownloader = mock(WorldStateDownloader.class); + private final WorldStateDownloader worldStateDownloader = mock(FastWorldStateDownloader.class); private final FastSyncStateStorage storage = mock(FastSyncStateStorage.class); @SuppressWarnings("unchecked") @@ -88,7 +90,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState)) .thenReturn(chainDownloader); when(chainDownloader.start()).thenReturn(completedFuture(null)); - when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(completedFuture(null)); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) + .thenReturn(completedFuture(null)); final CompletableFuture result = downloader.start(); @@ -98,7 +102,8 @@ public class FastSyncDownloaderTest { verify(storage).storeState(downloadPivotBlockHeaderState); verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState); verify(chainDownloader).start(); - verify(worldStateDownloader).run(pivotBlockHeader); + verify(worldStateDownloader) + .run(any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader))); verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage); assertThat(result).isCompletedWithValue(downloadPivotBlockHeaderState); } @@ -113,7 +118,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.downloadPivotBlockHeader(fastSyncState)).thenReturn(complete); when(fastSyncActions.createChainDownloader(fastSyncState)).thenReturn(chainDownloader); when(chainDownloader.start()).thenReturn(completedFuture(null)); - when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(completedFuture(null)); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) + .thenReturn(completedFuture(null)); final FastSyncDownloader resumedDownloader = new FastSyncDownloader( @@ -133,7 +140,8 @@ public class FastSyncDownloaderTest { verify(storage).storeState(fastSyncState); verify(fastSyncActions).createChainDownloader(fastSyncState); verify(chainDownloader).start(); - verify(worldStateDownloader).run(pivotBlockHeader); + verify(worldStateDownloader) + .run(any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader))); verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage); assertThat(result).isCompletedWithValue(fastSyncState); } @@ -182,7 +190,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState)) .thenReturn(chainDownloader); when(chainDownloader.start()).thenReturn(chainFuture); - when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(worldStateFuture); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) + .thenReturn(worldStateFuture); final CompletableFuture result = downloader.start(); @@ -191,7 +201,8 @@ public class FastSyncDownloaderTest { verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState); verify(storage).storeState(downloadPivotBlockHeaderState); verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState); - verify(worldStateDownloader).run(pivotBlockHeader); + verify(worldStateDownloader) + .run(any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader))); verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage); assertThat(result).isNotDone(); @@ -218,7 +229,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState)) .thenReturn(chainDownloader); when(chainDownloader.start()).thenReturn(chainFuture); - when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(worldStateFuture); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) + .thenReturn(worldStateFuture); final CompletableFuture result = downloader.start(); @@ -226,7 +239,8 @@ public class FastSyncDownloaderTest { verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE); verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState); verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState); - verify(worldStateDownloader).run(pivotBlockHeader); + verify(worldStateDownloader) + .run(any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader))); verifyNoMoreInteractions(fastSyncActions); verifyNoMoreInteractions(worldStateDownloader); @@ -287,7 +301,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState)) .thenReturn(chainDownloader); when(chainDownloader.start()).thenReturn(chainFuture); - when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(worldStateFuture); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) + .thenReturn(worldStateFuture); final CompletableFuture result = downloader.start(); @@ -295,7 +311,8 @@ public class FastSyncDownloaderTest { verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE); verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState); verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState); - verify(worldStateDownloader).run(pivotBlockHeader); + verify(worldStateDownloader) + .run(any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader))); verifyNoMoreInteractions(fastSyncActions); verifyNoMoreInteractions(worldStateDownloader); @@ -320,7 +337,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState)) .thenReturn(chainDownloader); when(chainDownloader.start()).thenReturn(chainFuture); - when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(worldStateFuture); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) + .thenReturn(worldStateFuture); final CompletableFuture result = downloader.start(); @@ -328,7 +347,8 @@ public class FastSyncDownloaderTest { verify(fastSyncActions).selectPivotBlock(FastSyncState.EMPTY_SYNC_STATE); verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState); verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState); - verify(worldStateDownloader).run(pivotBlockHeader); + verify(worldStateDownloader) + .run(any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader))); verifyNoMoreInteractions(fastSyncActions); verifyNoMoreInteractions(worldStateDownloader); @@ -363,7 +383,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState)) .thenReturn(chainDownloader); when(chainDownloader.start()).thenReturn(chainFuture); - when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(firstWorldStateFuture); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) + .thenReturn(firstWorldStateFuture); // Second attempt with new pivot block when(fastSyncActions.downloadPivotBlockHeader(secondSelectPivotBlockState)) @@ -372,7 +394,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(secondDownloadPivotBlockHeaderState)) .thenReturn(secondChainDownloader); when(secondChainDownloader.start()).thenReturn(completedFuture(null)); - when(worldStateDownloader.run(secondPivotBlockHeader)).thenReturn(secondWorldStateFuture); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(secondPivotBlockHeader)))) + .thenReturn(secondWorldStateFuture); final CompletableFuture result = downloader.start(); @@ -381,7 +405,8 @@ public class FastSyncDownloaderTest { verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState); verify(storage).storeState(downloadPivotBlockHeaderState); verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState); - verify(worldStateDownloader).run(pivotBlockHeader); + verify(worldStateDownloader) + .run(any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader))); verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage); assertThat(result).isNotDone(); @@ -397,7 +422,8 @@ public class FastSyncDownloaderTest { verify(fastSyncActions).downloadPivotBlockHeader(secondSelectPivotBlockState); verify(storage).storeState(secondDownloadPivotBlockHeaderState); verify(fastSyncActions).createChainDownloader(secondDownloadPivotBlockHeaderState); - verify(worldStateDownloader).run(secondPivotBlockHeader); + verify(worldStateDownloader) + .run(any(FastSyncActions.class), eq(new FastSyncState(secondPivotBlockHeader))); verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage); secondWorldStateFuture.complete(null); @@ -430,7 +456,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState)) .thenReturn(chainDownloader); when(chainDownloader.start()).thenReturn(chainFuture); - when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(firstWorldStateFuture); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) + .thenReturn(firstWorldStateFuture); when(fastSyncActions.scheduleFutureTask(any(), any())) .thenAnswer(invocation -> ((Supplier) invocation.getArgument(0)).get()); @@ -441,7 +469,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(secondDownloadPivotBlockHeaderState)) .thenReturn(secondChainDownloader); when(secondChainDownloader.start()).thenReturn(completedFuture(null)); - when(worldStateDownloader.run(secondPivotBlockHeader)).thenReturn(secondWorldStateFuture); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(secondPivotBlockHeader)))) + .thenReturn(secondWorldStateFuture); final CompletableFuture result = downloader.start(); @@ -450,7 +480,8 @@ public class FastSyncDownloaderTest { verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState); verify(storage).storeState(downloadPivotBlockHeaderState); verify(fastSyncActions).createChainDownloader(downloadPivotBlockHeaderState); - verify(worldStateDownloader).run(pivotBlockHeader); + verify(worldStateDownloader) + .run(any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader))); verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage); assertThat(result).isNotDone(); @@ -468,7 +499,8 @@ public class FastSyncDownloaderTest { verify(fastSyncActions).downloadPivotBlockHeader(secondSelectPivotBlockState); verify(storage).storeState(secondDownloadPivotBlockHeaderState); verify(fastSyncActions).createChainDownloader(secondDownloadPivotBlockHeaderState); - verify(worldStateDownloader).run(secondPivotBlockHeader); + verify(worldStateDownloader) + .run(any(FastSyncActions.class), eq(new FastSyncState(secondPivotBlockHeader))); verifyNoMoreInteractions(fastSyncActions, worldStateDownloader, storage); secondWorldStateFuture.complete(null); @@ -500,7 +532,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState)) .thenReturn(chainDownloader); when(chainDownloader.start()).thenReturn(new CompletableFuture<>()); - when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(new CompletableFuture<>()); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) + .thenReturn(new CompletableFuture<>()); downloader.start(); Assertions.assertThat(downloader.calculateTrailingPeerRequirements()) @@ -520,7 +554,9 @@ public class FastSyncDownloaderTest { when(fastSyncActions.createChainDownloader(downloadPivotBlockHeaderState)) .thenReturn(chainDownloader); when(chainDownloader.start()).thenReturn(completedFuture(null)); - when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(completedFuture(null)); + when(worldStateDownloader.run( + any(FastSyncActions.class), eq(new FastSyncState(pivotBlockHeader)))) + .thenReturn(completedFuture(null)); final CompletableFuture result = downloader.start(); assertThat(result).isDone(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/CompleteTaskStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStepTest.java similarity index 94% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/CompleteTaskStepTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStepTest.java index 99a0588a61..6e3b22926a 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/CompleteTaskStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/CompleteTaskStepTest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -22,6 +22,7 @@ import static org.mockito.Mockito.verify; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.StubTask; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; @@ -36,7 +37,7 @@ public class CompleteTaskStepTest { private static final Hash ROOT_HASH = Hash.hash(Bytes.of(1, 2, 3)); private final WorldStateStorage worldStateStorage = mock(WorldStateStorage.class); - private final WorldDownloadState downloadState = mock(WorldDownloadState.class); + private final FastWorldDownloadState downloadState = mock(FastWorldDownloadState.class); private final BlockHeader blockHeader = new BlockHeaderTestFixture().stateRoot(ROOT_HASH).buildHeader(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldDownloadStateTest.java similarity index 94% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldDownloadStateTest.java index ee6bbc8be2..016e7b01a3 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldDownloadStateTest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -25,6 +25,8 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.StalledDownloadException; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadProcess; import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage; import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; @@ -47,7 +49,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @RunWith(Parameterized.class) -public class WorldDownloadStateTest { +public class FastWorldDownloadStateTest { private static final Bytes ROOT_NODE_DATA = Bytes.of(1, 2, 3, 4); private static final Hash ROOT_NODE_HASH = Hash.hash(ROOT_NODE_DATA); @@ -64,8 +66,8 @@ public class WorldDownloadStateTest { mock(WorldStateDownloadProcess.class); private final TestClock clock = new TestClock(); - private final WorldDownloadState downloadState = - new WorldDownloadState( + private final FastWorldDownloadState downloadState = + new FastWorldDownloadState( pendingRequests, MAX_REQUESTS_WITHOUT_PROGRESS, MIN_MILLIS_BEFORE_STALLING, clock); private final CompletableFuture future = downloadState.getDownloadFuture(); @@ -77,7 +79,7 @@ public class WorldDownloadStateTest { private final DataStorageFormat storageFormat; - public WorldDownloadStateTest(final DataStorageFormat storageFormat) { + public FastWorldDownloadStateTest(final DataStorageFormat storageFormat) { this.storageFormat = storageFormat; } @@ -218,7 +220,7 @@ public class WorldDownloadStateTest { assertThat(pendingRequests.isEmpty()).isTrue(); } - private void assertWorldStateStalled(final WorldDownloadState state) { + private void assertWorldStateStalled(final FastWorldDownloadState state) { final CompletableFuture future = state.getDownloadFuture(); assertThat(future).isCompletedExceptionally(); assertThatThrownBy(future::get) diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloaderTest.java similarity index 95% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloaderTest.java index f967117625..87448b2a8d 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastWorldStateDownloaderTest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -43,6 +43,9 @@ import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.messages.EthPV63; import org.hyperledger.besu.ethereum.eth.messages.GetNodeDataMessage; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.StalledDownloadException; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.rlp.RLP; @@ -98,7 +101,7 @@ import org.junit.Test; import org.junit.rules.Timeout; @Ignore("PIE-1434 - Ignored while working to make test more reliable") -public class WorldStateDownloaderTest { +public class FastWorldStateDownloaderTest { @Rule public Timeout globalTimeout = Timeout.seconds(60); // 1 minute max per test @@ -109,7 +112,7 @@ public class WorldStateDownloaderTest { Executors.newCachedThreadPool( new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat(WorldStateDownloaderTest.class.getSimpleName() + "-persistence-%d") + .setNameFormat(FastWorldStateDownloaderTest.class.getSimpleName() + "-persistence-%d") .build()); final EthProtocolManager ethProtocolManager = @@ -154,11 +157,14 @@ public class WorldStateDownloaderTest { @Test public void downloadEmptyWorldState() { + final BlockHeader header = dataGen .block(BlockOptions.create().setStateRoot(EMPTY_TRIE_ROOT).setBlockNumber(10)) .getHeader(); + final FastSyncState fastSyncState = new FastSyncState(header); + // Create some peers final List peers = Stream.generate( @@ -173,7 +179,7 @@ public class WorldStateDownloaderTest { final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); - final CompletableFuture future = downloader.run(header); + final CompletableFuture future = downloader.run(null, fastSyncState); assertThat(future).isDone(); // Peers should not have been queried @@ -210,7 +216,9 @@ public class WorldStateDownloaderTest { worldStateArchive.getWorldStateStorage(), taskCollection); - final CompletableFuture future = downloader.run(header); + final FastSyncState fastSyncState = new FastSyncState(header); + + final CompletableFuture future = downloader.run(null, fastSyncState); assertThat(future).isDone(); // Peers should not have been queried because we already had the state @@ -254,7 +262,9 @@ public class WorldStateDownloaderTest { final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); - final CompletableFuture result = downloader.run(header); + final FastSyncState fastSyncState = new FastSyncState(header); + + final CompletableFuture result = downloader.run(null, fastSyncState); serviceExecutor.runPendingFuturesInSeparateThreads(persistenceThread); @@ -312,7 +322,9 @@ public class WorldStateDownloaderTest { final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); - final CompletableFuture result = downloader.run(header); + final FastSyncState fastSyncState = new FastSyncState(header); + + final CompletableFuture result = downloader.run(null, fastSyncState); // Respond to node data requests final List sentMessages = new ArrayList<>(); @@ -385,7 +397,9 @@ public class WorldStateDownloaderTest { final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); - final CompletableFuture result = downloader.run(header); + final FastSyncState fastSyncState = new FastSyncState(header); + + final CompletableFuture result = downloader.run(null, fastSyncState); // Send a few responses final RespondingEthPeer.Responder responder = @@ -478,7 +492,9 @@ public class WorldStateDownloaderTest { final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); - final CompletableFuture result = downloader.run(header); + final FastSyncState fastSyncState = new FastSyncState(header); + + final CompletableFuture result = downloader.run(null, fastSyncState); // Respond to node data requests final List sentMessages = new ArrayList<>(); @@ -574,7 +590,9 @@ public class WorldStateDownloaderTest { final WorldStateDownloader downloader = createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); - final CompletableFuture result = downloader.run(header); + final FastSyncState fastSyncState = new FastSyncState(header); + + final CompletableFuture result = downloader.run(null, fastSyncState); // Respond to node data requests final List sentMessages = new ArrayList<>(); @@ -640,11 +658,17 @@ public class WorldStateDownloaderTest { EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber()); // Start downloader (with a state root that's not available anywhere - final CompletableFuture result = + + final CompletableFuture result = downloader.run( - new BlockHeaderTestFixture().stateRoot(Hash.hash(Bytes.of(1, 2, 3, 4))).buildHeader()); + null, + new FastSyncState( + new BlockHeaderTestFixture() + .stateRoot(Hash.hash(Bytes.of(1, 2, 3, 4))) + .buildHeader())); + // A second run should return an error without impacting the first result - final CompletableFuture secondResult = downloader.run(header); + final CompletableFuture secondResult = downloader.run(null, new FastSyncState(header)); assertThat(secondResult).isCompletedExceptionally(); assertThat(result).isNotCompletedExceptionally(); @@ -655,7 +679,9 @@ public class WorldStateDownloaderTest { assertThatThrownBy(result::get).hasCauseInstanceOf(StalledDownloadException.class); // Finally, check that when we restart the download with state that is available it works - final CompletableFuture retryResult = downloader.run(header); + + final CompletableFuture retryResult = downloader.run(null, new FastSyncState(header)); + final RespondingEthPeer.Responder responder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); peer.respondWhileOtherThreadsWork(responder, () -> !retryResult.isDone()); @@ -710,7 +736,7 @@ public class WorldStateDownloaderTest { final RespondingEthPeer.Responder responder = RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages); - CompletableFuture result = downloader.run(header); + CompletableFuture result = downloader.run(null, new FastSyncState(header)); peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone()); assertThat(localStorage.isWorldStateAvailable(stateRoot, header.getHash())).isTrue(); @@ -855,9 +881,9 @@ public class WorldStateDownloaderTest { .collect(Collectors.toList()); // Start downloader - final CompletableFuture result = downloader.run(header); + final CompletableFuture result = downloader.run(null, new FastSyncState(header)); // A second run should return an error without impacting the first result - final CompletableFuture secondResult = downloader.run(header); + final CompletableFuture secondResult = downloader.run(null, new FastSyncState(header)); assertThat(secondResult).isCompletedExceptionally(); assertThat(result).isNotCompletedExceptionally(); @@ -969,7 +995,7 @@ public class WorldStateDownloaderTest { final EthContext context, final WorldStateStorage storage, final CachingTaskCollection taskCollection) { - return new WorldStateDownloader( + return new FastWorldStateDownloader( context, storage, taskCollection, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStepTest.java similarity index 95% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStepTest.java index 39f099c2b2..b2aded8ecf 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/LoadLocalDataStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/LoadLocalDataStepTest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import static org.assertj.core.api.Assertions.assertThat; import static org.hyperledger.besu.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER; @@ -21,6 +21,7 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.StubTask; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.services.pipeline.Pipe; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/NodeDataRequestTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/NodeDataRequestTest.java similarity index 98% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/NodeDataRequestTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/NodeDataRequestTest.java index e8bcd7cfa0..6019556229 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/NodeDataRequestTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/NodeDataRequestTest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import static org.assertj.core.api.Assertions.assertThat; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/PersistDataStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStepTest.java similarity index 95% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/PersistDataStepTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStepTest.java index 97e11d6098..18e1ee3f33 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/PersistDataStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStepTest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -24,6 +24,7 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.StubTask; import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.services.tasks.Task; @@ -38,7 +39,7 @@ public class PersistDataStepTest { private final WorldStateStorage worldStateStorage = new InMemoryKeyValueStorageProvider().createWorldStateStorage(DataStorageFormat.FOREST); - private final WorldDownloadState downloadState = mock(WorldDownloadState.class); + private final FastWorldDownloadState downloadState = mock(FastWorldDownloadState.class); private final Bytes rootNodeData = Bytes.of(1, 1, 1, 1); private final BlockHeader blockHeader = diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/RequestDataStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStepTest.java similarity index 95% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/RequestDataStepTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStepTest.java index 9ca953a755..84b1c34004 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/RequestDataStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStepTest.java @@ -12,7 +12,7 @@ * * SPDX-License-Identifier: Apache-2.0 */ -package org.hyperledger.besu.ethereum.eth.sync.worldstate; +package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import static java.util.Arrays.asList; import static java.util.Collections.emptyMap; @@ -28,6 +28,7 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; +import org.hyperledger.besu.ethereum.eth.sync.worldstate.StubTask; import org.hyperledger.besu.services.tasks.Task; import java.util.List; @@ -55,7 +56,7 @@ public class RequestDataStepTest { @SuppressWarnings("unchecked") private final EthTask> ethTask = mock(EthTask.class); - private final WorldDownloadState downloadState = mock(WorldDownloadState.class); + private final FastWorldDownloadState downloadState = mock(FastWorldDownloadState.class); private final BlockHeader blockHeader = new BlockHeaderTestFixture().number(BLOCK_NUMBER).buildHeader(); private final CompletableFuture> getDataFuture = new CompletableFuture<>(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/StubTask.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/StubTask.java index b4c216b722..3965969bc0 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/StubTask.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/StubTask.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.sync.worldstate; import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.NodeDataRequest; import org.hyperledger.besu.services.tasks.Task; import java.util.Optional;