Add checkpoint sync (#3849)

Add a new way to synchronize which is X_CHECKPOINT. This mode is experimental so use it at your own risk. This mode allows you to do like a snapsync but starting from a specific checkpoint instead of starting from the genesis.

This checkpoint will be in the genesis configuration of each network. To add the checkpoint mechanism in a network you just have to add the checkpoint section in the genesis.

Currently there is a checkpoint for ropten, goerli and mainnet.

Mainnet on i3.2xlarge <6 hours
Goerli on i3.2xlarge <1 hours

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
pull/3930/head
matkt 3 years ago committed by GitHub
parent 02d389a19e
commit 6f85e1f83b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 3
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  3. 40
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  4. 3
      besu/src/test/java/org/hyperledger/besu/controller/BesuControllerBuilderTest.java
  5. 3
      besu/src/test/java/org/hyperledger/besu/controller/MergeBesuControllerBuilderTest.java
  6. 3
      besu/src/test/java/org/hyperledger/besu/controller/QbftBesuControllerBuilderTest.java
  7. 63
      config/src/main/java/org/hyperledger/besu/config/CheckpointConfigOptions.java
  8. 2
      config/src/main/java/org/hyperledger/besu/config/GenesisConfigOptions.java
  9. 8
      config/src/main/java/org/hyperledger/besu/config/JsonGenesisConfigOptions.java
  10. 5
      config/src/main/java/org/hyperledger/besu/config/StubGenesisConfigOptions.java
  11. 6
      config/src/main/resources/goerli.json
  12. 5
      config/src/main/resources/mainnet.json
  13. 5
      config/src/main/resources/ropsten.json
  14. 5
      consensus/clique/src/main/java/org/hyperledger/besu/consensus/clique/CliqueBlockHeaderFunctions.java
  15. 5
      consensus/common/src/main/java/org/hyperledger/besu/consensus/common/bft/BftBlockHeaderFunctions.java
  16. 5
      consensus/ibftlegacy/src/main/java/org/hyperledger/besu/consensus/ibftlegacy/LegacyIbftBlockHeaderFunctions.java
  17. 17
      ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/cache/TransactionLogBloomCacher.java
  18. 31
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java
  19. 10
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/MutableBlockchain.java
  20. 12
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/BlockHeaderFunctions.java
  21. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java
  22. 51
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/peervalidation/CheckpointBlocksPeerValidator.java
  23. 26
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  24. 39
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadHeadersStep.java
  25. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadPipelineFactory.java
  26. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloader.java
  27. 10
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SyncMode.java
  28. 57
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointBlockImportStep.java
  29. 81
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java
  30. 149
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java
  31. 75
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSource.java
  32. 62
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java
  33. 59
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java
  34. 101
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java
  35. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastImportBlocksStep.java
  36. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java
  37. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java
  38. 66
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java
  39. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloader.java
  40. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncTargetManager.java
  41. 30
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/checkpoint/Checkpoint.java
  42. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java
  43. 30
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java
  44. 29
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/RangeHeaders.java
  45. 47
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/RangeHeadersFetcher.java
  46. 22
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/RangeHeadersValidationStep.java
  47. 10
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/SyncTargetRange.java
  48. 97
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/range/SyncTargetRangeSource.java
  49. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java
  50. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncDownloader.java
  51. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapsyncMetricsManager.java
  52. 14
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java
  53. 28
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/DownloadHeadersStepTest.java
  54. 28
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/PipelineChainDownloaderTest.java
  55. 64
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/RangeHeadersFetcherTest.java
  56. 42
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/RangeHeadersValidationStepTest.java
  57. 97
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/SyncTargetRangeSourceTest.java
  58. 82
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointBlockImportStepTest.java
  59. 81
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSourceTest.java
  60. 197
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java
  61. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java

@ -3,6 +3,7 @@
## 22.4.3
### Additions and Improvements
- \[EXPERIMENTAL\] Add checkpoint sync `--sync-mode="X_CHECKPOINT"` [#3849](https://github.com/hyperledger/besu/pull/3849)
### Bug Fixes

@ -202,7 +202,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -1881,7 +1880,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
logger,
commandLine,
"--sync-mode",
!EnumSet.of(SyncMode.FAST, SyncMode.X_SNAP).contains(syncMode),
SyncMode.isFullSync(syncMode),
singletonList("--fast-sync-min-peers"));
if (!securityModuleName.equals(DEFAULT_SECURITY_MODULE)

@ -16,6 +16,7 @@ package org.hyperledger.besu.controller;
import static com.google.common.base.Preconditions.checkNotNull;
import org.hyperledger.besu.config.CheckpointConfigOptions;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.consensus.merge.FinalizedBlockHashSupplier;
@ -36,6 +37,7 @@ import org.hyperledger.besu.ethereum.chain.BlockchainStorage;
import org.hyperledger.besu.ethereum.chain.DefaultBlockchain;
import org.hyperledger.besu.ethereum.chain.GenesisState;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.core.Synchronizer;
@ -48,6 +50,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.CheckpointBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.ClassicForkPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.DaoForkPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
@ -59,6 +62,8 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromFinalizedBlock;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromPeers;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.TransitionPivotSelector;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.ImmutableCheckpoint;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
@ -87,7 +92,6 @@ import java.nio.file.Path;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -345,10 +349,27 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
syncConfig.getTransactionsParallelism(),
syncConfig.getComputationParallelism(),
metricsSystem);
final GenesisConfigOptions configOptions =
genesisConfig.getConfigOptions(genesisConfigOverrides);
Optional<Checkpoint> checkpoint = Optional.empty();
if (configOptions.getCheckpointOptions().isValid()) {
checkpoint =
Optional.of(
ImmutableCheckpoint.builder()
.blockHash(
Hash.fromHexString(configOptions.getCheckpointOptions().getHash().get()))
.blockNumber(configOptions.getCheckpointOptions().getNumber().getAsLong())
.totalDifficulty(
Difficulty.fromHexString(
configOptions.getCheckpointOptions().getTotalDifficulty().get()))
.build());
}
final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final boolean fastSyncEnabled =
EnumSet.of(SyncMode.FAST, SyncMode.X_SNAP).contains(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fastSyncEnabled);
final boolean fastSyncEnabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fastSyncEnabled, checkpoint);
syncState.subscribeTTDReached(new PandaPrinter());
final TransactionPool transactionPool =
@ -603,6 +624,17 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
protocolSchedule, metricsSystem, requiredBlock.getKey(), requiredBlock.getValue()));
}
final CheckpointConfigOptions checkpointConfigOptions =
genesisConfig.getConfigOptions(genesisConfigOverrides).getCheckpointOptions();
if (SyncMode.X_CHECKPOINT.equals(syncConfig.getSyncMode())
&& checkpointConfigOptions.isValid()) {
validators.add(
new CheckpointBlocksPeerValidator(
protocolSchedule,
metricsSystem,
checkpointConfigOptions.getNumber().orElseThrow(),
checkpointConfigOptions.getHash().map(Hash::fromHexString).orElseThrow()));
}
return validators;
}

@ -21,6 +21,7 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.CheckpointConfigOptions;
import org.hyperledger.besu.config.EthashConfigOptions;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.config.GenesisConfigOptions;
@ -69,6 +70,7 @@ public class BesuControllerBuilderTest {
@Mock GenesisConfigFile genesisConfigFile;
@Mock GenesisConfigOptions genesisConfigOptions;
@Mock EthashConfigOptions ethashConfigOptions;
@Mock CheckpointConfigOptions checkpointConfigOptions;
@Mock Keccak256ConfigOptions keccak256ConfigOptions;
@Mock SynchronizerConfiguration synchronizerConfiguration;
@Mock EthProtocolConfiguration ethProtocolConfiguration;
@ -97,6 +99,7 @@ public class BesuControllerBuilderTest {
when(genesisConfigFile.getConfigOptions(any())).thenReturn(genesisConfigOptions);
when(genesisConfigOptions.getThanosBlockNumber()).thenReturn(OptionalLong.empty());
when(genesisConfigOptions.getEthashConfigOptions()).thenReturn(ethashConfigOptions);
when(genesisConfigOptions.getCheckpointOptions()).thenReturn(checkpointConfigOptions);
when(ethashConfigOptions.getFixedDifficulty()).thenReturn(OptionalLong.empty());
when(genesisConfigOptions.getKeccak256ConfigOptions()).thenReturn(keccak256ConfigOptions);
when(keccak256ConfigOptions.getFixedDifficulty()).thenReturn(OptionalLong.empty());

@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.CheckpointConfigOptions;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.consensus.merge.MergeContext;
@ -83,6 +84,7 @@ public class MergeBesuControllerBuilderTest {
@Mock GenesisConfigOptions genesisConfigOptions;
@Mock SynchronizerConfiguration synchronizerConfiguration;
@Mock EthProtocolConfiguration ethProtocolConfiguration;
@Mock CheckpointConfigOptions checkpointConfigOptions;
@Mock MiningParameters miningParameters;
@Mock ObservableMetricsSystem observableMetricsSystem;
@Mock PrivacyParameters privacyParameters;
@ -108,6 +110,7 @@ public class MergeBesuControllerBuilderTest {
when(genesisConfigFile.getMixHash()).thenReturn(Hash.ZERO.toHexString());
when(genesisConfigFile.getNonce()).thenReturn(Long.toHexString(1));
when(genesisConfigFile.getConfigOptions(any())).thenReturn(genesisConfigOptions);
when(genesisConfigOptions.getCheckpointOptions()).thenReturn(checkpointConfigOptions);
when(genesisConfigOptions.getTerminalTotalDifficulty())
.thenReturn((Optional.of(UInt256.valueOf(100L))));
when(genesisConfigOptions.getThanosBlockNumber()).thenReturn(OptionalLong.empty());

@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.config.CheckpointConfigOptions;
import org.hyperledger.besu.config.GenesisConfigFile;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.config.JsonQbftConfigOptions;
@ -75,6 +76,7 @@ public class QbftBesuControllerBuilderTest {
@Mock private GenesisConfigOptions genesisConfigOptions;
@Mock private SynchronizerConfiguration synchronizerConfiguration;
@Mock private EthProtocolConfiguration ethProtocolConfiguration;
@Mock CheckpointConfigOptions checkpointConfigOptions;
@Mock private MiningParameters miningParameters;
@Mock private ObservableMetricsSystem observableMetricsSystem;
@Mock private PrivacyParameters privacyParameters;
@ -98,6 +100,7 @@ public class QbftBesuControllerBuilderTest {
when(genesisConfigFile.getMixHash()).thenReturn(Hash.ZERO.toHexString());
when(genesisConfigFile.getNonce()).thenReturn(Long.toHexString(1));
when(genesisConfigFile.getConfigOptions(any())).thenReturn(genesisConfigOptions);
when(genesisConfigOptions.getCheckpointOptions()).thenReturn(checkpointConfigOptions);
when(storageProvider.createBlockchainStorage(any()))
.thenReturn(
new KeyValueStoragePrefixedKeyBlockchainStorage(

@ -0,0 +1,63 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.config;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
public class CheckpointConfigOptions {
public static final CheckpointConfigOptions DEFAULT =
new CheckpointConfigOptions(JsonUtil.createEmptyObjectNode());
private final ObjectNode checkpointConfigRoot;
CheckpointConfigOptions(final ObjectNode checkpointConfigRoot) {
this.checkpointConfigRoot = checkpointConfigRoot;
}
public Optional<String> getTotalDifficulty() {
return JsonUtil.getString(checkpointConfigRoot, "totaldifficulty");
}
public OptionalLong getNumber() {
return JsonUtil.getLong(checkpointConfigRoot, "number");
}
public Optional<String> getHash() {
return JsonUtil.getString(checkpointConfigRoot, "hash");
}
public boolean isValid() {
return getTotalDifficulty().isPresent() && getNumber().isPresent() && getHash().isPresent();
}
Map<String, Object> asMap() {
final ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
getTotalDifficulty().ifPresent(l -> builder.put("totaldifficulty", l));
getNumber().ifPresent(l -> builder.put("number", l));
getHash().ifPresent(l -> builder.put("hash", l));
return builder.build();
}
@Override
public String toString() {
return "CheckpointConfigOptions{" + "checkpointConfigRoot=" + checkpointConfigRoot + '}';
}
}

@ -48,6 +48,8 @@ public interface GenesisConfigOptions {
IbftLegacyConfigOptions getIbftLegacyConfigOptions();
CheckpointConfigOptions getCheckpointOptions();
CliqueConfigOptions getCliqueConfigOptions();
BftConfigOptions getBftConfigOptions();

@ -47,6 +47,7 @@ public class JsonGenesisConfigOptions implements GenesisConfigOptions {
private static final String EC_CURVE_CONFIG_KEY = "eccurve";
private static final String TRANSITIONS_CONFIG_KEY = "transitions";
private static final String DISCOVERY_CONFIG_KEY = "discovery";
private static final String CHECKPOINT_CONFIG_KEY = "checkpoint";
private final ObjectNode configRoot;
private final Map<String, String> configOverrides = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@ -168,6 +169,13 @@ public class JsonGenesisConfigOptions implements GenesisConfigOptions {
.orElse(DiscoveryOptions.DEFAULT);
}
@Override
public CheckpointConfigOptions getCheckpointOptions() {
return JsonUtil.getObjectNode(configRoot, CHECKPOINT_CONFIG_KEY)
.map(CheckpointConfigOptions::new)
.orElse(CheckpointConfigOptions.DEFAULT);
}
@Override
public CliqueConfigOptions getCliqueConfigOptions() {
return JsonUtil.getObjectNode(configRoot, CLIQUE_CONFIG_KEY)

@ -110,6 +110,11 @@ public class StubGenesisConfigOptions implements GenesisConfigOptions {
return IbftLegacyConfigOptions.DEFAULT;
}
@Override
public CheckpointConfigOptions getCheckpointOptions() {
return CheckpointConfigOptions.DEFAULT;
}
@Override
public CliqueConfigOptions getCliqueConfigOptions() {
return CliqueConfigOptions.DEFAULT;

@ -20,6 +20,12 @@
"enode://d4f764a48ec2a8ecf883735776fdefe0a3949eb0ca476bd7bc8d0954a9defe8fea15ae5da7d40b5d2d59ce9524a99daedadf6da6283fca492cc80b53689fb3b3@46.4.99.122:32109",
"enode://d2b720352e8216c9efc470091aa91ddafc53e222b32780f505c817ceef69e01d5b0b0797b69db254c586f493872352f5a022b4d8479a00fc92ec55f9ad46a27e@88.99.70.182:30303"
]
},
"checkpoint": {
"hash": "0x2ae30061bdfc7f6dad5b07361dce436502eb0fde68645de12bae4929be619188",
"number": 6720000,
"totalDifficulty": "0x967F81",
"_comment": "must be the beginning of an epoch"
}
},
"coinbase":"0x0000000000000000000000000000000000000000",

@ -32,6 +32,11 @@
"enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303",
"enode://979b7fa28feeb35a4741660a16076f1943202cb72b6af70d327f053e248bab9ba81760f39d0701ef1d8f89cc1fbd2cacba0710a12cd5314d5e0c9021aa3637f9@5.1.83.226:30303"
]
},
"checkpoint": {
"hash": "0x844d581cb00058d19f0584fb582fa2de208876ee56bbae27446a679baf4633f4",
"number": 14700000,
"totalDifficulty": "0xA2539264C62BF98CFC6"
}
},
"nonce": "0x42",

@ -21,6 +21,11 @@
"enode://30b7ab30a01c124a6cceca36863ece12c4f5fa68e3ba9b0b51407ccc002eeed3b3102d20a88f1c1d3c3154e2449317b8ef95090e77b312d5cc39354f86d5d606@52.176.7.10:30303",
"enode://865a63255b3bb68023b6bffd5095118fcc13e79dcf014fe4e47e065c350c7cc72af2e53eff895f11ba1bbb6a2b33271c1116ee870f266618eadfc2e78aa7349c@52.176.100.77:30303"
]
},
"checkpoint": {
"hash": "0x43de216f876d897e59b9757dd24186e5b53be28bc425ca6a966335b48daaa50c",
"number": 12200000,
"totalDifficulty": "0x928D05243C1CF4"
}
},
"nonce": "0x0000000000000042",

@ -30,4 +30,9 @@ public class CliqueBlockHeaderFunctions implements BlockHeaderFunctions {
public CliqueExtraData parseExtraData(final BlockHeader header) {
return CliqueExtraData.decodeRaw(header);
}
@Override
public int getCheckPointWindowSize(final BlockHeader blockHeader) {
return CliqueExtraData.decode(blockHeader).getValidators().size();
}
}

@ -52,4 +52,9 @@ public class BftBlockHeaderFunctions implements BlockHeaderFunctions {
public BftExtraData parseExtraData(final BlockHeader header) {
return bftExtraDataCodec.decodeRaw(header.getExtraData());
}
@Override
public int getCheckPointWindowSize(final BlockHeader header) {
return bftExtraDataCodec.decodeRaw(header.getExtraData()).getValidators().size();
}
}

@ -29,4 +29,9 @@ public class LegacyIbftBlockHeaderFunctions implements BlockHeaderFunctions {
public IbftExtraData parseExtraData(final BlockHeader header) {
return IbftExtraData.decodeRaw(header.getExtraData());
}
@Override
public int getCheckPointWindowSize(final BlockHeader header) {
return IbftExtraData.decodeRaw(header.getExtraData()).getValidators().size();
}
}

@ -173,11 +173,11 @@ public class TransactionLogBloomCacher {
number++) {
final Optional<BlockHeader> ancestorBlockHeader = blockchain.getBlockHeader(number);
if (ancestorBlockHeader.isPresent()) {
cacheSingleBlock(ancestorBlockHeader.get(), cacheFile);
cacheSingleBlock(ancestorBlockHeader.get(), cacheFile, true);
}
}
}
cacheSingleBlock(blockHeader, cacheFile);
cacheSingleBlock(blockHeader, cacheFile, true);
} catch (final InvalidCacheException e) {
populateLatestSegment(blockNumber);
}
@ -195,16 +195,15 @@ public class TransactionLogBloomCacher {
}
}
private void cacheSingleBlock(final BlockHeader blockHeader, final File cacheFile)
private void cacheSingleBlock(
final BlockHeader blockHeader, final File cacheFile, final boolean isCheckSizeNeeded)
throws IOException, InvalidCacheException {
try (final RandomAccessFile writer = new RandomAccessFile(cacheFile, "rw")) {
final long nbCachedBlocks = cacheFile.length() / BLOOM_BITS_LENGTH;
final long blockIndex = (blockHeader.getNumber() % BLOCKS_PER_BLOOM_CACHE);
final long offset = blockIndex * BLOOM_BITS_LENGTH;
// detect missing block
if (blockIndex > nbCachedBlocks) {
if (isCheckSizeNeeded && blockIndex > nbCachedBlocks) {
throw new InvalidCacheException();
}
writer.seek(offset);
@ -226,9 +225,11 @@ public class TransactionLogBloomCacher {
long blockNumber =
Math.min((segmentNumber + 1) * BLOCKS_PER_BLOOM_CACHE - 1, eventBlockNumber);
fillCacheFile(segmentNumber * BLOCKS_PER_BLOOM_CACHE, blockNumber, currentFile);
while (blockNumber <= eventBlockNumber && (blockNumber % BLOCKS_PER_BLOOM_CACHE != 0)) {
cacheSingleBlock(blockchain.getBlockHeader(blockNumber).orElseThrow(), currentFile);
Optional<BlockHeader> blockHeader = blockchain.getBlockHeader(blockNumber);
if (blockHeader.isPresent()) {
cacheSingleBlock(blockHeader.get(), currentFile, false);
}
blockNumber++;
}
Files.move(

@ -319,6 +319,37 @@ public class DefaultBlockchain implements MutableBlockchain {
return blockAddedEvent;
}
@Override
public synchronized void unsafeImportBlock(
final Block block,
final List<TransactionReceipt> transactionReceipts,
final Optional<Difficulty> maybeTotalDifficulty) {
final BlockchainStorage.Updater updater = blockchainStorage.updater();
final Hash hash = block.getHash();
updater.putBlockHeader(hash, block.getHeader());
updater.putBlockHash(block.getHeader().getNumber(), hash);
updater.putBlockBody(hash, block.getBody());
final int nbTrx = block.getBody().getTransactions().size();
for (int i = 0; i < nbTrx; i++) {
final Hash transactionHash = block.getBody().getTransactions().get(i).getHash();
updater.putTransactionLocation(transactionHash, new TransactionLocation(transactionHash, i));
}
updater.putTransactionReceipts(hash, transactionReceipts);
maybeTotalDifficulty.ifPresent(
totalDifficulty -> updater.putTotalDifficulty(hash, totalDifficulty));
updater.commit();
}
@Override
public synchronized void unsafeSetChainHead(
final BlockHeader blockHeader, final Difficulty totalDifficulty) {
final BlockchainStorage.Updater updater = blockchainStorage.updater();
this.chainHeader = blockHeader;
this.totalDifficulty = totalDifficulty;
updater.setChainHead(blockHeader.getBlockHash());
updater.commit();
}
private Difficulty calculateTotalDifficulty(final BlockHeader blockHeader) {
if (blockHeader.getNumber() == BlockHeader.GENESIS_BLOCK_NUMBER) {
return blockHeader.getDifficulty();

@ -16,9 +16,12 @@ package org.hyperledger.besu.ethereum.chain;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import java.util.List;
import java.util.Optional;
public interface MutableBlockchain extends Blockchain {
@ -34,6 +37,13 @@ public interface MutableBlockchain extends Blockchain {
*/
void appendBlock(Block block, List<TransactionReceipt> receipts);
void unsafeImportBlock(
final Block block,
final List<TransactionReceipt> receipts,
final Optional<Difficulty> maybeTtalDifficulty);
void unsafeSetChainHead(final BlockHeader blockHeader, final Difficulty totalDifficulty);
/**
* Rolls back the canonical chainhead to the specified block number.
*

@ -40,4 +40,16 @@ public interface BlockHeaderFunctions {
* consensus mechanism does not include parseable information in the extra data field.
*/
ParsedExtraData parseExtraData(BlockHeader header);
/**
* Depending on the consensus, several block headers must be downloaded before the checkpoint in
* order to validate the following blocks. This method returns the necessary number of block
* headers to download to be able to start the checkpoint sync
*
* @param blockHeader of the checkpoint
* @return number of headers to download
*/
default int getCheckPointWindowSize(final BlockHeader blockHeader) {
return 1;
}
}

@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
@ -81,6 +82,9 @@ public class EthPeer implements Comparable<EthPeer> {
return size() > maxTrackedSeenBlocks;
}
}));
private Optional<BlockHeader> checkpointHeader = Optional.empty();
private final String protocolName;
private final Clock clock;
private final List<NodeMessagePermissioningProvider> permissioningProviders;
@ -555,6 +559,14 @@ public class EthPeer implements Comparable<EthPeer> {
return getConnection().getPeerInfo().compareTo(ethPeer.getConnection().getPeerInfo());
}
public void setCheckpointHeader(final BlockHeader header) {
checkpointHeader = Optional.of(header);
}
public Optional<BlockHeader> getCheckpointHeader() {
return checkpointHeader;
}
@FunctionalInterface
public interface DisconnectCallback {
void onDisconnect(EthPeer peer);

@ -0,0 +1,51 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.peervalidation;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
public class CheckpointBlocksPeerValidator extends RequiredBlocksPeerValidator {
public CheckpointBlocksPeerValidator(
final ProtocolSchedule protocolSchedule,
final MetricsSystem metricsSystem,
final long blockNumber,
final Hash hash,
final long chainHeightEstimationBuffer) {
super(protocolSchedule, metricsSystem, blockNumber, hash, chainHeightEstimationBuffer);
}
public CheckpointBlocksPeerValidator(
final ProtocolSchedule protocolSchedule,
final MetricsSystem metricsSystem,
final long blockNumber,
final Hash hash) {
this(
protocolSchedule, metricsSystem, blockNumber, hash, DEFAULT_CHAIN_HEIGHT_ESTIMATION_BUFFER);
}
@Override
boolean validateBlockHeader(final EthPeer ethPeer, final BlockHeader header) {
final boolean valid = super.validateBlockHeader(ethPeer, header);
if (valid) {
ethPeer.setCheckpointHeader(header);
}
return valid;
}
}

@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.checkpointsync.CheckpointDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate.FastDownloaderFactory;
@ -112,9 +113,22 @@ public class DefaultSynchronizer implements Synchronizer {
metricsSystem,
terminationCondition));
if (SyncMode.X_SNAP.equals(syncConfig.getSyncMode())) {
if (SyncMode.FAST.equals(syncConfig.getSyncMode())) {
this.fastSyncDownloader =
SnapDownloaderFactory.createSnapDownloader(
FastDownloaderFactory.create(
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
protocolContext,
metricsSystem,
ethContext,
worldStateStorage,
syncState,
clock);
} else if (SyncMode.X_CHECKPOINT.equals(syncConfig.getSyncMode())) {
this.fastSyncDownloader =
CheckpointDownloaderFactory.createCheckpointDownloader(
pivotBlockSelector,
syncConfig,
dataDirectory,
@ -127,7 +141,7 @@ public class DefaultSynchronizer implements Synchronizer {
clock);
} else {
this.fastSyncDownloader =
FastDownloaderFactory.create(
SnapDownloaderFactory.createSnapDownloader(
pivotBlockSelector,
syncConfig,
dataDirectory,
@ -168,7 +182,7 @@ public class DefaultSynchronizer implements Synchronizer {
blockPropagationManager.ifPresent(BlockPropagationManager::start);
CompletableFuture<Void> future;
if (fastSyncDownloader.isPresent()) {
future = fastSyncDownloader.get().start().thenCompose(this::handleFastSyncResult);
future = fastSyncDownloader.get().start().thenCompose(this::handleSyncResult);
} else {
syncState.markInitialSyncPhaseAsDone();
@ -198,7 +212,7 @@ public class DefaultSynchronizer implements Synchronizer {
}
}
private CompletableFuture<Void> handleFastSyncResult(final FastSyncState result) {
private CompletableFuture<Void> handleSyncResult(final FastSyncState result) {
if (!running.get()) {
// We've been shutdown which will have triggered the fast sync future to complete
return CompletableFuture.completedFuture(null);
@ -210,7 +224,7 @@ public class DefaultSynchronizer implements Synchronizer {
blockHeader ->
protocolContext.getWorldStateArchive().setArchiveStateUnSafe(blockHeader));
LOG.info(
"Fast sync completed successfully with pivot block {}",
"Sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
pivotBlockSelector.close();
syncState.markInitialSyncPhaseAsDone();

@ -22,6 +22,8 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeaders;
import org.hyperledger.besu.ethereum.eth.sync.range.SyncTargetRange;
import org.hyperledger.besu.ethereum.eth.sync.tasks.DownloadHeaderSequenceTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -36,7 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DownloadHeadersStep
implements Function<CheckpointRange, CompletableFuture<CheckpointRangeHeaders>> {
implements Function<SyncTargetRange, CompletableFuture<RangeHeaders>> {
private static final Logger LOG = LoggerFactory.getLogger(DownloadHeadersStep.class);
private final ProtocolSchedule protocolSchedule;
private final ProtocolContext protocolContext;
@ -61,22 +63,21 @@ public class DownloadHeadersStep
}
@Override
public CompletableFuture<CheckpointRangeHeaders> apply(final CheckpointRange checkpointRange) {
public CompletableFuture<RangeHeaders> apply(final SyncTargetRange checkpointRange) {
final CompletableFuture<List<BlockHeader>> taskFuture = downloadHeaders(checkpointRange);
final CompletableFuture<CheckpointRangeHeaders> processedFuture =
final CompletableFuture<RangeHeaders> processedFuture =
taskFuture.thenApply(headers -> processHeaders(checkpointRange, headers));
FutureUtils.propagateCancellation(processedFuture, taskFuture);
return processedFuture;
}
private CompletableFuture<List<BlockHeader>> downloadHeaders(
final CheckpointRange checkpointRange) {
if (checkpointRange.hasEnd()) {
private CompletableFuture<List<BlockHeader>> downloadHeaders(final SyncTargetRange range) {
if (range.hasEnd()) {
LOG.debug(
"Downloading headers for range {} to {}",
checkpointRange.getStart().getNumber(),
checkpointRange.getEnd().getNumber());
if (checkpointRange.getSegmentLengthExclusive() == 0) {
range.getStart().getNumber(),
range.getEnd().getNumber());
if (range.getSegmentLengthExclusive() == 0) {
// There are no extra headers to download.
return completedFuture(emptyList());
}
@ -84,38 +85,38 @@ public class DownloadHeadersStep
protocolSchedule,
protocolContext,
ethContext,
checkpointRange.getEnd(),
checkpointRange.getSegmentLengthExclusive(),
range.getEnd(),
range.getSegmentLengthExclusive(),
validationPolicy,
metricsSystem)
.run();
} else {
LOG.debug("Downloading headers starting from {}", checkpointRange.getStart().getNumber());
LOG.debug("Downloading headers starting from {}", range.getStart().getNumber());
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
ethContext,
checkpointRange.getStart().getHash(),
checkpointRange.getStart().getNumber(),
range.getStart().getHash(),
range.getStart().getNumber(),
headerRequestSize,
metricsSystem)
.assignPeer(checkpointRange.getSyncTarget())
.assignPeer(range.getSyncTarget())
.run()
.thenApply(PeerTaskResult::getResult);
}
}
private CheckpointRangeHeaders processHeaders(
final CheckpointRange checkpointRange, final List<BlockHeader> headers) {
private RangeHeaders processHeaders(
final SyncTargetRange checkpointRange, final List<BlockHeader> headers) {
if (checkpointRange.hasEnd()) {
final List<BlockHeader> headersToImport = new ArrayList<>(headers);
headersToImport.add(checkpointRange.getEnd());
return new CheckpointRangeHeaders(checkpointRange, headersToImport);
return new RangeHeaders(checkpointRange, headersToImport);
} else {
List<BlockHeader> headersToImport = headers;
if (!headers.isEmpty() && headers.get(0).equals(checkpointRange.getStart())) {
headersToImport = headers.subList(1, headers.size());
}
return new CheckpointRangeHeaders(checkpointRange, headersToImport);
return new RangeHeaders(checkpointRange, headersToImport);
}
}
}

@ -14,9 +14,13 @@
*/
package org.hyperledger.besu.ethereum.eth.sync;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.services.pipeline.Pipeline;
import java.util.concurrent.CompletionStage;
public interface DownloadPipelineFactory {
/**
@ -27,4 +31,7 @@ public interface DownloadPipelineFactory {
* @return the created but not yet started pipeline.
*/
Pipeline<?> createDownloadPipelineForSyncTarget(SyncTarget target);
CompletionStage<Void> startPipeline(
EthScheduler scheduler, SyncState syncState, SyncTarget syncTarget, Pipeline<?> pipeline);
}

@ -157,13 +157,14 @@ public class PipelineChainDownloader implements ChainDownloader {
return CompletableFuture.completedFuture(null);
}
syncState.setSyncTarget(target.peer(), target.commonAncestor());
currentDownloadPipeline = downloadPipelineFactory.createDownloadPipelineForSyncTarget(target);
debugLambda(
LOG,
"Starting download pipeline for sync target {}, common ancestor {} ({})",
() -> target,
() -> target.commonAncestor().getNumber(),
() -> target.commonAncestor().getBlockHash());
return scheduler.startPipeline(currentDownloadPipeline);
currentDownloadPipeline = downloadPipelineFactory.createDownloadPipelineForSyncTarget(target);
return downloadPipelineFactory.startPipeline(
scheduler, syncState, target, currentDownloadPipeline);
}
}

@ -14,13 +14,17 @@
*/
package org.hyperledger.besu.ethereum.eth.sync;
import java.util.EnumSet;
public enum SyncMode {
// Fully validate all blocks as they sync
FULL,
// Perform light validation on older blocks, and switch to full validation for more recent blocks
FAST,
// Perform snapsync
X_SNAP;
X_SNAP,
// Perform snapsync but starting from a checkpoint instead of starting from genesis
X_CHECKPOINT;
public static SyncMode fromString(final String str) {
for (final SyncMode mode : SyncMode.values()) {
@ -30,4 +34,8 @@ public enum SyncMode {
}
return null;
}
public static boolean isFullSync(final SyncMode syncMode) {
return !EnumSet.of(SyncMode.FAST, SyncMode.X_SNAP, SyncMode.X_CHECKPOINT).contains(syncMode);
}
}

@ -0,0 +1,57 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import java.util.Optional;
import java.util.function.Consumer;
public class CheckpointBlockImportStep implements Consumer<Optional<BlockWithReceipts>> {
private final CheckpointSource checkpointSource;
private final Checkpoint checkpoint;
private final MutableBlockchain blockchain;
public CheckpointBlockImportStep(
final CheckpointSource checkpointSource,
final Checkpoint checkpoint,
final MutableBlockchain blockchain) {
this.checkpointSource = checkpointSource;
this.checkpoint = checkpoint;
this.blockchain = blockchain;
}
@Override
public void accept(final Optional<BlockWithReceipts> maybeBlock) {
maybeBlock.ifPresent(
block -> {
blockchain.unsafeImportBlock(
block.getBlock(),
block.getReceipts(),
block.getHash().equals(checkpoint.blockHash())
? Optional.of(checkpoint.totalDifficulty())
: Optional.empty());
checkpointSource.setLastHeaderDownloaded(Optional.of(block.getHeader()));
if (!checkpointSource.hasNext()) {
blockchain.unsafeSetChainHead(
checkpointSource.getCheckpoint(), checkpoint.totalDifficulty());
}
});
checkpointSource.notifyTaskAvailable();
}
}

@ -0,0 +1,81 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.GetBlockFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public class CheckpointDownloadBlockStep {
private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
private final Checkpoint checkpoint;
private final MetricsSystem metricsSystem;
public CheckpointDownloadBlockStep(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Checkpoint checkpoint,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.checkpoint = checkpoint;
this.metricsSystem = metricsSystem;
}
public CompletableFuture<Optional<BlockWithReceipts>> downloadBlock(final Hash hash) {
final GetBlockFromPeerTask getBlockFromPeerTask =
GetBlockFromPeerTask.create(
protocolSchedule,
ethContext,
Optional.of(hash),
checkpoint.blockNumber(),
metricsSystem);
return getBlockFromPeerTask
.run()
.thenCompose(this::downloadReceipts)
.exceptionally(throwable -> Optional.empty());
}
private CompletableFuture<Optional<BlockWithReceipts>> downloadReceipts(
final PeerTaskResult<Block> peerTaskResult) {
final Block block = peerTaskResult.getResult();
final GetReceiptsFromPeerTask getReceiptsFromPeerTask =
GetReceiptsFromPeerTask.forHeaders(ethContext, List.of(block.getHeader()), metricsSystem);
return getReceiptsFromPeerTask
.run()
.thenCompose(
receiptTaskResult -> {
final List<TransactionReceipt> transactionReceipts =
receiptTaskResult.getResult().get(block.getHeader());
return CompletableFuture.completedFuture(
Optional.of(new BlockWithReceipts(block, transactionReceipts)));
})
.exceptionally(throwable -> Optional.empty());
}
}

@ -0,0 +1,149 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncStateStorage;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncState;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapWorldStateDownloader;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointDownloaderFactory.class);
public static Optional<FastSyncDownloader<?>> createCheckpointDownloader(
final PivotBlockSelector pivotBlockSelector,
final SynchronizerConfiguration syncConfig,
final Path dataDirectory,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final MetricsSystem metricsSystem,
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final SyncState syncState,
final Clock clock) {
final Path fastSyncDataDirectory = dataDirectory.resolve(FAST_SYNC_FOLDER);
final FastSyncStateStorage fastSyncStateStorage =
new FastSyncStateStorage(fastSyncDataDirectory);
if (SyncMode.isFullSync(syncConfig.getSyncMode())) {
if (fastSyncStateStorage.isFastSyncInProgress()) {
throw new IllegalStateException(
"Unable to change the sync mode when snap sync is incomplete, please restart with checkpoint sync mode");
} else {
return Optional.empty();
}
}
ensureDirectoryExists(fastSyncDataDirectory.toFile());
final FastSyncState fastSyncState =
fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule));
if (fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
!= BlockHeader.GENESIS_BLOCK_NUMBER) {
LOG.info(
"Checkpoint sync was requested, but cannot be enabled because the local blockchain is not empty.");
return Optional.empty();
}
final FastSyncActions fastSyncActions;
if (syncState.getCheckpoint().isEmpty()) {
LOG.warn("Unable to find a valid checkpoint configuration. The genesis will be used");
fastSyncActions =
new FastSyncActions(
syncConfig,
worldStateStorage,
protocolSchedule,
protocolContext,
ethContext,
syncState,
pivotBlockSelector,
metricsSystem);
} else {
LOG.info(
"Checkpoint sync start with block {} and hash {}",
syncState.getCheckpoint().get().blockNumber(),
syncState.getCheckpoint().get().blockHash());
fastSyncActions =
new CheckpointSyncActions(
syncConfig,
worldStateStorage,
protocolSchedule,
protocolContext,
ethContext,
syncState,
pivotBlockSelector,
metricsSystem);
}
final SnapSyncState snapSyncState =
new SnapSyncState(
fastSyncStateStorage.loadState(
ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)));
worldStateStorage.clear();
final InMemoryTasksPriorityQueues<SnapDataRequest> snapTaskCollection =
createSnapWorldStateDownloaderTaskCollection();
final WorldStateDownloader snapWorldStateDownloader =
new SnapWorldStateDownloader(
ethContext,
worldStateStorage,
snapTaskCollection,
syncConfig.getSnapSyncConfiguration(),
syncConfig.getWorldStateRequestParallelism(),
syncConfig.getWorldStateMaxRequestsWithoutProgress(),
syncConfig.getWorldStateMinMillisBeforeStalling(),
clock,
metricsSystem);
final FastSyncDownloader<SnapDataRequest> fastSyncDownloader =
new SnapSyncDownloader(
fastSyncActions,
worldStateStorage,
snapWorldStateDownloader,
fastSyncStateStorage,
snapTaskCollection,
fastSyncDataDirectory,
snapSyncState);
syncState.setWorldStateDownloadStatus(snapWorldStateDownloader);
return Optional.of(fastSyncDownloader);
}
}

@ -0,0 +1,75 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
public class CheckpointSource implements Iterator<Hash> {
private final SyncState syncState;
private final BlockHeader checkpoint;
private final int nbBlocks;
private Optional<BlockHeader> lastHeaderDownloaded;
private final AtomicBoolean isDownloading = new AtomicBoolean(false);
public CheckpointSource(
final SyncState syncState,
final EthPeer ethPeer,
final BlockHeaderFunctions blockHeaderFunctions) {
this.syncState = syncState;
this.checkpoint = ethPeer.getCheckpointHeader().orElseThrow();
this.nbBlocks = blockHeaderFunctions.getCheckPointWindowSize(checkpoint);
this.lastHeaderDownloaded = Optional.empty();
}
@Override
public boolean hasNext() {
return syncState.getLocalChainHeight() == 0
&& lastHeaderDownloaded
.map(blockHeader -> blockHeader.getNumber() > (checkpoint.getNumber() - nbBlocks))
.orElse(true);
}
@Override
public synchronized Hash next() {
isDownloading.getAndSet(true);
return lastHeaderDownloaded
.map(ProcessableBlockHeader::getParentHash)
.orElse(checkpoint.getHash());
}
public synchronized void notifyTaskAvailable() {
isDownloading.getAndSet(false);
notifyAll();
}
public BlockHeader getCheckpoint() {
return checkpoint;
}
public void setLastHeaderDownloaded(final Optional<BlockHeader> lastHeaderDownloaded) {
this.lastHeaderDownloaded = lastHeaderDownloaded;
}
}

@ -0,0 +1,62 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
public class CheckpointSyncActions extends FastSyncActions {
public CheckpointSyncActions(
final SynchronizerConfiguration syncConfig,
final WorldStateStorage worldStateStorage,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final SyncState syncState,
final PivotBlockSelector pivotBlockSelector,
final MetricsSystem metricsSystem) {
super(
syncConfig,
worldStateStorage,
protocolSchedule,
protocolContext,
ethContext,
syncState,
pivotBlockSelector,
metricsSystem);
}
@Override
public ChainDownloader createChainDownloader(final FastSyncState currentState) {
return CheckpointSyncChainDownloader.create(
syncConfig,
worldStateStorage,
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
currentState);
}
}

@ -0,0 +1,59 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/ package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncTargetManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
public class CheckpointSyncChainDownloader extends FastSyncChainDownloader {
public static ChainDownloader create(
final SynchronizerConfiguration config,
final WorldStateStorage worldStateStorage,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final SyncState syncState,
final MetricsSystem metricsSystem,
final FastSyncState fastSyncState) {
final FastSyncTargetManager syncTargetManager =
new FastSyncTargetManager(
config,
worldStateStorage,
protocolSchedule,
protocolContext,
ethContext,
metricsSystem,
fastSyncState);
return new PipelineChainDownloader(
syncState,
syncTargetManager,
new CheckpointSyncDownloadPipelineFactory(
config, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem),
ethContext.getScheduler(),
metricsSystem);
}
}

@ -0,0 +1,101 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloadPipelineFactory;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.services.pipeline.PipelineBuilder;
import java.util.concurrent.CompletionStage;
public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipelineFactory {
public CheckpointSyncDownloadPipelineFactory(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final FastSyncState fastSyncState,
final MetricsSystem metricsSystem) {
super(syncConfig, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem);
}
@Override
public CompletionStage<Void> startPipeline(
final EthScheduler scheduler,
final SyncState syncState,
final SyncTarget syncTarget,
final Pipeline<?> pipeline) {
return scheduler
.startPipeline(createDownloadCheckPointPipeline(syncState, syncTarget))
.thenCompose(unused -> scheduler.startPipeline(pipeline));
}
protected Pipeline<Hash> createDownloadCheckPointPipeline(
final SyncState syncState, final SyncTarget target) {
final Checkpoint checkpoint = syncState.getCheckpoint().orElseThrow();
final CheckpointSource checkPointSource =
new CheckpointSource(
syncState,
target.peer(),
protocolSchedule.getByBlockNumber(checkpoint.blockNumber()).getBlockHeaderFunctions());
final CheckpointBlockImportStep checkPointBlockImportStep =
new CheckpointBlockImportStep(
checkPointSource, checkpoint, protocolContext.getBlockchain());
final CheckpointDownloadBlockStep checkPointDownloadBlockStep =
new CheckpointDownloadBlockStep(protocolSchedule, ethContext, checkpoint, metricsSystem);
return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",
checkPointSource,
1,
metricsSystem.createLabelledCounter(
BesuMetricCategory.SYNCHRONIZER,
"chain_download_pipeline_processed_total",
"Number of header process by each chain download pipeline stage",
"step",
"action"),
true,
"checkpointSync")
.thenProcessAsyncOrdered("downloadBlock", checkPointDownloadBlockStep::downloadBlock, 1)
.andFinishWith("importBlock", checkPointBlockImportStep);
}
@Override
protected BlockHeader getCommonAncestor(final SyncTarget target) {
return target
.peer()
.getCheckpointHeader()
.filter(checkpoint -> checkpoint.getNumber() > target.commonAncestor().getNumber())
.orElse(target.commonAncestor());
}
}

@ -37,7 +37,7 @@ public class FastImportBlocksStep implements Consumer<List<BlockWithReceipts>> {
private static final long TEN_SECONDS = TimeUnit.SECONDS.toMillis(10L);
private final ProtocolSchedule protocolSchedule;
private final ProtocolContext protocolContext;
protected final ProtocolContext protocolContext;
private final ValidationPolicy headerValidationPolicy;
private final ValidationPolicy ommerValidationPolicy;
private final EthContext ethContext;
@ -93,7 +93,7 @@ public class FastImportBlocksStep implements Consumer<List<BlockWithReceipts>> {
}
}
private boolean importBlock(final BlockWithReceipts blockWithReceipts) {
protected boolean importBlock(final BlockWithReceipts blockWithReceipts) {
final BlockImporter importer =
protocolSchedule.getByBlockNumber(blockWithReceipts.getNumber()).getBlockImporter();
return importer.fastImportBlock(

@ -49,16 +49,16 @@ import org.slf4j.LoggerFactory;
public class FastSyncActions {
private static final Logger LOG = LoggerFactory.getLogger(FastSyncActions.class);
private final SynchronizerConfiguration syncConfig;
private final WorldStateStorage worldStateStorage;
private final ProtocolSchedule protocolSchedule;
private final ProtocolContext protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final PivotBlockSelector pivotBlockSelector;
private final MetricsSystem metricsSystem;
private final Counter pivotBlockSelectionCounter;
private final AtomicLong pivotBlockGauge = new AtomicLong(0);
protected final SynchronizerConfiguration syncConfig;
protected final WorldStateStorage worldStateStorage;
protected final ProtocolSchedule protocolSchedule;
protected final ProtocolContext protocolContext;
protected final EthContext ethContext;
protected final SyncState syncState;
protected final PivotBlockSelector pivotBlockSelector;
protected final MetricsSystem metricsSystem;
protected final Counter pivotBlockSelectionCounter;
protected final AtomicLong pivotBlockGauge = new AtomicLong(0);
public FastSyncActions(
final SynchronizerConfiguration syncConfig,

@ -26,7 +26,7 @@ import org.hyperledger.besu.plugin.services.MetricsSystem;
public class FastSyncChainDownloader {
private FastSyncChainDownloader() {}
protected FastSyncChainDownloader() {}
public static ChainDownloader create(
final SynchronizerConfiguration config,
@ -47,7 +47,6 @@ public class FastSyncChainDownloader {
ethContext,
metricsSystem,
fastSyncState);
return new PipelineChainDownloader(
syncState,
syncTargetManager,

@ -25,15 +25,17 @@ import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.CheckpointHeaderFetcher;
import org.hyperledger.besu.ethereum.eth.sync.CheckpointHeaderValidationStep;
import org.hyperledger.besu.ethereum.eth.sync.CheckpointRange;
import org.hyperledger.besu.ethereum.eth.sync.CheckpointRangeSource;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.DownloadBodiesStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeadersFetcher;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeadersValidationStep;
import org.hyperledger.besu.ethereum.eth.sync.range.SyncTargetRange;
import org.hyperledger.besu.ethereum.eth.sync.range.SyncTargetRangeSource;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.BesuMetricCategory;
@ -43,16 +45,18 @@ import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.services.pipeline.PipelineBuilder;
import java.util.concurrent.CompletionStage;
public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory {
private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule protocolSchedule;
private final ProtocolContext protocolContext;
private final EthContext ethContext;
private final FastSyncState fastSyncState;
private final MetricsSystem metricsSystem;
private final FastSyncValidationPolicy attachedValidationPolicy;
private final FastSyncValidationPolicy detachedValidationPolicy;
private final FastSyncValidationPolicy ommerValidationPolicy;
protected final SynchronizerConfiguration syncConfig;
protected final ProtocolSchedule protocolSchedule;
protected final ProtocolContext protocolContext;
protected final EthContext ethContext;
protected final FastSyncState fastSyncState;
protected final MetricsSystem metricsSystem;
protected final FastSyncValidationPolicy attachedValidationPolicy;
protected final FastSyncValidationPolicy detachedValidationPolicy;
protected final FastSyncValidationPolicy ommerValidationPolicy;
public FastSyncDownloadPipelineFactory(
final SynchronizerConfiguration syncConfig,
@ -94,18 +98,28 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
}
@Override
public Pipeline<CheckpointRange> createDownloadPipelineForSyncTarget(final SyncTarget target) {
public CompletionStage<Void> startPipeline(
final EthScheduler scheduler,
final SyncState syncState,
final SyncTarget syncTarget,
final Pipeline<?> pipeline) {
return scheduler.startPipeline(pipeline);
}
@Override
public Pipeline<SyncTargetRange> createDownloadPipelineForSyncTarget(final SyncTarget target) {
final int downloaderParallelism = syncConfig.getDownloaderParallelism();
final int headerRequestSize = syncConfig.getDownloaderHeaderRequestSize();
final int singleHeaderBufferSize = headerRequestSize * downloaderParallelism;
final CheckpointRangeSource checkpointRangeSource =
new CheckpointRangeSource(
new CheckpointHeaderFetcher(
final SyncTargetRangeSource checkpointRangeSource =
new SyncTargetRangeSource(
new RangeHeadersFetcher(
syncConfig, protocolSchedule, ethContext, fastSyncState, metricsSystem),
this::shouldContinueDownloadingFromPeer,
ethContext.getScheduler(),
target.peer(),
target.commonAncestor(),
getCommonAncestor(target),
syncConfig.getDownloaderCheckpointTimeoutsPermitted(),
SyncTerminationCondition.never());
final DownloadHeadersStep downloadHeadersStep =
@ -116,9 +130,8 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
detachedValidationPolicy,
headerRequestSize,
metricsSystem);
final CheckpointHeaderValidationStep validateHeadersJoinUpStep =
new CheckpointHeaderValidationStep(
protocolSchedule, protocolContext, detachedValidationPolicy);
final RangeHeadersValidationStep validateHeadersJoinUpStep =
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
final DownloadReceiptsStep downloadReceiptsStep =
@ -151,10 +164,13 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
.andFinishWith("importBlock", importBlockStep);
}
private boolean shouldContinueDownloadingFromPeer(
final EthPeer peer, final BlockHeader lastCheckpointHeader) {
protected BlockHeader getCommonAncestor(final SyncTarget syncTarget) {
return syncTarget.commonAncestor();
}
protected boolean shouldContinueDownloadingFromPeer(
final EthPeer peer, final BlockHeader lastRoundHeader) {
final BlockHeader pivotBlockHeader = fastSyncState.getPivotBlockHeader().get();
return !peer.isDisconnected()
&& lastCheckpointHeader.getNumber() < pivotBlockHeader.getNumber();
return !peer.isDisconnected() && lastRoundHeader.getNumber() < pivotBlockHeader.getNumber();
}
}

@ -80,7 +80,7 @@ public class FastSyncDownloader<REQUEST> {
}
protected CompletableFuture<FastSyncState> start(final FastSyncState fastSyncState) {
LOG.info("Starting fast sync.");
LOG.info("Starting sync.");
if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) {
LOG.info("Clearing bonsai flat account db");
worldStateStorage.clearFlatDatabase();

@ -36,7 +36,7 @@ import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class FastSyncTargetManager extends SyncTargetManager {
public class FastSyncTargetManager extends SyncTargetManager {
private static final Logger LOG = LoggerFactory.getLogger(FastSyncTargetManager.class);
private final WorldStateStorage worldStateStorage;

@ -0,0 +1,30 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.immutables.value.Value;
@Value.Immutable
public interface Checkpoint {
long blockNumber();
Hash blockHash();
Difficulty totalDifficulty();
}

@ -67,7 +67,7 @@ public class FastDownloaderFactory {
final FastSyncStateStorage fastSyncStateStorage =
new FastSyncStateStorage(fastSyncDataDirectory);
if (syncConfig.getSyncMode() != SyncMode.FAST) {
if (SyncMode.isFullSync(syncConfig.getSyncMode())) {
if (fastSyncStateStorage.isFastSyncInProgress()) {
throw new IllegalStateException(
"Unable to change the sync mode when fast sync is incomplete, please restart with fast sync mode");

@ -18,14 +18,16 @@ import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.CheckpointHeaderFetcher;
import org.hyperledger.besu.ethereum.eth.sync.CheckpointHeaderValidationStep;
import org.hyperledger.besu.ethereum.eth.sync.CheckpointRangeSource;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.DownloadBodiesStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.ValidationPolicy;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeadersFetcher;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeadersValidationStep;
import org.hyperledger.besu.ethereum.eth.sync.range.SyncTargetRangeSource;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@ -34,6 +36,8 @@ import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.services.pipeline.PipelineBuilder;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,14 +70,23 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
betterSyncTargetEvaluator = new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
}
@Override
public CompletionStage<Void> startPipeline(
final EthScheduler scheduler,
final SyncState syncState,
final SyncTarget syncTarget,
final Pipeline<?> pipeline) {
return scheduler.startPipeline(pipeline);
}
@Override
public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target) {
final int downloaderParallelism = syncConfig.getDownloaderParallelism();
final int headerRequestSize = syncConfig.getDownloaderHeaderRequestSize();
final int singleHeaderBufferSize = headerRequestSize * downloaderParallelism;
final CheckpointRangeSource checkpointRangeSource =
new CheckpointRangeSource(
new CheckpointHeaderFetcher(syncConfig, protocolSchedule, ethContext, metricsSystem),
final SyncTargetRangeSource checkpointRangeSource =
new SyncTargetRangeSource(
new RangeHeadersFetcher(syncConfig, protocolSchedule, ethContext, metricsSystem),
this::shouldContinueDownloadingFromPeer,
ethContext.getScheduler(),
target.peer(),
@ -88,9 +101,8 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory
detachedValidationPolicy,
headerRequestSize,
metricsSystem);
final CheckpointHeaderValidationStep validateHeadersJoinUpStep =
new CheckpointHeaderValidationStep(
protocolSchedule, protocolContext, detachedValidationPolicy);
final RangeHeadersValidationStep validateHeadersJoinUpStep =
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep();

@ -12,7 +12,7 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync;
package org.hyperledger.besu.ethereum.eth.sync.range;
import static com.google.common.base.Preconditions.checkArgument;
@ -25,25 +25,24 @@ import com.google.common.base.MoreObjects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CheckpointRangeHeaders {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointRangeHeaders.class);
public class RangeHeaders {
private static final Logger LOG = LoggerFactory.getLogger(RangeHeaders.class);
private final CheckpointRange checkpointRange;
private final SyncTargetRange range;
private final List<BlockHeader> headersToImport;
public CheckpointRangeHeaders(
final CheckpointRange checkpointRange, final List<BlockHeader> headersToImport) {
public RangeHeaders(
final SyncTargetRange checkpointRange, final List<BlockHeader> headersToImport) {
if (headersToImport.isEmpty()) {
LOG.debug(
String.format("Headers list empty. CheckpointRange: %s", checkpointRange.toString()));
LOG.debug(String.format("Headers list empty. Range: %s", checkpointRange.toString()));
}
checkArgument(!headersToImport.isEmpty(), "Must have at least one header to import");
this.checkpointRange = checkpointRange;
this.range = checkpointRange;
this.headersToImport = headersToImport;
}
public CheckpointRange getCheckpointRange() {
return checkpointRange;
public SyncTargetRange getRange() {
return range;
}
public List<BlockHeader> getHeadersToImport() {
@ -62,20 +61,20 @@ public class CheckpointRangeHeaders {
if (o == null || getClass() != o.getClass()) {
return false;
}
final CheckpointRangeHeaders that = (CheckpointRangeHeaders) o;
return Objects.equals(checkpointRange, that.checkpointRange)
final RangeHeaders that = (RangeHeaders) o;
return Objects.equals(range, that.range)
&& Objects.equals(headersToImport, that.headersToImport);
}
@Override
public int hashCode() {
return Objects.hash(checkpointRange, headersToImport);
return Objects.hash(range, headersToImport);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("checkpointRange", checkpointRange)
.add("range", range)
.add("headersToImport", headersToImport)
.toString();
}

@ -12,7 +12,7 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync;
package org.hyperledger.besu.ethereum.eth.sync.range;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
@ -34,17 +35,17 @@ import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CheckpointHeaderFetcher {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointHeaderFetcher.class);
public class RangeHeadersFetcher {
private static final Logger LOG = LoggerFactory.getLogger(RangeHeadersFetcher.class);
private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
// The checkpoint we're aiming to reach at the end of this sync.
// The range we're aiming to reach at the end of this sync.
private final FastSyncState fastSyncState;
private final MetricsSystem metricsSystem;
public CheckpointHeaderFetcher(
public RangeHeadersFetcher(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
@ -52,7 +53,7 @@ public class CheckpointHeaderFetcher {
this(syncConfig, protocolSchedule, ethContext, new FastSyncState(), metricsSystem);
}
public CheckpointHeaderFetcher(
public RangeHeadersFetcher(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
@ -65,17 +66,17 @@ public class CheckpointHeaderFetcher {
this.metricsSystem = metricsSystem;
}
public CompletableFuture<List<BlockHeader>> getNextCheckpointHeaders(
final EthPeer peer, final BlockHeader previousCheckpointHeader) {
public CompletableFuture<List<BlockHeader>> getNextRangeHeaders(
final EthPeer peer, final BlockHeader previousRangeHeader) {
final int skip = syncConfig.getDownloaderChainSegmentSize() - 1;
final int maximumHeaderRequestSize = syncConfig.getDownloaderHeaderRequestSize();
final long previousCheckpointNumber = previousCheckpointHeader.getNumber();
final long previousRangeNumber = previousRangeHeader.getNumber();
final int additionalHeaderCount;
final Optional<BlockHeader> finalCheckpointHeader = fastSyncState.getPivotBlockHeader();
if (finalCheckpointHeader.isPresent()) {
final BlockHeader targetHeader = finalCheckpointHeader.get();
final long blocksUntilTarget = targetHeader.getNumber() - previousCheckpointNumber;
final Optional<BlockHeader> finalRangeHeader = fastSyncState.getPivotBlockHeader();
if (finalRangeHeader.isPresent()) {
final BlockHeader targetHeader = finalRangeHeader.get();
final long blocksUntilTarget = targetHeader.getNumber() - previousRangeNumber;
if (blocksUntilTarget <= 0) {
return completedFuture(emptyList());
}
@ -88,7 +89,7 @@ public class CheckpointHeaderFetcher {
additionalHeaderCount = maximumHeaderRequestSize;
}
return requestHeaders(peer, previousCheckpointHeader, additionalHeaderCount, skip);
return requestHeaders(peer, previousRangeHeader, additionalHeaderCount, skip);
}
private CompletableFuture<List<BlockHeader>> requestHeaders(
@ -97,7 +98,7 @@ public class CheckpointHeaderFetcher {
final int headerCount,
final int skip) {
LOG.debug(
"Requesting {} checkpoint headers, starting from {}, {} blocks apart",
"Requesting {} range headers, starting from {}, {} blocks apart",
headerCount,
referenceHeader.getNumber(),
skip);
@ -113,10 +114,10 @@ public class CheckpointHeaderFetcher {
.assignPeer(peer)
.run()
.thenApply(PeerTaskResult::getResult)
.thenApply(headers -> stripExistingCheckpointHeader(referenceHeader, headers));
.thenApply(headers -> stripExistingRangeHeaders(referenceHeader, headers));
}
private List<BlockHeader> stripExistingCheckpointHeader(
private List<BlockHeader> stripExistingRangeHeaders(
final BlockHeader lastHeader, final List<BlockHeader> headers) {
if (!headers.isEmpty() && headers.get(0).equals(lastHeader)) {
return headers.subList(1, headers.size());
@ -124,15 +125,15 @@ public class CheckpointHeaderFetcher {
return headers;
}
public boolean nextCheckpointEndsAtChainHead(
final EthPeer peer, final BlockHeader previousCheckpointHeader) {
final Optional<BlockHeader> finalCheckpointHeader = fastSyncState.getPivotBlockHeader();
if (finalCheckpointHeader.isPresent()) {
public boolean nextRangeEndsAtChainHead(
final EthPeer peer, final BlockHeader previousRangeHeader) {
final Optional<BlockHeader> finalRangeHeader = fastSyncState.getPivotBlockHeader();
if (finalRangeHeader.isPresent()) {
return false;
}
final int skip = syncConfig.getDownloaderChainSegmentSize() - 1;
final long peerEstimatedHeight = peer.chainState().getEstimatedHeight();
final long previousCheckpointNumber = previousCheckpointHeader.getNumber();
return previousCheckpointNumber + skip >= peerEstimatedHeight;
final long previousRangeNumber = previousRangeHeader.getNumber();
return previousRangeNumber + skip >= peerEstimatedHeight;
}
}

@ -12,10 +12,11 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync;
package org.hyperledger.besu.ethereum.eth.sync.range;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.sync.ValidationPolicy;
import org.hyperledger.besu.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@ -24,14 +25,13 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import java.util.function.Function;
import java.util.stream.Stream;
public class CheckpointHeaderValidationStep
implements Function<CheckpointRangeHeaders, Stream<BlockHeader>> {
public class RangeHeadersValidationStep implements Function<RangeHeaders, Stream<BlockHeader>> {
private final ProtocolSchedule protocolSchedule;
private final ProtocolContext protocolContext;
private final ValidationPolicy validationPolicy;
public CheckpointHeaderValidationStep(
public RangeHeadersValidationStep(
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final ValidationPolicy validationPolicy) {
@ -41,16 +41,16 @@ public class CheckpointHeaderValidationStep
}
@Override
public Stream<BlockHeader> apply(final CheckpointRangeHeaders checkpointRangeHeaders) {
final BlockHeader rangeStart = checkpointRangeHeaders.getCheckpointRange().getStart();
final BlockHeader firstHeaderToImport = checkpointRangeHeaders.getFirstHeaderToImport();
public Stream<BlockHeader> apply(final RangeHeaders rangeHeaders) {
final BlockHeader rangeStart = rangeHeaders.getRange().getStart();
final BlockHeader firstHeaderToImport = rangeHeaders.getFirstHeaderToImport();
if (isValid(rangeStart, firstHeaderToImport)) {
return checkpointRangeHeaders.getHeadersToImport().stream();
return rangeHeaders.getHeadersToImport().stream();
} else {
final String rangeEndDescription;
if (checkpointRangeHeaders.getCheckpointRange().hasEnd()) {
final BlockHeader rangeEnd = checkpointRangeHeaders.getCheckpointRange().getEnd();
if (rangeHeaders.getRange().hasEnd()) {
final BlockHeader rangeEnd = rangeHeaders.getRange().getEnd();
rangeEndDescription =
String.format("#%d (%s)", rangeEnd.getNumber(), rangeEnd.getBlockHash());
} else {
@ -58,7 +58,7 @@ public class CheckpointHeaderValidationStep
}
final String errorMessage =
String.format(
"Invalid checkpoint headers. Headers downloaded between #%d (%s) and %s do not connect at #%d (%s)",
"Invalid range headers. Headers downloaded between #%d (%s) and %s do not connect at #%d (%s)",
rangeStart.getNumber(),
rangeStart.getHash(),
rangeEndDescription,

@ -12,7 +12,7 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync;
package org.hyperledger.besu.ethereum.eth.sync.range;
import static java.lang.Math.toIntExact;
@ -24,19 +24,19 @@ import java.util.Optional;
import com.google.common.base.MoreObjects;
public class CheckpointRange {
public class SyncTargetRange {
private final EthPeer syncTarget;
private final BlockHeader start;
private final Optional<BlockHeader> end;
public CheckpointRange(final EthPeer syncTarget, final BlockHeader start) {
public SyncTargetRange(final EthPeer syncTarget, final BlockHeader start) {
this.syncTarget = syncTarget;
this.start = start;
this.end = Optional.empty();
}
public CheckpointRange(final EthPeer syncTarget, final BlockHeader start, final BlockHeader end) {
public SyncTargetRange(final EthPeer syncTarget, final BlockHeader start, final BlockHeader end) {
this.syncTarget = syncTarget;
this.start = start;
this.end = Optional.of(end);
@ -70,7 +70,7 @@ public class CheckpointRange {
if (o == null || getClass() != o.getClass()) {
return false;
}
final CheckpointRange that = (CheckpointRange) o;
final SyncTargetRange that = (SyncTargetRange) o;
return Objects.equals(syncTarget, that.syncTarget)
&& Objects.equals(start, that.start)
&& Objects.equals(end, that.end);

@ -12,7 +12,7 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync;
package org.hyperledger.besu.ethereum.eth.sync.range;
import static java.util.Collections.emptyList;
import static java.util.concurrent.CompletableFuture.completedFuture;
@ -36,59 +36,58 @@ import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CheckpointRangeSource implements Iterator<CheckpointRange> {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointRangeSource.class);
public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
private static final Logger LOG = LoggerFactory.getLogger(SyncTargetRangeSource.class);
private static final Duration RETRY_DELAY_DURATION = Duration.ofSeconds(2);
private final CheckpointHeaderFetcher checkpointFetcher;
private final RangeHeadersFetcher fetcher;
private final SyncTargetChecker syncTargetChecker;
private final EthPeer peer;
private final EthScheduler ethScheduler;
private final int checkpointTimeoutsPermitted;
private final int rangeTimeoutsPermitted;
private final Duration newHeaderWaitDuration;
private final SyncTerminationCondition terminationCondition;
private final Queue<CheckpointRange> retrievedRanges = new ArrayDeque<>();
private final Queue<SyncTargetRange> retrievedRanges = new ArrayDeque<>();
private BlockHeader lastRangeEnd;
private boolean reachedEndOfCheckpoints = false;
private Optional<CompletableFuture<List<BlockHeader>>> pendingCheckpointsRequest =
Optional.empty();
private boolean reachedEndOfRanges = false;
private Optional<CompletableFuture<List<BlockHeader>>> pendingRequests = Optional.empty();
private int requestFailureCount = 0;
public CheckpointRangeSource(
final CheckpointHeaderFetcher checkpointFetcher,
public SyncTargetRangeSource(
final RangeHeadersFetcher fetcher,
final SyncTargetChecker syncTargetChecker,
final EthScheduler ethScheduler,
final EthPeer peer,
final BlockHeader commonAncestor,
final int checkpointTimeoutsPermitted,
final int rangeTimeoutsPermitted,
final SyncTerminationCondition terminationCondition) {
this(
checkpointFetcher,
fetcher,
syncTargetChecker,
ethScheduler,
peer,
commonAncestor,
checkpointTimeoutsPermitted,
rangeTimeoutsPermitted,
Duration.ofSeconds(5),
terminationCondition);
}
CheckpointRangeSource(
final CheckpointHeaderFetcher checkpointFetcher,
public SyncTargetRangeSource(
final RangeHeadersFetcher fetcher,
final SyncTargetChecker syncTargetChecker,
final EthScheduler ethScheduler,
final EthPeer peer,
final BlockHeader commonAncestor,
final int checkpointTimeoutsPermitted,
final int rangeTimeoutsPermitted,
final Duration newHeaderWaitDuration,
final SyncTerminationCondition terminationCondition) {
this.checkpointFetcher = checkpointFetcher;
this.fetcher = fetcher;
this.syncTargetChecker = syncTargetChecker;
this.ethScheduler = ethScheduler;
this.peer = peer;
this.lastRangeEnd = commonAncestor;
this.checkpointTimeoutsPermitted = checkpointTimeoutsPermitted;
this.rangeTimeoutsPermitted = rangeTimeoutsPermitted;
this.newHeaderWaitDuration = newHeaderWaitDuration;
this.terminationCondition = terminationCondition;
}
@ -97,45 +96,43 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
public boolean hasNext() {
return terminationCondition.shouldContinueDownload()
&& (!retrievedRanges.isEmpty()
|| (requestFailureCount < checkpointTimeoutsPermitted
|| (requestFailureCount < rangeTimeoutsPermitted
&& syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd)
&& !reachedEndOfCheckpoints));
&& !reachedEndOfRanges));
}
@Override
public CheckpointRange next() {
public SyncTargetRange next() {
if (!retrievedRanges.isEmpty()) {
return retrievedRanges.poll();
}
if (pendingCheckpointsRequest.isPresent()) {
return getCheckpointRangeFromPendingRequest();
if (pendingRequests.isPresent()) {
return getRangeFromPendingRequest();
}
if (reachedEndOfCheckpoints) {
if (reachedEndOfRanges) {
return null;
}
if (checkpointFetcher.nextCheckpointEndsAtChainHead(peer, lastRangeEnd)) {
reachedEndOfCheckpoints = true;
return new CheckpointRange(peer, lastRangeEnd);
if (fetcher.nextRangeEndsAtChainHead(peer, lastRangeEnd)) {
reachedEndOfRanges = true;
return new SyncTargetRange(peer, lastRangeEnd);
}
pendingCheckpointsRequest = Optional.of(getNextCheckpointHeaders());
return getCheckpointRangeFromPendingRequest();
pendingRequests = Optional.of(getNextRangeHeaders());
return getRangeFromPendingRequest();
}
private CompletableFuture<List<BlockHeader>> getNextCheckpointHeaders() {
return checkpointFetcher
.getNextCheckpointHeaders(peer, lastRangeEnd)
private CompletableFuture<List<BlockHeader>> getNextRangeHeaders() {
return fetcher
.getNextRangeHeaders(peer, lastRangeEnd)
.exceptionally(
error -> {
LOG.debug("Failed to retrieve checkpoint headers", error);
LOG.debug("Failed to retrieve range headers", error);
return emptyList();
})
.thenCompose(
checkpoints -> checkpoints.isEmpty() ? pauseBriefly() : completedFuture(checkpoints));
.thenCompose(range -> range.isEmpty() ? pauseBriefly() : completedFuture(range));
}
/**
* Pause after failing to get new checkpoints to prevent requesting new checkpoint headers in a
* tight loop.
* Pause after failing to get new range to prevent requesting new range headers in a tight loop.
*
* @return a future that after the pause completes with an empty list.
*/
@ -144,28 +141,28 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
() -> completedFuture(emptyList()), RETRY_DELAY_DURATION);
}
private CheckpointRange getCheckpointRangeFromPendingRequest() {
final CompletableFuture<List<BlockHeader>> pendingRequest = pendingCheckpointsRequest.get();
private SyncTargetRange getRangeFromPendingRequest() {
final CompletableFuture<List<BlockHeader>> pendingRequest = this.pendingRequests.get();
try {
final List<BlockHeader> newCheckpointHeaders =
final List<BlockHeader> newHeaders =
pendingRequest.get(newHeaderWaitDuration.toMillis(), MILLISECONDS);
pendingCheckpointsRequest = Optional.empty();
if (newCheckpointHeaders.isEmpty()) {
this.pendingRequests = Optional.empty();
if (newHeaders.isEmpty()) {
requestFailureCount++;
} else {
requestFailureCount = 0;
}
for (final BlockHeader checkpointHeader : newCheckpointHeaders) {
retrievedRanges.add(new CheckpointRange(peer, lastRangeEnd, checkpointHeader));
lastRangeEnd = checkpointHeader;
for (final BlockHeader header : newHeaders) {
retrievedRanges.add(new SyncTargetRange(peer, lastRangeEnd, header));
lastRangeEnd = header;
}
return retrievedRanges.poll();
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for new checkpoint headers", e);
LOG.trace("Interrupted while waiting for new range headers", e);
return null;
} catch (final ExecutionException e) {
LOG.debug("Failed to retrieve new checkpoint headers", e);
pendingCheckpointsRequest = Optional.empty();
LOG.debug("Failed to retrieve new range headers", e);
this.pendingRequests = Optional.empty();
requestFailureCount++;
return null;
} catch (final TimeoutException e) {
@ -174,6 +171,6 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
}
public interface SyncTargetChecker {
boolean shouldContinueDownloadingFromSyncTarget(EthPeer peer, BlockHeader lastCheckpointHeader);
boolean shouldContinueDownloadingFromSyncTarget(EthPeer peer, BlockHeader lastRangeHeader);
}
}

@ -61,7 +61,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
final FastSyncStateStorage fastSyncStateStorage =
new FastSyncStateStorage(fastSyncDataDirectory);
if (syncConfig.getSyncMode() != SyncMode.X_SNAP) {
if (SyncMode.isFullSync(syncConfig.getSyncMode())) {
if (fastSyncStateStorage.isFastSyncInProgress()) {
throw new IllegalStateException(
"Unable to change the sync mode when snap sync is incomplete, please restart with snap sync mode");
@ -122,7 +122,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
return Optional.of(fastSyncDownloader);
}
private static InMemoryTasksPriorityQueues<SnapDataRequest>
protected static InMemoryTasksPriorityQueues<SnapDataRequest>
createSnapWorldStateDownloaderTaskCollection() {
return new InMemoryTasksPriorityQueues<>();
}

@ -53,7 +53,7 @@ public class SnapSyncDownloader extends FastSyncDownloader<SnapDataRequest> {
@Override
protected CompletableFuture<FastSyncState> start(final FastSyncState fastSyncState) {
LOG.info("Starting snap sync.");
LOG.info("Starting sync.");
return findPivotBlock(fastSyncState, fss -> downloadChainAndWorldState(fastSyncActions, fss));
}

@ -141,7 +141,7 @@ public class SnapsyncMetricsManager {
lastNotifyTimestamp = now;
if (!isHeal) {
LOG.info(
"Snapsync in progress synced={}%, accounts={}, slots={}, codes={}, nodes={}",
"Worldstate download in progress synced={}%, accounts={}, slots={}, codes={}, nodes={}",
percentageDownloaded.get().setScale(2, RoundingMode.HALF_UP),
nbAccounts,
nbSlots,

@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener;
import org.hyperledger.besu.ethereum.eth.manager.ChainHeadEstimate;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloadStatus;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.data.SyncStatus;
@ -49,14 +50,18 @@ public class SyncState {
private Optional<WorldStateDownloadStatus> worldStateDownloadStatus = Optional.empty();
private Optional<Long> newPeerListenerId;
private Optional<Boolean> reachedTerminalDifficulty = Optional.empty();
private final Optional<Checkpoint> checkpoint;
private volatile boolean isInitialSyncPhaseDone;
public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
this(blockchain, ethPeers, false);
this(blockchain, ethPeers, false, Optional.empty());
}
public SyncState(
final Blockchain blockchain, final EthPeers ethPeers, final boolean hasInitialSyncPhase) {
final Blockchain blockchain,
final EthPeers ethPeers,
final boolean hasInitialSyncPhase,
final Optional<Checkpoint> checkpoint) {
this.blockchain = blockchain;
this.ethPeers = ethPeers;
isInitialSyncPhaseDone = !hasInitialSyncPhase;
@ -78,6 +83,7 @@ public class SyncState {
checkInSync();
}
}));
this.checkpoint = checkpoint;
}
/**
@ -290,6 +296,10 @@ public class SyncState {
(syncTracker) -> syncTracker.checkState(localChain, syncTargetChain, bestPeerChain));
}
public Optional<Checkpoint> getCheckpoint() {
return checkpoint;
}
public void markInitialSyncPhaseAsDone() {
isInitialSyncPhaseDone = true;
}

@ -26,6 +26,8 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeaders;
import org.hyperledger.besu.ethereum.eth.sync.range.SyncTargetRange;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
@ -50,7 +52,7 @@ public class DownloadHeadersStepTest {
private final EthPeer syncTarget = mock(EthPeer.class);
private EthProtocolManager ethProtocolManager;
private DownloadHeadersStep downloader;
private CheckpointRange checkpointRange;
private SyncTargetRange checkpointRange;
@BeforeClass
public static void setUpClass() {
@ -74,27 +76,27 @@ public class DownloadHeadersStepTest {
new NoOpMetricsSystem());
checkpointRange =
new CheckpointRange(
new SyncTargetRange(
syncTarget, blockchain.getBlockHeader(1).get(), blockchain.getBlockHeader(10).get());
}
@Test
public void shouldRetrieveHeadersForCheckpointRange() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CompletableFuture<CheckpointRangeHeaders> result = downloader.apply(checkpointRange);
final CompletableFuture<RangeHeaders> result = downloader.apply(checkpointRange);
peer.respond(blockchainResponder(blockchain));
// The start of the range should have been imported as part of the previous batch hence 2-10.
assertThat(result)
.isCompletedWithValue(new CheckpointRangeHeaders(checkpointRange, headersFromChain(2, 10)));
.isCompletedWithValue(new RangeHeaders(checkpointRange, headersFromChain(2, 10)));
}
@Test
public void shouldCancelRequestToPeerWhenReturnedFutureIsCancelled() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CompletableFuture<CheckpointRangeHeaders> result = this.downloader.apply(checkpointRange);
final CompletableFuture<RangeHeaders> result = this.downloader.apply(checkpointRange);
result.cancel(true);
@ -107,28 +109,28 @@ public class DownloadHeadersStepTest {
@Test
public void shouldReturnOnlyEndHeaderWhenCheckpointRangeHasLengthOfOne() {
final CheckpointRange checkpointRange =
new CheckpointRange(
final SyncTargetRange checkpointRange =
new SyncTargetRange(
syncTarget, blockchain.getBlockHeader(3).get(), blockchain.getBlockHeader(4).get());
final CompletableFuture<CheckpointRangeHeaders> result = this.downloader.apply(checkpointRange);
final CompletableFuture<RangeHeaders> result = this.downloader.apply(checkpointRange);
assertThat(result)
.isCompletedWithValue(new CheckpointRangeHeaders(checkpointRange, headersFromChain(4, 4)));
.isCompletedWithValue(new RangeHeaders(checkpointRange, headersFromChain(4, 4)));
}
@Test
public void shouldGetRemainingHeadersWhenRangeHasNoEnd() {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CheckpointRange checkpointRange =
new CheckpointRange(peer.getEthPeer(), blockchain.getBlockHeader(3).get());
final SyncTargetRange checkpointRange =
new SyncTargetRange(peer.getEthPeer(), blockchain.getBlockHeader(3).get());
final CompletableFuture<CheckpointRangeHeaders> result = this.downloader.apply(checkpointRange);
final CompletableFuture<RangeHeaders> result = this.downloader.apply(checkpointRange);
peer.respond(blockchainResponder(blockchain));
assertThat(result)
.isCompletedWithValue(new CheckpointRangeHeaders(checkpointRange, headersFromChain(4, 19)));
.isCompletedWithValue(new RangeHeaders(checkpointRange, headersFromChain(4, 19)));
}
private List<BlockHeader> headersFromChain(final long startNumber, final long endNumber) {

@ -93,15 +93,15 @@ public class PipelineChainDownloaderTest {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.findSyncTarget()).thenReturn(selectTargetFuture);
when(syncTargetManager.shouldContinueDownloading()).thenReturn(true);
expectPipelineCreation(syncTarget, downloadPipeline);
when(scheduler.startPipeline(downloadPipeline)).thenReturn(new CompletableFuture<>());
expectPipelineCreation(syncTarget, downloadPipeline, new CompletableFuture<>());
chainDownloader.start();
verifyNoInteractions(downloadPipelineFactory);
selectTargetFuture.complete(syncTarget);
verify(downloadPipelineFactory).createDownloadPipelineForSyncTarget(syncTarget);
verify(scheduler).startPipeline(downloadPipeline);
verify(downloadPipelineFactory)
.startPipeline(scheduler, syncState, syncTarget, downloadPipeline);
}
@Test
@ -109,8 +109,7 @@ public class PipelineChainDownloaderTest {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.findSyncTarget()).thenReturn(selectTargetFuture);
when(syncTargetManager.shouldContinueDownloading()).thenReturn(true);
expectPipelineCreation(syncTarget, downloadPipeline);
when(scheduler.startPipeline(downloadPipeline)).thenReturn(new CompletableFuture<>());
expectPipelineCreation(syncTarget, downloadPipeline, new CompletableFuture<>());
chainDownloader.start();
verifyNoInteractions(downloadPipelineFactory);
selectTargetFuture.complete(syncTarget);
@ -244,6 +243,8 @@ public class PipelineChainDownloaderTest {
final CompletableFuture<Void> result = chainDownloader.start();
verify(syncTargetManager).findSyncTarget();
verify(downloadPipelineFactory).createDownloadPipelineForSyncTarget(syncTarget);
verify(downloadPipelineFactory)
.startPipeline(scheduler, syncState, syncTarget, downloadPipeline);
chainDownloader.cancel();
// Pipeline is aborted immediately.
@ -267,11 +268,8 @@ public class PipelineChainDownloaderTest {
.thenReturn(completedFuture(syncTarget))
.thenReturn(completedFuture(syncTarget2));
expectPipelineCreation(syncTarget, downloadPipeline);
expectPipelineCreation(syncTarget2, downloadPipeline2);
when(scheduler.startPipeline(downloadPipeline)).thenReturn(pipelineFuture1);
when(scheduler.startPipeline(downloadPipeline2)).thenReturn(pipelineFuture2);
expectPipelineCreation(syncTarget, downloadPipeline, pipelineFuture1);
expectPipelineCreation(syncTarget2, downloadPipeline2, pipelineFuture2);
final CompletableFuture<Void> result = chainDownloader.start();
@ -337,15 +335,19 @@ public class PipelineChainDownloaderTest {
final SyncTarget syncTarget, final Pipeline<?> pipeline) {
final CompletableFuture<Void> pipelineFuture = new CompletableFuture<>();
when(syncTargetManager.findSyncTarget()).thenReturn(completedFuture(syncTarget));
expectPipelineCreation(syncTarget, pipeline);
when(scheduler.startPipeline(pipeline)).thenReturn(pipelineFuture);
expectPipelineCreation(syncTarget, pipeline, pipelineFuture);
return pipelineFuture;
}
@SuppressWarnings({"unchecked", "rawtypes"}) // Mockito really doesn't like Pipeline<?>
private void expectPipelineCreation(final SyncTarget syncTarget, final Pipeline<?> pipeline) {
private void expectPipelineCreation(
final SyncTarget syncTarget,
final Pipeline<?> pipeline,
final CompletableFuture<Void> completableFuture) {
when(downloadPipelineFactory.createDownloadPipelineForSyncTarget(syncTarget))
.thenReturn((Pipeline) pipeline);
when(downloadPipelineFactory.startPipeline(scheduler, syncState, syncTarget, pipeline))
.thenReturn(completableFuture);
}
private void assertExceptionallyCompletedWith(

@ -30,6 +30,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer.Responder;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeadersFetcher;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
@ -46,7 +47,7 @@ import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class CheckpointHeaderFetcherTest {
public class RangeHeadersFetcherTest {
private static final int SEGMENT_SIZE = 5;
private static Blockchain blockchain;
@ -88,10 +89,10 @@ public class CheckpointHeaderFetcherTest {
@Test
public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() {
final CheckpointHeaderFetcher checkpointHeaderFetcher = createCheckpointHeaderFetcher();
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher();
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));
rangeHeaderFetcher.getNextRangeHeaders(respondingPeer.getEthPeer(), header(1));
assertThat(result).isNotDone();
@ -102,11 +103,10 @@ public class CheckpointHeaderFetcherTest {
@Test
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsMultipleOfSegmentSize() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(header(11));
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(11));
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));
rangeHeaderFetcher.getNextRangeHeaders(respondingPeer.getEthPeer(), header(1));
respondingPeer.respond(responder);
@ -115,11 +115,10 @@ public class CheckpointHeaderFetcherTest {
@Test
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsNotAMultipleOfSegmentSize() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(header(15));
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15));
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));
rangeHeaderFetcher.getNextRangeHeaders(respondingPeer.getEthPeer(), header(1));
respondingPeer.respond(responder);
@ -127,71 +126,68 @@ public class CheckpointHeaderFetcherTest {
}
@Test
public void shouldReturnOnlyTargetHeaderWhenLastHeaderIsTheCheckpointBeforeTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(header(15));
public void shouldReturnOnlyTargetHeaderWhenLastHeaderIsTheRangeBeforeTarget() {
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15));
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(11));
rangeHeaderFetcher.getNextRangeHeaders(respondingPeer.getEthPeer(), header(11));
assertThat(result).isCompletedWithValue(singletonList(header(15)));
}
@Test
public void shouldReturnEmptyListWhenLastHeaderIsTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(header(15));
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15));
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(15));
rangeHeaderFetcher.getNextRangeHeaders(respondingPeer.getEthPeer(), header(15));
assertThat(result).isCompletedWithValue(emptyList());
}
@Test
public void shouldReturnEmptyListWhenLastHeaderIsAfterTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(header(15));
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher(header(15));
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(16));
rangeHeaderFetcher.getNextRangeHeaders(respondingPeer.getEthPeer(), header(16));
assertThat(result).isCompletedWithValue(emptyList());
}
@Test
public void nextCheckpointShouldEndAtChainHeadWhenNextCheckpointHeaderIsAfterHead() {
public void nextRangeShouldEndAtChainHeadWhenNextRangeHeaderIsAfterHead() {
final long remoteChainHeight = blockchain.getChainHeadBlockNumber();
final CheckpointHeaderFetcher checkpointHeaderFetcher = createCheckpointHeaderFetcher();
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher();
assertThat(
checkpointHeaderFetcher.nextCheckpointEndsAtChainHead(
rangeHeaderFetcher.nextRangeEndsAtChainHead(
respondingPeer.getEthPeer(), header(remoteChainHeight - SEGMENT_SIZE + 1)))
.isTrue();
}
@Test
public void nextCheckpointShouldNotEndAtChainHeadWhenAFinalCheckpointHeaderIsSpecified() {
public void nextRangeShouldNotEndAtChainHeadWhenAFinalRangeHeaderIsSpecified() {
final long remoteChainHeight = blockchain.getChainHeadBlockNumber();
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(header(remoteChainHeight));
final RangeHeadersFetcher rangeHeaderFetcher =
createRangeHeaderFetcher(header(remoteChainHeight));
assertThat(
checkpointHeaderFetcher.nextCheckpointEndsAtChainHead(
rangeHeaderFetcher.nextRangeEndsAtChainHead(
respondingPeer.getEthPeer(), header(remoteChainHeight - SEGMENT_SIZE + 1)))
.isFalse();
}
@Test
public void shouldReturnRemoteChainHeadWhenNextCheckpointHeaderIsTheRemoteHead() {
public void shouldReturnRemoteChainHeadWhenNextRangeHeaderIsTheRemoteHead() {
final long remoteChainHeight = blockchain.getChainHeadBlockNumber();
final CheckpointHeaderFetcher checkpointHeaderFetcher = createCheckpointHeaderFetcher();
final RangeHeadersFetcher rangeHeaderFetcher = createRangeHeaderFetcher();
assertThat(
checkpointHeaderFetcher.nextCheckpointEndsAtChainHead(
rangeHeaderFetcher.nextRangeEndsAtChainHead(
respondingPeer.getEthPeer(), header(remoteChainHeight - SEGMENT_SIZE)))
.isFalse();
final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(
rangeHeaderFetcher.getNextRangeHeaders(
respondingPeer.getEthPeer(), header(remoteChainHeight - SEGMENT_SIZE));
respondingPeer.respond(responder);
@ -199,9 +195,9 @@ public class CheckpointHeaderFetcherTest {
assertThat(result).isCompletedWithValue(singletonList(header(remoteChainHeight)));
}
private CheckpointHeaderFetcher createCheckpointHeaderFetcher() {
private RangeHeadersFetcher createRangeHeaderFetcher() {
final EthContext ethContext = ethProtocolManager.ethContext();
return new CheckpointHeaderFetcher(
return new RangeHeadersFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(SEGMENT_SIZE)
.downloaderHeadersRequestSize(3)
@ -211,9 +207,9 @@ public class CheckpointHeaderFetcherTest {
metricsSystem);
}
private CheckpointHeaderFetcher createCheckpointHeaderFetcher(final BlockHeader targetHeader) {
private RangeHeadersFetcher createRangeHeaderFetcher(final BlockHeader targetHeader) {
final EthContext ethContext = ethProtocolManager.ethContext();
return new CheckpointHeaderFetcher(
return new RangeHeadersFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(SEGMENT_SIZE)
.downloaderHeadersRequestSize(3)

@ -27,6 +27,9 @@ import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeaders;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeadersValidationStep;
import org.hyperledger.besu.ethereum.eth.sync.range.SyncTargetRange;
import org.hyperledger.besu.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@ -41,7 +44,7 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class CheckpointHeaderValidationStepTest {
public class RangeHeadersValidationStepTest {
@Mock private ProtocolSchedule protocolSchedule;
@Mock private ProtocolSpec protocolSpec;
@Mock private ProtocolContext protocolContext;
@ -49,15 +52,15 @@ public class CheckpointHeaderValidationStepTest {
@Mock private ValidationPolicy validationPolicy;
@Mock private EthPeer syncTarget;
private final BlockDataGenerator gen = new BlockDataGenerator();
private CheckpointHeaderValidationStep validationStep;
private RangeHeadersValidationStep validationStep;
private final BlockHeader checkpointStart = gen.header(10);
private final BlockHeader checkpointEnd = gen.header(13);
private final BlockHeader rangeStart = gen.header(10);
private final BlockHeader rangeEnd = gen.header(13);
private final BlockHeader firstHeader = gen.header(11);
private final CheckpointRangeHeaders rangeHeaders =
new CheckpointRangeHeaders(
new CheckpointRange(syncTarget, checkpointStart, checkpointEnd),
asList(firstHeader, gen.header(12), checkpointEnd));
private final RangeHeaders rangeHeaders =
new RangeHeaders(
new SyncTargetRange(syncTarget, rangeStart, rangeEnd),
asList(firstHeader, gen.header(12), rangeEnd));
@Before
public void setUp() {
@ -66,20 +69,18 @@ public class CheckpointHeaderValidationStepTest {
when(validationPolicy.getValidationModeForNextBlock()).thenReturn(DETACHED_ONLY);
validationStep =
new CheckpointHeaderValidationStep(protocolSchedule, protocolContext, validationPolicy);
new RangeHeadersValidationStep(protocolSchedule, protocolContext, validationPolicy);
}
@Test
public void shouldValidateFirstHeaderAgainstCheckpointStartHeader() {
when(headerValidator.validateHeader(
firstHeader, checkpointStart, protocolContext, DETACHED_ONLY))
public void shouldValidateFirstHeaderAgainstRangeStartHeader() {
when(headerValidator.validateHeader(firstHeader, rangeStart, protocolContext, DETACHED_ONLY))
.thenReturn(true);
final Stream<BlockHeader> result = validationStep.apply(rangeHeaders);
verify(protocolSchedule).getByBlockNumber(firstHeader.getNumber());
verify(validationPolicy).getValidationModeForNextBlock();
verify(headerValidator)
.validateHeader(firstHeader, checkpointStart, protocolContext, DETACHED_ONLY);
verify(headerValidator).validateHeader(firstHeader, rangeStart, protocolContext, DETACHED_ONLY);
verifyNoMoreInteractions(headerValidator, validationPolicy);
assertThat(result).containsExactlyElementsOf(rangeHeaders.getHeadersToImport());
@ -87,20 +88,19 @@ public class CheckpointHeaderValidationStepTest {
@Test
public void shouldThrowExceptionWhenValidationFails() {
when(headerValidator.validateHeader(
firstHeader, checkpointStart, protocolContext, DETACHED_ONLY))
when(headerValidator.validateHeader(firstHeader, rangeStart, protocolContext, DETACHED_ONLY))
.thenReturn(false);
assertThatThrownBy(() -> validationStep.apply(rangeHeaders))
.isInstanceOf(InvalidBlockException.class)
.hasMessageContaining(
"Invalid checkpoint headers. Headers downloaded between #"
+ checkpointStart.getNumber()
"Invalid range headers. Headers downloaded between #"
+ rangeStart.getNumber()
+ " ("
+ checkpointStart.getHash()
+ rangeStart.getHash()
+ ") and #"
+ checkpointEnd.getNumber()
+ rangeEnd.getNumber()
+ " ("
+ checkpointEnd.getHash()
+ rangeEnd.getHash()
+ ") do not connect at #"
+ firstHeader.getNumber()
+ " ("

@ -33,6 +33,9 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeadersFetcher;
import org.hyperledger.besu.ethereum.eth.sync.range.SyncTargetRange;
import org.hyperledger.besu.ethereum.eth.sync.range.SyncTargetRangeSource;
import java.time.Duration;
import java.util.List;
@ -42,20 +45,20 @@ import java.util.function.Supplier;
import org.junit.Test;
public class CheckpointRangeSourceTest {
public class SyncTargetRangeSourceTest {
private static final int CHECKPOINT_TIMEOUTS_PERMITTED = 3;
private static final Duration RETRY_DELAY_DURATION = Duration.ofSeconds(2);
private final EthPeer peer = mock(EthPeer.class);
private final CheckpointHeaderFetcher checkpointFetcher = mock(CheckpointHeaderFetcher.class);
private final CheckpointRangeSource.SyncTargetChecker syncTargetChecker =
mock(CheckpointRangeSource.SyncTargetChecker.class);
private final RangeHeadersFetcher rangeHeaders = mock(RangeHeadersFetcher.class);
private final SyncTargetRangeSource.SyncTargetChecker syncTargetChecker =
mock(SyncTargetRangeSource.SyncTargetChecker.class);
private final EthScheduler ethScheduler = mock(EthScheduler.class);
private final BlockHeader commonAncestor = header(10);
private final CheckpointRangeSource source =
new CheckpointRangeSource(
checkpointFetcher,
private final SyncTargetRangeSource source =
new SyncTargetRangeSource(
rangeHeaders,
syncTargetChecker,
ethScheduler,
peer,
@ -73,13 +76,13 @@ public class CheckpointRangeSourceTest {
@Test
public void shouldHaveNextWhenMoreCheckpointsAreLoadedRegardlessOfSyncTargetChecker() {
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor))
.thenReturn(completedFuture(asList(header(15), header(20))));
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, commonAncestor))
.thenReturn(false);
source.next();
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
verify(rangeHeaders).getNextRangeHeaders(peer, commonAncestor);
assertThat(source).hasNext();
}
@ -95,13 +98,13 @@ public class CheckpointRangeSourceTest {
@Test
public void shouldNotHaveNextWhenNoMoreCheckpointsAvailableAndRetryLimitReached() {
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(any(), any())).thenReturn(true);
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor))
.thenReturn(CompletableFuture.failedFuture(new TimeoutException()));
for (int i = 1; i <= CHECKPOINT_TIMEOUTS_PERMITTED; i++) {
assertThat(source).hasNext();
assertThat(source.next()).isNull();
verify(checkpointFetcher, times(i)).getNextCheckpointHeaders(peer, commonAncestor);
verify(rangeHeaders, times(i)).getNextRangeHeaders(peer, commonAncestor);
}
// Too many timeouts, give up on this sync target.
@ -111,13 +114,13 @@ public class CheckpointRangeSourceTest {
@Test
public void shouldConsiderHeaderRequestFailedIfNoNewHeadersReturned() {
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(any(), any())).thenReturn(true);
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor))
.thenReturn(completedFuture(emptyList()));
for (int i = 1; i <= CHECKPOINT_TIMEOUTS_PERMITTED; i++) {
assertThat(source).hasNext();
assertThat(source.next()).isNull();
verify(checkpointFetcher, times(i)).getNextCheckpointHeaders(peer, commonAncestor);
verify(rangeHeaders, times(i)).getNextRangeHeaders(peer, commonAncestor);
}
// Too many timeouts, give up on this sync target.
@ -128,11 +131,11 @@ public class CheckpointRangeSourceTest {
@SuppressWarnings("unchecked")
public void shouldDelayBeforeRetryingRequestForCheckpointHeadersAfterEmptyResponse() {
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(any(), any())).thenReturn(true);
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor))
.thenReturn(completedFuture(emptyList()));
assertThat(source.next()).isNull();
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
verify(rangeHeaders).getNextRangeHeaders(peer, commonAncestor);
verify(ethScheduler).scheduleFutureTask(any(Supplier.class), eq(RETRY_DELAY_DURATION));
}
@ -140,18 +143,18 @@ public class CheckpointRangeSourceTest {
@SuppressWarnings("unchecked")
public void shouldDelayBeforeRetryingRequestForCheckpointHeadersAfterFailure() {
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(any(), any())).thenReturn(true);
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("Nope")));
assertThat(source.next()).isNull();
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
verify(rangeHeaders).getNextRangeHeaders(peer, commonAncestor);
verify(ethScheduler).scheduleFutureTask(any(Supplier.class), eq(RETRY_DELAY_DURATION));
}
@Test
public void shouldResetCheckpointFailureCountWhenMoreCheckpointsReceived() {
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(any(), any())).thenReturn(true);
when(checkpointFetcher.getNextCheckpointHeaders(any(), any()))
when(rangeHeaders.getNextRangeHeaders(any(), any()))
.thenReturn(CompletableFuture.failedFuture(new TimeoutException()))
.thenReturn(CompletableFuture.failedFuture(new TimeoutException()))
.thenReturn(completedFuture(singletonList(header(15))))
@ -169,39 +172,39 @@ public class CheckpointRangeSourceTest {
@Test
public void shouldRequestMoreHeadersWhenCurrentSetHasRunOut() {
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor))
.thenReturn(completedFuture(asList(header(15), header(20))));
when(checkpointFetcher.getNextCheckpointHeaders(peer, header(20)))
when(rangeHeaders.getNextRangeHeaders(peer, header(20)))
.thenReturn(completedFuture(asList(header(25), header(30))));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, commonAncestor, header(15)));
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
verify(checkpointFetcher).nextCheckpointEndsAtChainHead(peer, commonAncestor);
assertThat(source.next()).isEqualTo(new SyncTargetRange(peer, commonAncestor, header(15)));
verify(rangeHeaders).getNextRangeHeaders(peer, commonAncestor);
verify(rangeHeaders).nextRangeEndsAtChainHead(peer, commonAncestor);
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, header(15), header(20)));
verifyNoMoreInteractions(checkpointFetcher);
assertThat(source.next()).isEqualTo(new SyncTargetRange(peer, header(15), header(20)));
verifyNoMoreInteractions(rangeHeaders);
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, header(20), header(25)));
verify(checkpointFetcher).getNextCheckpointHeaders(peer, header(20));
verify(checkpointFetcher).nextCheckpointEndsAtChainHead(peer, header(20));
assertThat(source.next()).isEqualTo(new SyncTargetRange(peer, header(20), header(25)));
verify(rangeHeaders).getNextRangeHeaders(peer, header(20));
verify(rangeHeaders).nextRangeEndsAtChainHead(peer, header(20));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, header(25), header(30)));
verifyNoMoreInteractions(checkpointFetcher);
assertThat(source.next()).isEqualTo(new SyncTargetRange(peer, header(25), header(30)));
verifyNoMoreInteractions(rangeHeaders);
}
@Test
public void shouldReturnCheckpointsFromExistingBatch() {
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor))
.thenReturn(completedFuture(asList(header(15), header(20))));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, commonAncestor, header(15)));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, header(15), header(20)));
assertThat(source.next()).isEqualTo(new SyncTargetRange(peer, commonAncestor, header(15)));
assertThat(source.next()).isEqualTo(new SyncTargetRange(peer, header(15), header(20)));
}
@Test
public void shouldReturnNullIfNewHeadersNotAvailableInTime() {
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor))
.thenReturn(new CompletableFuture<>());
assertThat(source.next()).isNull();
@ -209,52 +212,52 @@ public class CheckpointRangeSourceTest {
@Test
public void shouldNotRequestMoreHeadersIfOriginalRequestStillInProgress() {
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor))
.thenReturn(new CompletableFuture<>());
assertThat(source.next()).isNull();
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
verify(checkpointFetcher).nextCheckpointEndsAtChainHead(peer, commonAncestor);
verify(rangeHeaders).getNextRangeHeaders(peer, commonAncestor);
verify(rangeHeaders).nextRangeEndsAtChainHead(peer, commonAncestor);
assertThat(source.next()).isNull();
verifyNoMoreInteractions(checkpointFetcher);
verifyNoMoreInteractions(rangeHeaders);
}
@Test
public void shouldReturnCheckpointsOnceHeadersRequestCompletes() {
final CompletableFuture<List<BlockHeader>> future = new CompletableFuture<>();
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor)).thenReturn(future);
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor)).thenReturn(future);
assertThat(source.next()).isNull();
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
verify(rangeHeaders).getNextRangeHeaders(peer, commonAncestor);
future.complete(asList(header(15), header(20)));
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, commonAncestor, header(15)));
assertThat(source.next()).isEqualTo(new SyncTargetRange(peer, commonAncestor, header(15)));
}
@Test
public void shouldSendNewRequestIfRequestForHeadersFails() {
when(checkpointFetcher.getNextCheckpointHeaders(peer, commonAncestor))
when(rangeHeaders.getNextRangeHeaders(peer, commonAncestor))
.thenReturn(CompletableFuture.failedFuture(new NoAvailablePeersException()))
.thenReturn(completedFuture(asList(header(15), header(20))));
// Returns null when the first request fails
assertThat(source.next()).isNull();
verify(checkpointFetcher).getNextCheckpointHeaders(peer, commonAncestor);
verify(rangeHeaders).getNextRangeHeaders(peer, commonAncestor);
// Then retries
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, commonAncestor, header(15)));
verify(checkpointFetcher, times(2)).getNextCheckpointHeaders(peer, commonAncestor);
assertThat(source.next()).isEqualTo(new SyncTargetRange(peer, commonAncestor, header(15)));
verify(rangeHeaders, times(2)).getNextRangeHeaders(peer, commonAncestor);
}
@Test
public void shouldReturnUnboundedCheckpointRangeWhenNextCheckpointEndsAtChainHead() {
when(syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, commonAncestor))
.thenReturn(true);
when(checkpointFetcher.nextCheckpointEndsAtChainHead(peer, commonAncestor)).thenReturn(true);
when(rangeHeaders.nextRangeEndsAtChainHead(peer, commonAncestor)).thenReturn(true);
assertThat(source).hasNext();
assertThat(source.next()).isEqualTo(new CheckpointRange(peer, commonAncestor));
assertThat(source.next()).isEqualTo(new SyncTargetRange(peer, commonAncestor));
// Once we've sent an open-ended range we shouldn't have any more ranges.
assertThat(source).isExhausted();

@ -0,0 +1,82 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.chain.DefaultBlockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStoragePrefixedKeyBlockchainStorage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
public class CheckPointBlockImportStepTest {
private final CheckpointSource checkPointSource = mock(CheckpointSource.class);
private final Checkpoint checkpoint = mock(Checkpoint.class);
private MutableBlockchain blockchain;
private CheckpointBlockImportStep checkPointHeaderImportStep;
private KeyValueStoragePrefixedKeyBlockchainStorage blockchainStorage;
@Before
public void setup() {
blockchainStorage =
new KeyValueStoragePrefixedKeyBlockchainStorage(
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions());
blockchain =
DefaultBlockchain.createMutable(
generateBlock(0), blockchainStorage, mock(MetricsSystem.class), 0);
checkPointHeaderImportStep =
new CheckpointBlockImportStep(checkPointSource, checkpoint, blockchain);
}
@Test
public void shouldSaveNewHeader() {
when(checkPointSource.hasNext()).thenReturn(true);
assertThat(blockchainStorage.getBlockHash(1)).isEmpty();
final Block block = generateBlock(1);
checkPointHeaderImportStep.accept(Optional.of(new BlockWithReceipts(block, new ArrayList<>())));
assertThat(blockchainStorage.getBlockHash(1)).isPresent();
}
@Test
public void shouldSaveChainHeadForLastBlock() {
when(checkPointSource.hasNext()).thenReturn(false);
final Block block = generateBlock(2);
when(checkPointSource.getCheckpoint()).thenReturn(block.getHeader());
checkPointHeaderImportStep.accept(Optional.of(new BlockWithReceipts(block, new ArrayList<>())));
assertThat(blockchainStorage.getBlockHash(2)).isPresent();
}
private Block generateBlock(final int blockNumber) {
final BlockBody body = new BlockBody(Collections.emptyList(), Collections.emptyList());
return new Block(new BlockHeaderTestFixture().number(blockNumber).buildHeader(), body);
}
}

@ -0,0 +1,81 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
public class CheckPointSourceTest {
private final SyncState syncState = mock(SyncState.class);
private final EthPeer peer = mock(EthPeer.class);
private final BlockHeaderFunctions blockHeaderFunctions = mock(BlockHeaderFunctions.class);
private CheckpointSource checkPointSource;
@Before
public void setup() {
when(peer.getCheckpointHeader()).thenReturn(Optional.of(header(12)));
when(blockHeaderFunctions.getCheckPointWindowSize(any(BlockHeader.class))).thenReturn(1);
checkPointSource = new CheckpointSource(syncState, peer, blockHeaderFunctions);
}
@Test
public void shouldNotHasNextWhenChainHeightIsNotZero() {
when(syncState.getLocalChainHeight()).thenReturn(1L);
assertThat(checkPointSource.hasNext()).isFalse();
}
@Test
public void shouldHasNextWhenLocalChainIsZero() {
when(syncState.getLocalChainHeight()).thenReturn(0L);
assertThat(checkPointSource.hasNext()).isTrue();
}
@Test
public void shouldHasNextWhenMissingHeader() {
when(syncState.getLocalChainHeight()).thenReturn(0L);
checkPointSource.setLastHeaderDownloaded(Optional.of(header(12)));
assertThat(checkPointSource.hasNext()).isTrue();
}
@Test
public void shouldReturnCheckPointForFirstNext() {
assertThat(checkPointSource.next()).isEqualTo(checkPointSource.getCheckpoint().getHash());
}
@Test
public void shouldReturnParentHashForOtherNextCall() {
final BlockHeader header = header(12);
checkPointSource.setLastHeaderDownloaded(Optional.of(header));
assertThat(checkPointSource.next()).isEqualTo(header.getParentHash());
}
private BlockHeader header(final int number) {
return new BlockHeaderTestFixture().number(number).buildHeader();
}
}

@ -0,0 +1,197 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode.LIGHT_SKIP_DETACHED;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncValidationPolicy;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.ImmutableCheckpoint;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class CheckPointSyncChainDownloaderTest {
private final FastSyncValidationPolicy validationPolicy = mock(FastSyncValidationPolicy.class);
private final WorldStateStorage worldStateStorage = mock(WorldStateStorage.class);
protected ProtocolSchedule protocolSchedule;
protected EthProtocolManager ethProtocolManager;
protected EthContext ethContext;
protected ProtocolContext protocolContext;
private SyncState syncState;
protected MutableBlockchain localBlockchain;
private BlockchainSetupUtil otherBlockchainSetup;
protected Blockchain otherBlockchain;
private Checkpoint checkpoint;
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {{DataStorageFormat.BONSAI}, {DataStorageFormat.FOREST}});
}
private final DataStorageFormat storageFormat;
public CheckPointSyncChainDownloaderTest(final DataStorageFormat storageFormat) {
this.storageFormat = storageFormat;
}
@Before
public void setup() {
when(validationPolicy.getValidationModeForNextBlock()).thenReturn(LIGHT_SKIP_DETACHED);
when(worldStateStorage.isWorldStateAvailable(any(), any())).thenReturn(true);
final BlockchainSetupUtil localBlockchainSetup = BlockchainSetupUtil.forTesting(storageFormat);
localBlockchain = localBlockchainSetup.getBlockchain();
otherBlockchainSetup = BlockchainSetupUtil.forTesting(storageFormat);
otherBlockchain = otherBlockchainSetup.getBlockchain();
protocolSchedule = localBlockchainSetup.getProtocolSchedule();
protocolContext = localBlockchainSetup.getProtocolContext();
ethProtocolManager =
EthProtocolManagerTestUtil.create(
localBlockchain, new EthScheduler(1, 1, 1, 1, new NoOpMetricsSystem()));
ethContext = ethProtocolManager.ethContext();
final int blockNumber = 10;
checkpoint =
ImmutableCheckpoint.builder()
.blockNumber(blockNumber)
.blockHash(localBlockchainSetup.getBlocks().get(blockNumber).getHash())
.totalDifficulty(Difficulty.ONE)
.build();
syncState =
new SyncState(
protocolContext.getBlockchain(),
ethContext.getEthPeers(),
true,
Optional.of(checkpoint));
}
@After
public void tearDown() {
ethProtocolManager.stop();
}
private ChainDownloader downloader(
final SynchronizerConfiguration syncConfig, final long pivotBlockNumber) {
return CheckpointSyncChainDownloader.create(
syncConfig,
worldStateStorage,
protocolSchedule,
protocolContext,
ethContext,
syncState,
new NoOpMetricsSystem(),
new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()));
}
@Test
public void shouldSyncToPivotBlockInMultipleSegments() {
otherBlockchainSetup.importFirstBlocks(30);
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(otherBlockchain);
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build();
final long pivotBlockNumber = 25;
ethContext
.getEthPeers()
.streamAvailablePeers()
.forEach(
ethPeer -> {
ethPeer.setCheckpointHeader(
otherBlockchainSetup.getBlocks().get((int) checkpoint.blockNumber()).getHeader());
});
final ChainDownloader downloader = downloader(syncConfig, pivotBlockNumber);
final CompletableFuture<Void> result = downloader.start();
peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone());
assertThat(result).isCompleted();
assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber);
assertThat(localBlockchain.getChainHeadHeader())
.isEqualTo(otherBlockchain.getBlockHeader(pivotBlockNumber).get());
}
@Test
public void shouldSyncToPivotBlockInSingleSegment() {
otherBlockchainSetup.importFirstBlocks(30);
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(otherBlockchain);
final long pivotBlockNumber = 10;
final SynchronizerConfiguration syncConfig = SynchronizerConfiguration.builder().build();
ethContext
.getEthPeers()
.streamAvailablePeers()
.forEach(
ethPeer -> {
ethPeer.setCheckpointHeader(
otherBlockchainSetup.getBlocks().get((int) checkpoint.blockNumber()).getHeader());
});
final ChainDownloader downloader = downloader(syncConfig, pivotBlockNumber);
final CompletableFuture<Void> result = downloader.start();
peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone());
assertThat(result).isCompleted();
assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber);
assertThat(localBlockchain.getChainHeadHeader())
.isEqualTo(otherBlockchain.getBlockHeader(pivotBlockNumber).get());
}
}

@ -447,7 +447,7 @@ public class FastSyncActionsTest {
protocolSchedule,
protocolContext,
ethContext,
new SyncState(blockchain, ethContext.getEthPeers(), true),
new SyncState(blockchain, ethContext.getEthPeers(), true, Optional.empty()),
pivotBlockSelector,
new NoOpMetricsSystem());
}

Loading…
Cancel
Save