7311: Add protocol spec supplier to GetReceiptsFromPeerTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7638/head
Matilda Clerke 2 months ago
parent 4ad85e8def
commit 86a1f0bf51
  1. 3
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  2. 4
      besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java
  3. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  4. 11
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java
  5. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  6. 8
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java
  7. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java
  8. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java
  9. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java
  10. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java
  11. 8
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java
  12. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java
  13. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java
  14. 8
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java
  15. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java
  16. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java
  17. 19
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java
  18. 3
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java
  19. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java
  20. 5
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java
  21. 3
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java
  22. 3
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java

@ -705,6 +705,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final DefaultSynchronizer synchronizer =
createSynchronizer(
protocolSchedule,
currentProtocolSpecSupplier,
worldStateStorageCoordinator,
protocolContext,
ethContext,
@ -839,6 +840,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
*/
protected DefaultSynchronizer createSynchronizer(
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
final EthContext ethContext,
@ -850,6 +852,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
return new DefaultSynchronizer(
syncConfig,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
worldStateStorageCoordinator,
ethProtocolManager.getBlockBroadcaster(),

@ -51,6 +51,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
@ -66,6 +67,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -223,6 +225,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
@Override
protected DefaultSynchronizer createSynchronizer(
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
final EthContext ethContext,
@ -234,6 +237,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
DefaultSynchronizer sync =
super.createSynchronizer(
protocolSchedule,
currentProtocolSpecSupplier,
worldStateStorageCoordinator,
protocolContext,
ethContext,

@ -470,7 +470,10 @@ public class EthPeers implements PeerSelector {
// Part of the PeerSelector interface, to be split apart later
@Override
public Optional<EthPeer> getPeer(final Predicate<EthPeer> filter) {
return streamBestPeers().filter(filter).filter(EthPeer::hasAvailableRequestCapacity).findFirst();
return streamBestPeers()
.filter(filter)
.filter(EthPeer::hasAvailableRequestCapacity)
.findFirst();
}
// Part of the PeerSelector interface, to be split apart later

@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRetryBehavior;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
@ -34,19 +35,24 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;
public class GetReceiptsFromPeerTask
implements PeerTask<Map<BlockHeader, List<TransactionReceipt>>> {
private final Collection<BlockHeader> blockHeaders;
private final BodyValidator bodyValidator;
private final Supplier<ProtocolSpec> currentProtocolSpecSupplier;
private final Map<Hash, List<BlockHeader>> headersByReceiptsRoot = new HashMap<>();
private final long requiredBlockchainHeight;
public GetReceiptsFromPeerTask(
final Collection<BlockHeader> blockHeaders, final BodyValidator bodyValidator) {
final Collection<BlockHeader> blockHeaders,
final BodyValidator bodyValidator,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier) {
this.blockHeaders = blockHeaders;
this.bodyValidator = bodyValidator;
this.currentProtocolSpecSupplier = currentProtocolSpecSupplier;
blockHeaders.forEach(
header ->
@ -112,6 +118,7 @@ public class GetReceiptsFromPeerTask
public Predicate<EthPeer> getPeerRequirementFilter() {
return (ethPeer) ->
ethPeer.getProtocolName().equals(getSubProtocol().getName())
&& ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight;
&& (currentProtocolSpecSupplier.get().isPoS()
|| ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight);
}
}

@ -35,6 +35,7 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.context.SnapSyncStatePers
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.BonsaiWorldStateProvider;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
@ -79,6 +80,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final BlockBroadcaster blockBroadcaster,
@ -146,6 +148,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
syncConfig,
dataDirectory,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
metricsSystem,
ethContext,
@ -163,6 +166,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
syncConfig,
dataDirectory,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
metricsSystem,
ethContext,
@ -180,6 +184,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
syncConfig,
dataDirectory,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
metricsSystem,
ethContext,

@ -30,16 +30,19 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
public class CheckpointDownloadBlockStep {
private final ProtocolSchedule protocolSchedule;
private final Supplier<ProtocolSpec> currentProtocolSpecSupplier;
private final EthContext ethContext;
private final PeerTaskExecutor peerTaskExecutor;
private final Checkpoint checkpoint;
@ -48,12 +51,14 @@ public class CheckpointDownloadBlockStep {
public CheckpointDownloadBlockStep(
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final Checkpoint checkpoint,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.currentProtocolSpecSupplier = currentProtocolSpecSupplier;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
this.checkpoint = checkpoint;
@ -81,7 +86,8 @@ public class CheckpointDownloadBlockStep {
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
CompletableFuture<Optional<BlockWithReceipts>> futureReceipts = new CompletableFuture<>();
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(List.of(block.getHeader()), new BodyValidator());
new GetReceiptsFromPeerTask(
List.of(block.getHeader()), new BodyValidator(), currentProtocolSpecSupplier);
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> executorResult =
peerTaskExecutor.execute(task);

@ -34,6 +34,7 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
@ -44,6 +45,7 @@ import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -59,6 +61,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
final SynchronizerConfiguration syncConfig,
final Path dataDirectory,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final MetricsSystem metricsSystem,
final EthContext ethContext,
@ -110,6 +113,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,
@ -128,6 +132,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,

@ -24,15 +24,19 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncActions;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.SyncDurationMetrics;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.function.Supplier;
public class CheckpointSyncActions extends FastSyncActions {
public CheckpointSyncActions(
final SynchronizerConfiguration syncConfig,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
@ -43,6 +47,7 @@ public class CheckpointSyncActions extends FastSyncActions {
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,
@ -58,6 +63,7 @@ public class CheckpointSyncActions extends FastSyncActions {
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,

@ -25,16 +25,20 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.SyncTargetManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.SyncDurationMetrics;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.function.Supplier;
public class CheckpointSyncChainDownloader extends FastSyncChainDownloader {
public static ChainDownloader create(
final SynchronizerConfiguration config,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
@ -59,6 +63,7 @@ public class CheckpointSyncChainDownloader extends FastSyncChainDownloader {
new CheckpointSyncDownloadPipelineFactory(
config,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,

@ -27,18 +27,21 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.services.pipeline.PipelineBuilder;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipelineFactory {
public CheckpointSyncDownloadPipelineFactory(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
@ -47,6 +50,7 @@ public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipel
super(
syncConfig,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,
@ -86,7 +90,13 @@ public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipel
final CheckpointDownloadBlockStep checkPointDownloadBlockStep =
new CheckpointDownloadBlockStep(
protocolSchedule, ethContext, peerTaskExecutor, checkpoint, syncConfig, metricsSystem);
protocolSchedule,
currentProtocolSpecSupplier,
ethContext,
peerTaskExecutor,
checkpoint,
syncConfig,
metricsSystem);
return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",

@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPe
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.HashMap;
@ -36,20 +37,24 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
public class DownloadReceiptsStep
implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> {
private final Supplier<ProtocolSpec> currentProtocolSpecSupplier;
private final EthContext ethContext;
private final PeerTaskExecutor peerTaskExecutor;
private final SynchronizerConfiguration synchronizerConfiguration;
private final MetricsSystem metricsSystem;
public DownloadReceiptsStep(
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem) {
this.currentProtocolSpecSupplier = currentProtocolSpecSupplier;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
this.synchronizerConfiguration = synchronizerConfiguration;
@ -76,7 +81,8 @@ public class DownloadReceiptsStep
getReceiptsWithPeerTaskSystem(final List<BlockHeader> headers) {
Map<BlockHeader, List<TransactionReceipt>> getReceipts = new HashMap<>();
do {
GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(headers, new BodyValidator());
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(headers, new BodyValidator(), currentProtocolSpecSupplier);
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> getReceiptsResult =
peerTaskExecutor.execute(task);
if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS

@ -27,6 +27,7 @@ import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByHashTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.SyncDurationMetrics;
@ -47,6 +48,7 @@ public class FastSyncActions {
protected final SynchronizerConfiguration syncConfig;
protected final WorldStateStorageCoordinator worldStateStorageCoordinator;
protected final ProtocolSchedule protocolSchedule;
protected final Supplier<ProtocolSpec> currentProtocolSpecSupplier;
protected final ProtocolContext protocolContext;
protected final EthContext ethContext;
protected final PeerTaskExecutor peerTaskExecutor;
@ -60,6 +62,7 @@ public class FastSyncActions {
final SynchronizerConfiguration syncConfig,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
@ -69,6 +72,7 @@ public class FastSyncActions {
this.syncConfig = syncConfig;
this.worldStateStorageCoordinator = worldStateStorageCoordinator;
this.protocolSchedule = protocolSchedule;
this.currentProtocolSpecSupplier = currentProtocolSpecSupplier;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
@ -166,6 +170,7 @@ public class FastSyncActions {
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,

@ -22,10 +22,13 @@ import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.SyncDurationMetrics;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.function.Supplier;
public class FastSyncChainDownloader {
protected FastSyncChainDownloader() {}
@ -34,6 +37,7 @@ public class FastSyncChainDownloader {
final SynchronizerConfiguration config,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
@ -57,6 +61,7 @@ public class FastSyncChainDownloader {
new FastSyncDownloadPipelineFactory(
config,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,

@ -40,6 +40,7 @@ import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.BodyValidationMode;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
@ -48,6 +49,7 @@ import org.hyperledger.besu.services.pipeline.Pipeline;
import org.hyperledger.besu.services.pipeline.PipelineBuilder;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,6 +59,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
protected final SynchronizerConfiguration syncConfig;
protected final ProtocolSchedule protocolSchedule;
protected final Supplier<ProtocolSpec> currentProtocolSpecSupplier;
protected final ProtocolContext protocolContext;
protected final EthContext ethContext;
protected final PeerTaskExecutor peerTaskExecutor;
@ -69,6 +72,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
public FastSyncDownloadPipelineFactory(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
@ -76,6 +80,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.currentProtocolSpecSupplier = currentProtocolSpecSupplier;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
@ -149,7 +154,8 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
final DownloadReceiptsStep downloadReceiptsStep =
new DownloadReceiptsStep(ethContext, peerTaskExecutor, syncConfig, metricsSystem);
new DownloadReceiptsStep(
currentProtocolSpecSupplier, ethContext, peerTaskExecutor, syncConfig, metricsSystem);
final ImportBlocksStep importBlockStep =
new ImportBlocksStep(
protocolSchedule,

@ -28,6 +28,7 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncStateStorage;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.metrics.BesuMetricCategory;
@ -41,6 +42,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.slf4j.Logger;
@ -57,6 +59,7 @@ public class FastDownloaderFactory {
final SynchronizerConfiguration syncConfig,
final Path dataDirectory,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final MetricsSystem metricsSystem,
final EthContext ethContext,
@ -126,6 +129,7 @@ public class FastDownloaderFactory {
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,

@ -31,6 +31,7 @@ import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
@ -41,6 +42,7 @@ import org.hyperledger.besu.services.tasks.InMemoryTasksPriorityQueues;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,6 +57,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
final SynchronizerConfiguration syncConfig,
final Path dataDirectory,
final ProtocolSchedule protocolSchedule,
final Supplier<ProtocolSpec> currentProtocolSpecSupplier,
final ProtocolContext protocolContext,
final MetricsSystem metricsSystem,
final EthContext ethContext,
@ -121,6 +124,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
currentProtocolSpecSupplier,
protocolContext,
ethContext,
peerTaskExecutor,

@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import java.util.ArrayList;
@ -42,7 +43,8 @@ public class GetReceiptsFromPeerTaskTest {
@Test
public void testGetSubProtocol() {
GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null);
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(Collections.emptyList(), null, () -> null);
Assertions.assertEquals(EthProtocol.get(), task.getSubProtocol());
}
@ -50,7 +52,7 @@ public class GetReceiptsFromPeerTaskTest {
public void testGetRequestMessage() {
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(
List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null);
List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null, () -> null);
MessageData messageData = task.getRequestMessage();
GetReceiptsMessage getReceiptsMessage = GetReceiptsMessage.readFrom(messageData);
@ -72,7 +74,8 @@ public class GetReceiptsFromPeerTaskTest {
@Test
public void testParseResponseWithNullResponseMessage() {
GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null);
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(Collections.emptyList(), null, () -> null);
Assertions.assertThrows(InvalidPeerTaskResponseException.class, () -> task.parseResponse(null));
}
@ -80,7 +83,7 @@ public class GetReceiptsFromPeerTaskTest {
public void testParseResponseForInvalidResponse() {
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(
List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null);
List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null, () -> null);
ReceiptsMessage receiptsMessage =
ReceiptsMessage.create(
List.of(
@ -103,7 +106,7 @@ public class GetReceiptsFromPeerTaskTest {
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(
List.of(blockHeader1, blockHeader2, blockHeader3), bodyValidator);
List.of(blockHeader1, blockHeader2, blockHeader3), bodyValidator, () -> null);
TransactionReceipt receiptForBlock1 =
new TransactionReceipt(1, 123, Collections.emptyList(), Optional.empty());
@ -137,8 +140,12 @@ public class GetReceiptsFromPeerTaskTest {
BlockHeader blockHeader2 = mockBlockHeader(2);
BlockHeader blockHeader3 = mockBlockHeader(3);
ProtocolSpec protocolSpec = Mockito.mock(ProtocolSpec.class);
Mockito.when(protocolSpec.isPoS()).thenReturn(false);
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(List.of(blockHeader1, blockHeader2, blockHeader3), null);
new GetReceiptsFromPeerTask(
List.of(blockHeader1, blockHeader2, blockHeader3), null, () -> protocolSpec);
EthPeer failForIncorrectProtocol = mockPeer("incorrectProtocol", 5);
EthPeer failForShortChainHeight = mockPeer("incorrectProtocol", 1);

@ -157,7 +157,7 @@ public class CheckPointSyncChainDownloaderTest {
}
});
when(peerTaskExecutor.executeAsync(any(GetReceiptsFromPeerTask.class)))
when(peerTaskExecutor.execute(any(GetReceiptsFromPeerTask.class)))
.thenAnswer(
new Answer<
CompletableFuture<
@ -207,6 +207,7 @@ public class CheckPointSyncChainDownloaderTest {
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
() -> null,
protocolContext,
ethContext,
peerTaskExecutor,

@ -87,6 +87,7 @@ public class DownloadReceiptsStepTest {
public void shouldDownloadReceiptsForBlocks() {
DownloadReceiptsStep downloadReceiptsStep =
new DownloadReceiptsStep(
() -> null,
ethProtocolManager.ethContext(),
peerTaskExecutor,
SynchronizerConfiguration.builder().isPeerTaskSystemEnabled(false).build(),
@ -112,6 +113,7 @@ public class DownloadReceiptsStepTest {
throws ExecutionException, InterruptedException {
DownloadReceiptsStep downloadReceiptsStep =
new DownloadReceiptsStep(
() -> null,
ethProtocolManager.ethContext(),
peerTaskExecutor,
SynchronizerConfiguration.builder().isPeerTaskSystemEnabled(true).build(),

@ -113,6 +113,7 @@ public class FastDownloaderFactoryTest {
syncConfig,
dataDirectory,
protocolSchedule,
() -> null,
protocolContext,
metricsSystem,
ethContext,
@ -139,6 +140,7 @@ public class FastDownloaderFactoryTest {
syncConfig,
dataDirectory,
protocolSchedule,
() -> null,
protocolContext,
metricsSystem,
ethContext,
@ -168,6 +170,7 @@ public class FastDownloaderFactoryTest {
syncConfig,
dataDirectory,
protocolSchedule,
() -> null,
protocolContext,
metricsSystem,
ethContext,
@ -204,6 +207,7 @@ public class FastDownloaderFactoryTest {
syncConfig,
dataDirectory,
protocolSchedule,
() -> null,
protocolContext,
metricsSystem,
ethContext,
@ -242,6 +246,7 @@ public class FastDownloaderFactoryTest {
syncConfig,
dataDirectory,
protocolSchedule,
() -> null,
protocolContext,
metricsSystem,
ethContext,

@ -585,9 +585,10 @@ public class FastSyncActionsTest {
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
() -> null,
protocolContext,
ethContext,
new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()),
new PeerTaskExecutor(null, null, new NoOpMetricsSystem()),
new SyncState(blockchain, ethContext.getEthPeers(), true, Optional.empty()),
pivotBlockSelector,
new NoOpMetricsSystem());

@ -109,9 +109,10 @@ public class FastSyncChainDownloaderTest {
syncConfig,
worldStateStorageCoordinator,
protocolSchedule,
() -> null,
protocolContext,
ethContext,
new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()),
new PeerTaskExecutor(null, null, new NoOpMetricsSystem()),
syncState,
new NoOpMetricsSystem(),
new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()),

Loading…
Cancel
Save