Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask

pull/7638/head
Matilda-Clerke 2 months ago committed by GitHub
commit 84af9f9eb3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      CHANGELOG.md
  2. 4
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  3. 33
      besu/src/main/java/org/hyperledger/besu/cli/ConfigurationOverviewBuilder.java
  4. 26
      besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java
  5. 14
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  6. 10
      besu/src/test/java/org/hyperledger/besu/cli/ConfigurationOverviewBuilderTest.java
  7. 39
      besu/src/test/java/org/hyperledger/besu/controller/MergeBesuControllerBuilderTest.java
  8. 167
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromBlock.java
  9. 58
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromHeadBlock.java
  10. 136
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotSelectorFromSafeBlock.java
  11. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/SyncTargetManager.java
  12. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncTargetManager.java
  13. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncConfiguration.java
  14. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapSyncMetricsManager.java
  15. 52
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java

@ -18,7 +18,8 @@
- Add configuration of Consolidation Request Contract Address via genesis configuration [#7647](https://github.com/hyperledger/besu/pull/7647)
- Interrupt pending transaction processing on block creation timeout [#7673](https://github.com/hyperledger/besu/pull/7673)
- Align gas cap calculation for transaction simulation to Geth approach [#7703](https://github.com/hyperledger/besu/pull/7703)
- Expose chainId in the `BlockchainService` [#7702](https://github.com/hyperledger/besu/pull/7702)
- Expose chainId in the `BlockchainService` [7702](https://github.com/hyperledger/besu/pull/7702)
- Use head block instead of safe block for snap sync [7536](https://github.com/hyperledger/besu/issues/7536)
- Add support for `chainId` in `CallParameters` [#7720](https://github.com/hyperledger/besu/pull/7720)
### Bug fixes

@ -2701,7 +2701,8 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
builder
.setDataStorage(dataStorageOptions.normalizeDataStorageFormat())
.setSyncMode(syncMode.normalize());
.setSyncMode(syncMode.normalize())
.setSyncMinPeers(syncMinPeerCount);
if (jsonRpcConfiguration != null && jsonRpcConfiguration.isEnabled()) {
builder
@ -2738,6 +2739,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
builder.setSnapServerEnabled(this.unstableSynchronizerOptions.isSnapsyncServerEnabled());
builder.setSnapSyncBftEnabled(this.unstableSynchronizerOptions.isSnapSyncBftEnabled());
builder.setSnapSyncToHeadEnabled(this.unstableSynchronizerOptions.isSnapSyncToHeadEnabled());
builder.setTxPoolImplementation(buildTransactionPoolConfiguration().getTxPoolImplementation());
builder.setWorldStateUpdateMode(unstableEvmOptions.toDomainObject().worldUpdaterMode());

@ -46,6 +46,7 @@ public class ConfigurationOverviewBuilder {
private String customGenesisFileName;
private String dataStorage;
private String syncMode;
private Integer syncMinPeers;
private Integer rpcPort;
private Collection<String> rpcHttpApis;
private Integer enginePort;
@ -57,6 +58,7 @@ public class ConfigurationOverviewBuilder {
private Integer trieLogsPruningWindowSize = null;
private boolean isSnapServerEnabled = false;
private boolean isSnapSyncBftEnabled = false;
private boolean isSnapSyncToHeadEnabled = true;
private TransactionPoolConfiguration.Implementation txPoolImplementation;
private EvmConfiguration.WorldUpdaterMode worldStateUpdateMode;
private Map<String, String> environment;
@ -148,6 +150,17 @@ public class ConfigurationOverviewBuilder {
return this;
}
/**
* Sets sync min peers.
*
* @param syncMinPeers number of min peers for sync
* @return the builder
*/
public ConfigurationOverviewBuilder setSyncMinPeers(final int syncMinPeers) {
this.syncMinPeers = syncMinPeers;
return this;
}
/**
* Sets rpc port.
*
@ -245,6 +258,18 @@ public class ConfigurationOverviewBuilder {
return this;
}
/**
* Sets snap sync to head enabled/disabled
*
* @param snapSyncToHeadEnabled bool to indicate if snap sync to head is enabled
* @return the builder
*/
public ConfigurationOverviewBuilder setSnapSyncToHeadEnabled(
final boolean snapSyncToHeadEnabled) {
isSnapSyncToHeadEnabled = snapSyncToHeadEnabled;
return this;
}
/**
* Sets trie logs pruning window size
*
@ -340,6 +365,10 @@ public class ConfigurationOverviewBuilder {
lines.add("Sync mode: " + syncMode);
}
if (syncMinPeers != null) {
lines.add("Sync min peers: " + syncMinPeers);
}
if (rpcHttpApis != null) {
lines.add("RPC HTTP APIs: " + String.join(",", rpcHttpApis));
}
@ -373,6 +402,10 @@ public class ConfigurationOverviewBuilder {
lines.add("Experimental Snap Sync for BFT enabled");
}
if (isSnapSyncToHeadEnabled) {
lines.add("Snap Sync to Head enabled");
}
if (isBonsaiLimitTrieLogsEnabled) {
final StringBuilder trieLogPruningString = new StringBuilder();
trieLogPruningString

@ -87,6 +87,8 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
private static final String SNAP_SYNC_BFT_ENABLED_FLAG = "--Xsnapsync-bft-enabled";
private static final String SNAP_SYNC_TO_HEAD_ENABLED_FLAG = "--Xsnapsync-to-head-enabled";
/**
* Parse block propagation range.
*
@ -314,6 +316,15 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
description = "Snap sync enabled for BFT chains (default: ${DEFAULT-VALUE})")
private Boolean snapsyncBftEnabled = SnapSyncConfiguration.DEFAULT_SNAP_SYNC_BFT_ENABLED;
@CommandLine.Option(
names = SNAP_SYNC_TO_HEAD_ENABLED_FLAG,
hidden = true,
paramLabel = "<Boolean>",
arity = "0..1",
description = "Snap sync to head enabled (default: ${DEFAULT-VALUE})")
private Boolean snapsyncToHeadEnabled =
SnapSyncConfiguration.DEFAULT_SNAP_SYNC_TO_HEAD_ENABLED_FLAG;
@CommandLine.Option(
names = {"--Xpeertask-system-enabled"},
hidden = true,
@ -341,6 +352,15 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
return snapsyncBftEnabled;
}
/**
* Flag to know if Snap sync should sync to head instead of safe block
*
* @return true if snap sync should sync to head
*/
public boolean isSnapSyncToHeadEnabled() {
return snapsyncToHeadEnabled;
}
/**
* Flag to indicate whether the peer task system should be used where available
*
@ -401,6 +421,7 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
options.checkpointPostMergeSyncEnabled = config.isCheckpointPostMergeEnabled();
options.snapsyncServerEnabled = config.getSnapSyncConfiguration().isSnapServerEnabled();
options.snapsyncBftEnabled = config.getSnapSyncConfiguration().isSnapSyncBftEnabled();
options.snapsyncToHeadEnabled = config.getSnapSyncConfiguration().isSnapSyncToHeadEnabled();
return options;
}
@ -434,6 +455,7 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
.localFlatStorageCountToHealPerRequest(snapsyncFlatStorageHealedCountPerRequest)
.isSnapServerEnabled(snapsyncServerEnabled)
.isSnapSyncBftEnabled(snapsyncBftEnabled)
.isSnapSyncToHeadEnabled(snapsyncToHeadEnabled)
.build());
builder.checkpointPostMergeEnabled(checkpointPostMergeSyncEnabled);
builder.isPeerTaskSystemEnabled(isPeerTaskSystemEnabled);
@ -493,7 +515,9 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
SNAP_SERVER_ENABLED_FLAG,
OptionParser.format(snapsyncServerEnabled),
SNAP_SYNC_BFT_ENABLED_FLAG,
OptionParser.format(snapsyncBftEnabled));
OptionParser.format(snapsyncBftEnabled),
SNAP_SYNC_TO_HEAD_ENABLED_FLAG,
OptionParser.format(snapsyncToHeadEnabled));
return value;
}
}

@ -67,6 +67,7 @@ import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromHeadBlock;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromPeers;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromSafeBlock;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
@ -898,6 +899,18 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
LOG.info("Initial sync done, unsubscribe forkchoice supplier");
};
if (syncConfig.getSnapSyncConfiguration().isSnapSyncToHeadEnabled()) {
LOG.info("Using head block for sync.");
return new PivotSelectorFromHeadBlock(
protocolContext,
protocolSchedule,
ethContext,
metricsSystem,
genesisConfigOptions,
unverifiedForkchoiceSupplier,
unsubscribeForkchoiceListener);
} else {
LOG.info("Using safe block for sync.");
return new PivotSelectorFromSafeBlock(
protocolContext,
protocolSchedule,
@ -906,6 +919,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
genesisConfigOptions,
unverifiedForkchoiceSupplier,
unsubscribeForkchoiceListener);
}
} else {
LOG.info("TTD difficulty is not present, creating initial sync phase for PoW");
return new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem);

@ -95,6 +95,16 @@ class ConfigurationOverviewBuilderTest {
assertThat(syncModeSet).contains("Sync mode: fast");
}
@Test
void setSyncMinPeers() {
final String noSyncMinPeersSet = builder.build();
assertThat(noSyncMinPeersSet).doesNotContain("Sync min peers:");
builder.setSyncMinPeers(3);
final String syncMinPeersSet = builder.build();
assertThat(syncMinPeersSet).contains("Sync min peers: 3");
}
@Test
void setRpcPort() {
final String noRpcPortSet = builder.build();

@ -45,6 +45,8 @@ import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromHeadBlock;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromSafeBlock;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.mainnet.feemarket.BaseFeeMarket;
@ -74,12 +76,15 @@ import com.google.common.collect.Range;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
@ -90,7 +95,10 @@ public class MergeBesuControllerBuilderTest {
@Mock GenesisConfigFile genesisConfigFile;
@Mock GenesisConfigOptions genesisConfigOptions;
@Mock SynchronizerConfiguration synchronizerConfiguration;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
SynchronizerConfiguration synchronizerConfiguration;
@Mock EthProtocolConfiguration ethProtocolConfiguration;
@Mock CheckpointConfigOptions checkpointConfigOptions;
@ -146,6 +154,9 @@ public class MergeBesuControllerBuilderTest {
lenient().when(synchronizerConfiguration.getDownloaderParallelism()).thenReturn(1);
lenient().when(synchronizerConfiguration.getTransactionsParallelism()).thenReturn(1);
lenient().when(synchronizerConfiguration.getComputationParallelism()).thenReturn(1);
lenient()
.when(synchronizerConfiguration.getSnapSyncConfiguration().isSnapSyncToHeadEnabled())
.thenReturn(false);
lenient()
.when(synchronizerConfiguration.getBlockPropagationRange())
@ -291,6 +302,32 @@ public class MergeBesuControllerBuilderTest {
assertThat(mergeContext.getFinalized().get()).isEqualTo(finalizedHeader);
}
@Test
public void assertPivotSelectorFromSafeBlockIsCreated() {
MockedConstruction<PivotSelectorFromSafeBlock> mocked =
Mockito.mockConstruction(PivotSelectorFromSafeBlock.class);
lenient()
.when(synchronizerConfiguration.getSnapSyncConfiguration().isSnapSyncToHeadEnabled())
.thenReturn(false);
visitWithMockConfigs(new MergeBesuControllerBuilder()).build();
Assertions.assertEquals(1, mocked.constructed().size());
}
@Test
public void assertPivotSelectorFromHeadBlockIsCreated() {
MockedConstruction<PivotSelectorFromHeadBlock> mocked =
Mockito.mockConstruction(PivotSelectorFromHeadBlock.class);
lenient()
.when(synchronizerConfiguration.getSnapSyncConfiguration().isSnapSyncToHeadEnabled())
.thenReturn(true);
visitWithMockConfigs(new MergeBesuControllerBuilder()).build();
Assertions.assertEquals(1, mocked.constructed().size());
}
private BlockHeader finalizedBlockHeader() {
final long blockNumber = 42;
final Hash magicHash = Hash.wrap(Bytes32.leftPad(Bytes.ofUnsignedInt(42)));

@ -0,0 +1,167 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.consensus.merge.ForkchoiceEvent;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByHashTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class PivotSelectorFromBlock implements PivotBlockSelector {
private static final Logger LOG = LoggerFactory.getLogger(PivotSelectorFromBlock.class);
private final ProtocolContext protocolContext;
private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final GenesisConfigOptions genesisConfig;
private final Supplier<Optional<ForkchoiceEvent>> forkchoiceStateSupplier;
private final Runnable cleanupAction;
private long lastNoFcuReceivedInfoLog = System.currentTimeMillis();
private static final long NO_FCU_RECEIVED_LOGGING_THRESHOLD = 60000L;
private volatile Optional<BlockHeader> maybeCachedHeadBlockHeader = Optional.empty();
public PivotSelectorFromBlock(
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final MetricsSystem metricsSystem,
final GenesisConfigOptions genesisConfig,
final Supplier<Optional<ForkchoiceEvent>> forkchoiceStateSupplier,
final Runnable cleanupAction) {
this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
this.genesisConfig = genesisConfig;
this.forkchoiceStateSupplier = forkchoiceStateSupplier;
this.cleanupAction = cleanupAction;
}
@Override
public CompletableFuture<Void> prepareRetry() {
// nothing to do
return CompletableFuture.completedFuture(null);
}
@Override
public void close() {
cleanupAction.run();
}
@Override
public long getMinRequiredBlockNumber() {
return genesisConfig.getTerminalBlockNumber().orElse(0L);
}
@Override
public long getBestChainHeight() {
final long localChainHeight = protocolContext.getBlockchain().getChainHeadBlockNumber();
return Math.max(
forkchoiceStateSupplier
.get()
.map(ForkchoiceEvent::getHeadBlockHash)
.map(
headBlockHash ->
maybeCachedHeadBlockHeader
.filter(
cachedBlockHeader -> cachedBlockHeader.getHash().equals(headBlockHash))
.map(BlockHeader::getNumber)
.orElseGet(
() -> {
LOG.debug(
"Downloading chain head block header by hash {}", headBlockHash);
try {
return waitForPeers(1)
.thenCompose(unused -> downloadBlockHeader(headBlockHash))
.thenApply(
blockHeader -> {
maybeCachedHeadBlockHeader = Optional.of(blockHeader);
return blockHeader.getNumber();
})
.get(20, TimeUnit.SECONDS);
} catch (Throwable t) {
LOG.debug(
"Error trying to download chain head block header by hash {}",
headBlockHash,
t);
}
return null;
}))
.orElse(0L),
localChainHeight);
}
@Override
public Optional<FastSyncState> selectNewPivotBlock() {
final Optional<ForkchoiceEvent> maybeForkchoice = forkchoiceStateSupplier.get();
if (maybeForkchoice.isPresent()) {
Optional<Hash> pivotHash = getPivotHash(maybeForkchoice.get());
if (pivotHash.isPresent()) {
LOG.info("Selecting new pivot block: {}", pivotHash);
return Optional.of(new FastSyncState(pivotHash.get()));
}
}
if (lastNoFcuReceivedInfoLog + NO_FCU_RECEIVED_LOGGING_THRESHOLD < System.currentTimeMillis()) {
lastNoFcuReceivedInfoLog = System.currentTimeMillis();
LOG.info(
"Waiting for consensus client, this may be because your consensus client is still syncing");
}
LOG.debug("No finalized block hash announced yet");
return Optional.empty();
}
private CompletableFuture<BlockHeader> downloadBlockHeader(final Hash hash) {
return RetryingGetHeaderFromPeerByHashTask.byHash(
protocolSchedule, ethContext, hash, 0, metricsSystem)
.getHeader()
.whenComplete(
(blockHeader, throwable) -> {
if (throwable != null) {
LOG.debug("Error downloading block header by hash {}", hash);
} else {
LOG.atDebug()
.setMessage("Successfully downloaded pivot block header by hash {}")
.addArgument(blockHeader::toLogString)
.log();
}
});
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(ethContext, count, metricsSystem);
return waitForPeersTask.run();
}
protected abstract Optional<Hash> getPivotHash(final ForkchoiceEvent forkchoiceEvent);
}

@ -0,0 +1,58 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.consensus.merge.ForkchoiceEvent;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PivotSelectorFromHeadBlock extends PivotSelectorFromBlock {
private static final Logger LOG = LoggerFactory.getLogger(PivotSelectorFromHeadBlock.class);
public PivotSelectorFromHeadBlock(
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final MetricsSystem metricsSystem,
final GenesisConfigOptions genesisConfig,
final Supplier<Optional<ForkchoiceEvent>> forkchoiceStateSupplier,
final Runnable cleanupAction) {
super(
protocolContext,
protocolSchedule,
ethContext,
metricsSystem,
genesisConfig,
forkchoiceStateSupplier,
cleanupAction);
}
@Override
protected Optional<Hash> getPivotHash(final ForkchoiceEvent forkchoiceEvent) {
Hash hash = forkchoiceEvent.getHeadBlockHash();
LOG.info("Returning head block hash {} as pivot", hash);
return Optional.of(hash);
}
}

@ -18,36 +18,18 @@ import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.consensus.merge.ForkchoiceEvent;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByHashTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PivotSelectorFromSafeBlock implements PivotBlockSelector {
public class PivotSelectorFromSafeBlock extends PivotSelectorFromBlock {
private static final Logger LOG = LoggerFactory.getLogger(PivotSelectorFromSafeBlock.class);
private final ProtocolContext protocolContext;
private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final GenesisConfigOptions genesisConfig;
private final Supplier<Optional<ForkchoiceEvent>> forkchoiceStateSupplier;
private final Runnable cleanupAction;
private long lastNoFcuReceivedInfoLog = System.currentTimeMillis();
private static final long NO_FCU_RECEIVED_LOGGING_THRESHOLD = 60000L;
private volatile Optional<BlockHeader> maybeCachedHeadBlockHeader = Optional.empty();
public PivotSelectorFromSafeBlock(
final ProtocolContext protocolContext,
@ -57,111 +39,25 @@ public class PivotSelectorFromSafeBlock implements PivotBlockSelector {
final GenesisConfigOptions genesisConfig,
final Supplier<Optional<ForkchoiceEvent>> forkchoiceStateSupplier,
final Runnable cleanupAction) {
this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
this.genesisConfig = genesisConfig;
this.forkchoiceStateSupplier = forkchoiceStateSupplier;
this.cleanupAction = cleanupAction;
}
@Override
public Optional<FastSyncState> selectNewPivotBlock() {
final Optional<ForkchoiceEvent> maybeForkchoice = forkchoiceStateSupplier.get();
if (maybeForkchoice.isPresent() && maybeForkchoice.get().hasValidSafeBlockHash()) {
return Optional.of(selectLastSafeBlockAsPivot(maybeForkchoice.get().getSafeBlockHash()));
}
if (lastNoFcuReceivedInfoLog + NO_FCU_RECEIVED_LOGGING_THRESHOLD < System.currentTimeMillis()) {
lastNoFcuReceivedInfoLog = System.currentTimeMillis();
LOG.info(
"Waiting for consensus client, this may be because your consensus client is still syncing");
}
LOG.debug("No finalized block hash announced yet");
return Optional.empty();
super(
protocolContext,
protocolSchedule,
ethContext,
metricsSystem,
genesisConfig,
forkchoiceStateSupplier,
cleanupAction);
}
@Override
public CompletableFuture<Void> prepareRetry() {
// nothing to do
return CompletableFuture.completedFuture(null);
}
private FastSyncState selectLastSafeBlockAsPivot(final Hash safeHash) {
LOG.debug("Returning safe block hash {} as pivot", safeHash);
return new FastSyncState(safeHash);
}
@Override
public void close() {
cleanupAction.run();
}
@Override
public long getMinRequiredBlockNumber() {
return genesisConfig.getTerminalBlockNumber().orElse(0L);
}
@Override
public long getBestChainHeight() {
final long localChainHeight = protocolContext.getBlockchain().getChainHeadBlockNumber();
return Math.max(
forkchoiceStateSupplier
.get()
.map(ForkchoiceEvent::getHeadBlockHash)
.map(
headBlockHash ->
maybeCachedHeadBlockHeader
.filter(
cachedBlockHeader -> cachedBlockHeader.getHash().equals(headBlockHash))
.map(BlockHeader::getNumber)
.orElseGet(
() -> {
LOG.debug(
"Downloading chain head block header by hash {}", headBlockHash);
try {
return waitForPeers(1)
.thenCompose(unused -> downloadBlockHeader(headBlockHash))
.thenApply(
blockHeader -> {
maybeCachedHeadBlockHeader = Optional.of(blockHeader);
return blockHeader.getNumber();
})
.get(20, TimeUnit.SECONDS);
} catch (Throwable t) {
LOG.debug(
"Error trying to download chain head block header by hash {}",
headBlockHash,
t);
}
return null;
}))
.orElse(0L),
localChainHeight);
}
private CompletableFuture<BlockHeader> downloadBlockHeader(final Hash hash) {
return RetryingGetHeaderFromPeerByHashTask.byHash(
protocolSchedule, ethContext, hash, 0, metricsSystem)
.getHeader()
.whenComplete(
(blockHeader, throwable) -> {
if (throwable != null) {
LOG.debug("Error downloading block header by hash {}", hash);
protected Optional<Hash> getPivotHash(final ForkchoiceEvent forkchoiceEvent) {
if (forkchoiceEvent.hasValidSafeBlockHash()) {
Hash hash = forkchoiceEvent.getSafeBlockHash();
LOG.debug("Returning safe block hash {} as pivot.", hash);
return Optional.of(hash);
} else {
LOG.atDebug()
.setMessage("Successfully downloaded pivot block header by hash {}")
.addArgument(blockHeader::toLogString)
.log();
}
});
LOG.debug("No safe block hash found.");
return Optional.empty();
}
private CompletableFuture<Void> waitForPeers(final int count) {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(ethContext, count, metricsSystem);
return waitForPeersTask.run();
}
}

@ -47,6 +47,7 @@ public class SyncTargetManager extends AbstractSyncTargetManager {
private static final int LOG_INFO_REPEAT_DELAY = 120;
private static final int SECONDS_PER_REQUEST = 6; // 5s per request + 1s wait between retries
private final SynchronizerConfiguration config;
private final WorldStateStorageCoordinator worldStateStorageCoordinator;
private final ProtocolSchedule protocolSchedule;
private final ProtocolContext protocolContext;
@ -65,6 +66,7 @@ public class SyncTargetManager extends AbstractSyncTargetManager {
final MetricsSystem metricsSystem,
final FastSyncState fastSyncState) {
super(config, protocolSchedule, protocolContext, ethContext, metricsSystem);
this.config = config;
this.worldStateStorageCoordinator = worldStateStorageCoordinator;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
@ -82,15 +84,17 @@ public class SyncTargetManager extends AbstractSyncTargetManager {
throttledLog(
LOG::debug,
String.format(
"Unable to find sync target. Currently checking %d peers for usefulness. Pivot block: %d",
ethContext.getEthPeers().peerCount(), pivotBlockHeader.getNumber()),
"Unable to find sync target. Waiting for %d peers minimum. Currently checking %d peers for usefulness. Pivot block: %d",
config.getSyncMinimumPeerCount(),
ethContext.getEthPeers().peerCount(),
pivotBlockHeader.getNumber()),
logDebug,
LOG_DEBUG_REPEAT_DELAY);
throttledLog(
LOG::info,
String.format(
"Unable to find sync target. Currently checking %d peers for usefulness.",
ethContext.getEthPeers().peerCount()),
"Unable to find sync target. Waiting for %d peers minimum. Currently checking %d peers for usefulness.",
config.getSyncMinimumPeerCount(), ethContext.getEthPeers().peerCount()),
logInfo,
LOG_INFO_REPEAT_DELAY);
return completedFuture(Optional.empty());

@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
class FullSyncTargetManager extends AbstractSyncTargetManager {
private static final Logger LOG = LoggerFactory.getLogger(FullSyncTargetManager.class);
private final SynchronizerConfiguration config;
private final ProtocolContext protocolContext;
private final EthContext ethContext;
private final SyncTerminationCondition terminationCondition;
@ -49,6 +50,7 @@ class FullSyncTargetManager extends AbstractSyncTargetManager {
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition) {
super(config, protocolSchedule, protocolContext, ethContext, metricsSystem);
this.config = config;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.terminationCondition = terminationCondition;
@ -77,7 +79,8 @@ class FullSyncTargetManager extends AbstractSyncTargetManager {
final Optional<EthPeer> maybeBestPeer = ethContext.getEthPeers().bestPeerWithHeightEstimate();
if (!maybeBestPeer.isPresent()) {
LOG.info(
"Unable to find sync target. Currently checking {} peers for usefulness",
"Unable to find sync target. Waiting for {} peers minimum. Currently checking {} peers for usefulness",
config.getSyncMinimumPeerCount(),
ethContext.getEthPeers().peerCount());
return completedFuture(Optional.empty());
} else {

@ -40,6 +40,8 @@ public class SnapSyncConfiguration {
public static final Boolean DEFAULT_SNAP_SYNC_BFT_ENABLED = Boolean.FALSE;
public static final Boolean DEFAULT_SNAP_SYNC_TO_HEAD_ENABLED_FLAG = Boolean.TRUE;
public static SnapSyncConfiguration getDefault() {
return ImmutableSnapSyncConfiguration.builder().build();
}
@ -88,4 +90,9 @@ public class SnapSyncConfiguration {
public Boolean isSnapSyncBftEnabled() {
return DEFAULT_SNAP_SYNC_BFT_ENABLED;
}
@Value.Default
public Boolean isSnapSyncToHeadEnabled() {
return DEFAULT_SNAP_SYNC_TO_HEAD_ENABLED_FLAG;
}
}

@ -218,11 +218,13 @@ public class SnapSyncMetricsManager {
public void notifySnapSyncCompleted() {
final Duration duration = Duration.ofMillis(System.currentTimeMillis() - startSyncTime);
final long hours = (duration.toDaysPart() * 24) + duration.toHoursPart();
LOG.info(
"Finished worldstate snapsync with nodes {} (healed={}) duration {}{}:{},{}.",
nbTrieNodesGenerated.addAndGet(nbTrieNodesHealed.get()),
nbTrieNodesHealed,
duration.toHoursPart() > 0 ? (duration.toHoursPart() + ":") : "",
hours > 0 ? (hours + ":") : "",
duration.toMinutesPart(),
duration.toSecondsPart(),
duration.toMillisPart());

@ -39,6 +39,7 @@ import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.ImmutableSnapSyncConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
@ -488,7 +489,7 @@ public class FastSyncActionsTest {
@ParameterizedTest
@ArgumentsSource(FastSyncActionsTest.FastSyncActionsTestArguments.class)
public void downloadPivotBlockHeaderShouldRetrievePivotBlockHash(
public void downloadPivotBlockHeaderShouldRetrieveSafePivotBlockHash(
final DataStorageFormat storageFormat) {
setUp(storageFormat);
syncConfig = SynchronizerConfiguration.builder().syncMinimumPeerCount(1).build();
@ -526,6 +527,55 @@ public class FastSyncActionsTest {
assertThat(result).isCompletedWithValue(new FastSyncState(blockchain.getBlockHeader(3).get()));
}
@ParameterizedTest
@ArgumentsSource(FastSyncActionsTest.FastSyncActionsTestArguments.class)
public void downloadPivotBlockHeaderShouldRetrieveHeadPivotBlockHash(
final DataStorageFormat storageFormat) {
setUp(storageFormat);
syncConfig =
SynchronizerConfiguration.builder()
.syncMinimumPeerCount(1)
.snapSyncConfiguration(
ImmutableSnapSyncConfiguration.builder().isSnapSyncToHeadEnabled(true).build())
.build();
GenesisConfigOptions genesisConfig = mock(GenesisConfigOptions.class);
when(genesisConfig.getTerminalBlockNumber()).thenReturn(OptionalLong.of(10L));
final Optional<ForkchoiceEvent> finalizedEvent =
Optional.of(
new ForkchoiceEvent(
blockchain.getChainHeadHash(),
blockchain.getBlockHashByNumber(3L).get(),
blockchain.getBlockHashByNumber(2L).get()));
fastSyncActions =
createFastSyncActions(
syncConfig,
new PivotSelectorFromHeadBlock(
blockchainSetupUtil.getProtocolContext(),
blockchainSetupUtil.getProtocolSchedule(),
ethContext,
metricsSystem,
genesisConfig,
() -> finalizedEvent,
() -> {}));
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1001);
final CompletableFuture<FastSyncState> result =
fastSyncActions.downloadPivotBlockHeader(
new FastSyncState(finalizedEvent.get().getHeadBlockHash()));
assertThat(result).isNotCompleted();
final RespondingEthPeer.Responder responder = RespondingEthPeer.blockchainResponder(blockchain);
peer.respond(responder);
assertThat(result)
.isCompletedWithValue(
new FastSyncState(
blockchain.getBlockHeader(blockchain.getChainHeadBlockNumber()).get()));
}
private FastSyncActions createFastSyncActions(
final SynchronizerConfiguration syncConfig, final PivotBlockSelector pivotBlockSelector) {
final ProtocolSchedule protocolSchedule = blockchainSetupUtil.getProtocolSchedule();

Loading…
Cancel
Save