7311: Add GetReceiptsFromPeerTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7638/head
Matilda Clerke 2 months ago
parent c03bffbc93
commit b2a45c201e
  1. 20
      besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java
  2. 7
      besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java
  3. 7
      besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java
  4. 10
      besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java
  5. 89
      ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/BodyValidator.java
  6. 15
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java
  7. 122
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java
  8. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java
  9. 62
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java
  10. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java
  11. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java
  12. 10
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java
  13. 14
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java
  14. 31
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java
  15. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java
  16. 10
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java
  17. 6
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java
  18. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java
  19. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java
  20. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java
  21. 7
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java
  22. 34
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskFeatureToggleTestHelper.java
  23. 152
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java
  24. 152
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java
  25. 50
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java
  26. 7
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java
  27. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java
  28. 2
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java
  29. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java
  30. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java

@ -55,6 +55,9 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRequestSender;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.CheckpointBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.ClassicForkPeerValidator;
@ -652,6 +655,11 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
}
final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final PeerManager peerManager = new PeerManager();
ethPeers.streamAllPeers().forEach(peerManager::addPeer);
final PeerTaskExecutor peerTaskExecutor =
new PeerTaskExecutor(
peerManager, new PeerTaskRequestSender(), currentProtocolSpecSupplier, metricsSystem);
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);
@ -691,7 +699,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
scheduler,
peerValidators,
Optional.empty(),
forkIdManager);
forkIdManager,
peerManager);
final PivotBlockSelector pivotBlockSelector =
createPivotSelector(
@ -703,6 +712,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
worldStateStorageCoordinator,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
ethProtocolManager,
pivotBlockSelector);
@ -835,6 +845,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final PivotBlockSelector pivotBlockSelector) {
@ -846,6 +857,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
worldStateStorageCoordinator,
ethProtocolManager.getBlockBroadcaster(),
ethContext,
peerTaskExecutor,
syncState,
dataDirectory,
storageProvider,
@ -1035,7 +1047,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
final ForkIdManager forkIdManager,
final PeerManager peerManager) {
return new EthProtocolManager(
protocolContext.getBlockchain(),
networkId,
@ -1049,7 +1062,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides
mergePeerFilter,
synchronizerConfiguration,
scheduler,
forkIdManager);
forkIdManager,
peerManager);
}
/**

@ -42,6 +42,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -243,7 +244,8 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
final ForkIdManager forkIdManager,
final PeerManager peerManager) {
return besuControllerBuilderSchedule
.get(0L)
.createEthProtocolManager(
@ -257,7 +259,8 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde
scheduler,
peerValidators,
mergePeerFilter,
forkIdManager);
forkIdManager,
peerManager);
}
@Override

@ -34,6 +34,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -99,7 +100,8 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
final ForkIdManager forkIdManager,
final PeerManager peerManager) {
var mergeContext = protocolContext.getConsensusContext(MergeContext.class);
@ -129,7 +131,8 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder {
scheduler,
peerValidators,
filterToUse,
forkIdManager);
forkIdManager,
peerManager);
return ethProtocolManager;
}

@ -40,6 +40,8 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
@ -163,7 +165,8 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
final ForkIdManager forkIdManager,
final PeerManager peerManager) {
return mergeBesuControllerBuilder.createEthProtocolManager(
protocolContext,
synchronizerConfiguration,
@ -175,7 +178,8 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
scheduler,
peerValidators,
mergePeerFilter,
forkIdManager);
forkIdManager,
peerManager);
}
@Override
@ -225,6 +229,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final PivotBlockSelector pivotBlockSelector) {
@ -235,6 +240,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder {
worldStateStorageCoordinator,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
ethProtocolManager,
pivotBlockSelector);

@ -0,0 +1,89 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.mainnet;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Request;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.evm.log.LogsBloomFilter;
import java.util.List;
/** A utility class for body validation tasks. Implemented utilising BodyValidation */
public class BodyValidator {
/**
* Generates the transaction root for a list of transactions
*
* @param transactions the transactions
* @return the transaction root
*/
public Hash transactionsRoot(final List<Transaction> transactions) {
return BodyValidation.transactionsRoot(transactions);
}
/**
* Generates the withdrawals root for a list of withdrawals
*
* @param withdrawals the transactions
* @return the transaction root
*/
public Hash withdrawalsRoot(final List<Withdrawal> withdrawals) {
return BodyValidation.withdrawalsRoot(withdrawals);
}
/**
* Generates the requests root for a list of requests
*
* @param requests list of request
* @return the requests root
*/
public Hash requestsRoot(final List<Request> requests) {
return BodyValidation.requestsRoot(requests);
}
/**
* Generates the receipt root for a list of receipts
*
* @param receipts the receipts
* @return the receipt root
*/
public Hash receiptsRoot(final List<TransactionReceipt> receipts) {
return BodyValidation.receiptsRoot(receipts);
}
/**
* Generates the ommers hash for a list of ommer block headers
*
* @param ommers the ommer block headers
* @return the ommers hash
*/
public Hash ommersHash(final List<BlockHeader> ommers) {
return BodyValidation.ommersHash(ommers);
}
/**
* Generates the logs bloom filter for a list of transaction receipts
*
* @param receipts the transaction receipts
* @return the logs bloom filter
*/
public LogsBloomFilter logsBloom(final List<TransactionReceipt> receipts) {
return BodyValidation.logsBloom(receipts);
}
}

@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.StatusMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
@ -69,6 +70,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
private final Hash genesisHash;
private final ForkIdManager forkIdManager;
private final PeerManager peerManager;
private final BigInteger networkId;
private final EthPeers ethPeers;
private final EthMessages ethMessages;
@ -92,7 +94,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler,
final ForkIdManager forkIdManager) {
final ForkIdManager forkIdManager,
final PeerManager peerManager) {
this.networkId = networkId;
this.peerValidators = peerValidators;
this.scheduler = scheduler;
@ -102,6 +105,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
this.genesisHash = blockchain.getBlockHashByNumber(0L).orElse(Hash.ZERO);
this.forkIdManager = forkIdManager;
this.peerManager = peerManager;
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
@ -140,7 +144,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler) {
final EthScheduler scheduler,
final PeerManager peerManager) {
this(
blockchain,
networkId,
@ -158,7 +163,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
blockchain,
Collections.emptyList(),
Collections.emptyList(),
ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()));
ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()),
peerManager);
}
public EthContext ethContext() {
@ -337,7 +343,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerNewConnection(connection, peerValidators);
final EthPeer peer = ethPeers.peer(connection);
peerManager.addPeer(peer);
final Capability cap = connection.capability(getSupportedProtocol());
final ForkId latestForkId =
cap.getVersion() >= 64 ? forkIdManager.getForkIdForChainHead() : null;
@ -369,6 +375,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final DisconnectReason reason,
final boolean initiatedByPeer) {
final boolean wasActiveConnection = ethPeers.registerDisconnect(connection);
peerManager.removePeer(connection.getPeer());
LOG.atDebug()
.setMessage("Disconnect - active Connection? {} - {} - {} - {} {} - {} peers left")
.addArgument(wasActiveConnection)

@ -23,7 +23,7 @@ import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskBehavior;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidation;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import java.util.ArrayList;
@ -33,71 +33,75 @@ import java.util.List;
import java.util.Map;
public class GetReceiptsFromPeerTask
implements PeerTask<Map<BlockHeader, List<TransactionReceipt>>> {
implements PeerTask<Map<BlockHeader, List<TransactionReceipt>>> {
private final Collection<BlockHeader> blockHeaders;
private final Map<Hash, List<BlockHeader>> headersByReceiptsRoot = new HashMap<>();
private final Collection<BlockHeader> blockHeaders;
private final BodyValidator bodyValidator;
private final Map<Hash, List<BlockHeader>> headersByReceiptsRoot = new HashMap<>();
public GetReceiptsFromPeerTask(final Collection<BlockHeader> blockHeaders) {
this.blockHeaders = blockHeaders;
blockHeaders.forEach(
header ->
headersByReceiptsRoot
.computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>())
.add(header));
}
public GetReceiptsFromPeerTask(
final Collection<BlockHeader> blockHeaders, final BodyValidator bodyValidator) {
this.blockHeaders = blockHeaders;
this.bodyValidator = bodyValidator;
@Override
public String getSubProtocol() {
return EthProtocol.NAME;
}
blockHeaders.forEach(
header ->
headersByReceiptsRoot
.computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>())
.add(header));
}
@Override
public long getRequiredBlockNumber() {
return blockHeaders.stream()
.mapToLong(BlockHeader::getNumber)
.max()
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
}
@Override
public String getSubProtocol() {
return EthProtocol.NAME;
}
@Override
public MessageData getRequestMessage() {
// Since we have to match up the data by receipt root, we only need to request receipts
// for one of the headers with each unique receipt root.
final List<Hash> blockHashes =
headersByReceiptsRoot.values().stream()
.map(headers -> headers.getFirst().getHash())
.toList();
return GetReceiptsMessage.create(blockHashes);
}
@Override
public long getRequiredBlockNumber() {
return blockHeaders.stream()
.mapToLong(BlockHeader::getNumber)
.max()
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
}
@Override
public Map<BlockHeader, List<TransactionReceipt>> parseResponse(final MessageData messageData)
throws InvalidPeerTaskResponseException {
if (messageData == null) {
throw new InvalidPeerTaskResponseException();
}
final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(messageData);
final List<List<TransactionReceipt>> receiptsByBlock = receiptsMessage.receipts();
if (receiptsByBlock.isEmpty() || receiptsByBlock.size() > blockHeaders.size()) {
throw new InvalidPeerTaskResponseException();
}
@Override
public MessageData getRequestMessage() {
// Since we have to match up the data by receipt root, we only need to request receipts
// for one of the headers with each unique receipt root.
final List<Hash> blockHashes =
headersByReceiptsRoot.values().stream()
.map(headers -> headers.getFirst().getHash())
.toList();
return GetReceiptsMessage.create(blockHashes);
}
final Map<BlockHeader, List<TransactionReceipt>> receiptsByHeader = new HashMap<>();
for (final List<TransactionReceipt> receiptsInBlock : receiptsByBlock) {
final List<BlockHeader> blockHeaders =
headersByReceiptsRoot.get(BodyValidation.receiptsRoot(receiptsInBlock));
if (blockHeaders == null) {
// Contains receipts that we didn't request, so mustn't be the response we're looking for.
throw new InvalidPeerTaskResponseException();
}
blockHeaders.forEach(header -> receiptsByHeader.put(header, receiptsInBlock));
}
return receiptsByHeader;
@Override
public Map<BlockHeader, List<TransactionReceipt>> parseResponse(final MessageData messageData)
throws InvalidPeerTaskResponseException {
if (messageData == null) {
throw new InvalidPeerTaskResponseException();
}
final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(messageData);
final List<List<TransactionReceipt>> receiptsByBlock = receiptsMessage.receipts();
if (receiptsByBlock.isEmpty() || receiptsByBlock.size() > blockHeaders.size()) {
throw new InvalidPeerTaskResponseException();
}
@Override
public Collection<PeerTaskBehavior> getPeerTaskBehaviors() {
return List.of(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS, PeerTaskBehavior.RETRY_WITH_SAME_PEER);
final Map<BlockHeader, List<TransactionReceipt>> receiptsByHeader = new HashMap<>();
for (final List<TransactionReceipt> receiptsInBlock : receiptsByBlock) {
final List<BlockHeader> blockHeaders =
headersByReceiptsRoot.get(bodyValidator.receiptsRoot(receiptsInBlock));
if (blockHeaders == null) {
// Contains receipts that we didn't request, so mustn't be the response we're looking for.
throw new InvalidPeerTaskResponseException();
}
blockHeaders.forEach(header -> receiptsByHeader.put(header, receiptsInBlock));
}
}
return receiptsByHeader;
}
@Override
public Collection<PeerTaskBehavior> getPeerTaskBehaviors() {
return List.of(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS, PeerTaskBehavior.RETRY_WITH_SAME_PEER);
}
}

@ -22,6 +22,7 @@ import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.checkpointsync.CheckpointDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
@ -82,6 +83,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final BlockBroadcaster blockBroadcaster,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final Path dataDirectory,
final StorageProvider storageProvider,
@ -147,6 +149,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
@ -163,6 +166,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
@ -179,6 +183,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,

@ -16,17 +16,24 @@ package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskFeatureToggle;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.eth.manager.task.GetBlockFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -34,16 +41,19 @@ public class CheckpointDownloadBlockStep {
private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
private final PeerTaskExecutor peerTaskExecutor;
private final Checkpoint checkpoint;
private final MetricsSystem metricsSystem;
public CheckpointDownloadBlockStep(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final Checkpoint checkpoint,
final MetricsSystem metricsSystem) {
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
this.checkpoint = checkpoint;
this.metricsSystem = metricsSystem;
}
@ -65,17 +75,43 @@ public class CheckpointDownloadBlockStep {
private CompletableFuture<Optional<BlockWithReceipts>> downloadReceipts(
final PeerTaskResult<Block> peerTaskResult) {
final Block block = peerTaskResult.getResult();
final GetReceiptsFromPeerTask getReceiptsFromPeerTask =
GetReceiptsFromPeerTask.forHeaders(ethContext, List.of(block.getHeader()), metricsSystem);
return getReceiptsFromPeerTask
.run()
.thenCompose(
receiptTaskResult -> {
final Optional<List<TransactionReceipt>> transactionReceipts =
Optional.ofNullable(receiptTaskResult.getResult().get(block.getHeader()));
return CompletableFuture.completedFuture(
transactionReceipts.map(receipts -> new BlockWithReceipts(block, receipts)));
})
.exceptionally(throwable -> Optional.empty());
if (PeerTaskFeatureToggle.usePeerTaskSystem()) {
CompletableFuture<Optional<BlockWithReceipts>> futureReceipts = new CompletableFuture<>();
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(List.of(block.getHeader()), new BodyValidator());
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> executorResult =
peerTaskExecutor.execute(task);
if (executorResult.getResponseCode() == PeerTaskExecutorResponseCode.SUCCESS) {
List<TransactionReceipt> transactionReceipts =
executorResult
.getResult()
.map((map) -> map.get(block.getHeader()))
.orElseThrow(
() ->
new IllegalStateException("PeerTask response code was success, but empty"));
BlockWithReceipts blockWithReceipts = new BlockWithReceipts(block, transactionReceipts);
futureReceipts.complete(Optional.of(blockWithReceipts));
} else {
futureReceipts.complete(Optional.empty());
}
return futureReceipts;
} else {
final org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask
getReceiptsFromPeerTask =
org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask.forHeaders(
ethContext, List.of(block.getHeader()), metricsSystem);
return getReceiptsFromPeerTask
.run()
.thenCompose(
receiptTaskResult -> {
final Optional<List<TransactionReceipt>> transactionReceipts =
Optional.ofNullable(receiptTaskResult.getResult().get(block.getHeader()));
return CompletableFuture.completedFuture(
transactionReceipts.map(receipts -> new BlockWithReceipts(block, receipts)));
})
.exceptionally(throwable -> Optional.empty());
}
}
}

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -61,6 +62,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
final ProtocolContext protocolContext,
final MetricsSystem metricsSystem,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final SyncState syncState,
final Clock clock,
@ -110,6 +112,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
pivotBlockSelector,
metricsSystem);
@ -127,6 +130,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory {
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
pivotBlockSelector,
metricsSystem);

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -34,6 +35,7 @@ public class CheckpointSyncActions extends FastSyncActions {
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final PivotBlockSelector pivotBlockSelector,
final MetricsSystem metricsSystem) {
@ -43,6 +45,7 @@ public class CheckpointSyncActions extends FastSyncActions {
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
pivotBlockSelector,
metricsSystem);
@ -57,6 +60,7 @@ public class CheckpointSyncActions extends FastSyncActions {
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
metricsSystem,
currentState,

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync.checkpointsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -36,6 +37,7 @@ public class CheckpointSyncChainDownloader extends FastSyncChainDownloader {
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final MetricsSystem metricsSystem,
final FastSyncState fastSyncState,
@ -55,7 +57,13 @@ public class CheckpointSyncChainDownloader extends FastSyncChainDownloader {
syncState,
syncTargetManager,
new CheckpointSyncDownloadPipelineFactory(
config, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem),
config,
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
fastSyncState,
metricsSystem),
ethContext.getScheduler(),
metricsSystem,
syncDurationMetrics);

@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloadPipelineFactory;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
@ -40,9 +41,17 @@ public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipel
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final FastSyncState fastSyncState,
final MetricsSystem metricsSystem) {
super(syncConfig, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem);
super(
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
fastSyncState,
metricsSystem);
}
@Override
@ -76,7 +85,8 @@ public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipel
checkPointSource, checkpoint, protocolContext.getBlockchain());
final CheckpointDownloadBlockStep checkPointDownloadBlockStep =
new CheckpointDownloadBlockStep(protocolSchedule, ethContext, checkpoint, metricsSystem);
new CheckpointDownloadBlockStep(
protocolSchedule, ethContext, peerTaskExecutor, checkpoint, metricsSystem);
return PipelineBuilder.createPipelineFrom(
"fetchCheckpoints",

@ -22,10 +22,17 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskFeatureToggle;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask;
import org.hyperledger.besu.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.FutureUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -34,18 +41,36 @@ import java.util.function.Function;
public class DownloadReceiptsStep
implements Function<List<Block>, CompletableFuture<List<BlockWithReceipts>>> {
private final EthContext ethContext;
private final PeerTaskExecutor peerTaskExecutor;
private final MetricsSystem metricsSystem;
public DownloadReceiptsStep(final EthContext ethContext, final MetricsSystem metricsSystem) {
public DownloadReceiptsStep(
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
this.metricsSystem = metricsSystem;
}
@Override
public CompletableFuture<List<BlockWithReceipts>> apply(final List<Block> blocks) {
final List<BlockHeader> headers = blocks.stream().map(Block::getHeader).collect(toList());
final CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> getReceipts =
GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem).run();
final CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> getReceipts;
if (PeerTaskFeatureToggle.usePeerTaskSystem()) {
GetReceiptsFromPeerTask getReceiptsFromPeerTask =
new GetReceiptsFromPeerTask(headers, new BodyValidator());
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> getReceiptsResult =
peerTaskExecutor.execute(getReceiptsFromPeerTask);
if (getReceiptsResult.getResponseCode() == PeerTaskExecutorResponseCode.SUCCESS
&& getReceiptsResult.getResult().isPresent()) {
getReceipts = CompletableFuture.completedFuture(getReceiptsResult.getResult().get());
} else {
getReceipts = CompletableFuture.completedFuture(Collections.emptyMap());
}
} else {
getReceipts = GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem).run();
}
final CompletableFuture<List<BlockWithReceipts>> combineWithBlocks =
getReceipts.thenApply(
receiptsByHeader -> combineBlocksAndReceipts(blocks, receiptsByHeader));

@ -19,6 +19,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
@ -48,6 +49,7 @@ public class FastSyncActions {
protected final ProtocolSchedule protocolSchedule;
protected final ProtocolContext protocolContext;
protected final EthContext ethContext;
protected final PeerTaskExecutor peerTaskExecutor;
protected final SyncState syncState;
protected final PivotBlockSelector pivotBlockSelector;
protected final MetricsSystem metricsSystem;
@ -60,6 +62,7 @@ public class FastSyncActions {
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final PivotBlockSelector pivotBlockSelector,
final MetricsSystem metricsSystem) {
@ -68,6 +71,7 @@ public class FastSyncActions {
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
this.syncState = syncState;
this.pivotBlockSelector = pivotBlockSelector;
this.metricsSystem = metricsSystem;
@ -164,6 +168,7 @@ public class FastSyncActions {
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
metricsSystem,
currentState,

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -35,6 +36,7 @@ public class FastSyncChainDownloader {
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final MetricsSystem metricsSystem,
final FastSyncState fastSyncState,
@ -53,7 +55,13 @@ public class FastSyncChainDownloader {
syncState,
syncTargetManager,
new FastSyncDownloadPipelineFactory(
config, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem),
config,
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
fastSyncState,
metricsSystem),
ethContext.getScheduler(),
metricsSystem,
syncDurationMetrics);

@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.DownloadBodiesStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory;
@ -57,6 +58,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
protected final ProtocolSchedule protocolSchedule;
protected final ProtocolContext protocolContext;
protected final EthContext ethContext;
protected final PeerTaskExecutor peerTaskExecutor;
protected final FastSyncState fastSyncState;
protected final MetricsSystem metricsSystem;
protected final FastSyncValidationPolicy attachedValidationPolicy;
@ -68,12 +70,14 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
final ProtocolSchedule protocolSchedule,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final FastSyncState fastSyncState,
final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.peerTaskExecutor = peerTaskExecutor;
this.fastSyncState = fastSyncState;
this.metricsSystem = metricsSystem;
final LabelledMetric<Counter> fastSyncValidationCounter =
@ -140,7 +144,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
final DownloadReceiptsStep downloadReceiptsStep =
new DownloadReceiptsStep(ethContext, metricsSystem);
new DownloadReceiptsStep(ethContext, peerTaskExecutor, metricsSystem);
final ImportBlocksStep importBlockStep =
new ImportBlocksStep(
protocolSchedule,

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -59,6 +60,7 @@ public class FastDownloaderFactory {
final ProtocolContext protocolContext,
final MetricsSystem metricsSystem,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final SyncState syncState,
final Clock clock,
@ -126,6 +128,7 @@ public class FastDownloaderFactory {
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
pivotBlockSelector,
metricsSystem),

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -57,6 +58,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
final ProtocolContext protocolContext,
final MetricsSystem metricsSystem,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final SyncState syncState,
final Clock clock,
@ -121,6 +123,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory {
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
pivotBlockSelector,
metricsSystem),

@ -44,6 +44,7 @@ import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.EthProtocolVersion;
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection.PeerSendHandler;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
@ -1243,7 +1244,8 @@ public final class EthProtocolManagerTest {
Optional.empty(),
syncConfig,
mock(EthScheduler.class),
mock(ForkIdManager.class))) {
mock(ForkIdManager.class),
new PeerManager())) {
return ethManager;
}

@ -30,6 +30,7 @@ import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
@ -117,7 +118,8 @@ public class EthProtocolManagerTestUtil {
mergePeerFilter,
mock(SynchronizerConfiguration.class),
ethScheduler,
new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false));
new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false),
new PeerManager());
}
public static EthProtocolManager create(
@ -168,7 +170,8 @@ public class EthProtocolManagerTestUtil {
Optional.empty(),
mock(SynchronizerConfiguration.class),
ethScheduler,
forkIdManager);
forkIdManager,
new PeerManager());
}
public static EthProtocolManager create(final Blockchain blockchain) {

@ -0,0 +1,34 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import java.lang.reflect.Field;
import org.junit.platform.commons.util.ReflectionUtils;
public class PeerTaskFeatureToggleTestHelper {
public static void setPeerTaskFeatureToggle(final boolean usePeerTaskSystem)
throws IllegalAccessException {
Field usePeerTaskSystemField =
ReflectionUtils.findFields(
PeerTaskFeatureToggle.class,
(f) -> f.getName().equals("USE_PEER_TASK_SYSTEM"),
ReflectionUtils.HierarchyTraversalMode.TOP_DOWN)
.getFirst();
usePeerTaskSystemField.setAccessible(true);
usePeerTaskSystemField.set(null, usePeerTaskSystem);
}
}

@ -0,0 +1,152 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class GetReceiptsFromPeerTaskTest {
@Test
public void testGetSubProtocol() {
GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null);
Assertions.assertEquals(EthProtocol.NAME, task.getSubProtocol());
}
@Test
public void testGetRequiredBlockNumber() {
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(
List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null);
Assertions.assertEquals(3, task.getRequiredBlockNumber());
}
@Test
public void testGetRequestMessage() {
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(
List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null);
MessageData messageData = task.getRequestMessage();
GetReceiptsMessage getReceiptsMessage = GetReceiptsMessage.readFrom(messageData);
Assertions.assertEquals(EthPV63.GET_RECEIPTS, getReceiptsMessage.getCode());
Iterable<Hash> hashesInMessage = getReceiptsMessage.hashes();
List<Hash> expectedHashes =
List.of(
Hash.fromHexString(StringUtils.repeat("00", 31) + "11"),
Hash.fromHexString(StringUtils.repeat("00", 31) + "21"),
Hash.fromHexString(StringUtils.repeat("00", 31) + "31"));
List<Hash> actualHashes = new ArrayList<>();
hashesInMessage.forEach(actualHashes::add);
Assertions.assertEquals(3, actualHashes.size());
Assertions.assertEquals(
expectedHashes.stream().sorted().toList(), actualHashes.stream().sorted().toList());
}
@Test
public void testParseResponseWithNullResponseMessage() {
GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null);
Assertions.assertThrows(InvalidPeerTaskResponseException.class, () -> task.parseResponse(null));
}
@Test
public void testParseResponseForInvalidResponse() throws InvalidPeerTaskResponseException {
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(
List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null);
ReceiptsMessage receiptsMessage =
ReceiptsMessage.create(
List.of(
List.of(new TransactionReceipt(1, 123, Collections.emptyList(), Optional.empty())),
List.of(new TransactionReceipt(1, 456, Collections.emptyList(), Optional.empty())),
List.of(new TransactionReceipt(1, 789, Collections.emptyList(), Optional.empty())),
List.of(
new TransactionReceipt(1, 101112, Collections.emptyList(), Optional.empty()))));
Assertions.assertThrows(
InvalidPeerTaskResponseException.class, () -> task.parseResponse(receiptsMessage));
}
@Test
public void testParseResponse() throws InvalidPeerTaskResponseException {
BodyValidator bodyValidator = Mockito.mock(BodyValidator.class);
BlockHeader blockHeader1 = mockBlockHeader(1);
BlockHeader blockHeader2 = mockBlockHeader(2);
BlockHeader blockHeader3 = mockBlockHeader(3);
GetReceiptsFromPeerTask task =
new GetReceiptsFromPeerTask(
List.of(blockHeader1, blockHeader2, blockHeader3), bodyValidator);
TransactionReceipt receiptForBlock1 =
new TransactionReceipt(1, 123, Collections.emptyList(), Optional.empty());
TransactionReceipt receiptForBlock2 =
new TransactionReceipt(1, 456, Collections.emptyList(), Optional.empty());
TransactionReceipt receiptForBlock3 =
new TransactionReceipt(1, 789, Collections.emptyList(), Optional.empty());
ReceiptsMessage receiptsMessage =
ReceiptsMessage.create(
List.of(
List.of(receiptForBlock1), List.of(receiptForBlock2), List.of(receiptForBlock3)));
Mockito.when(bodyValidator.receiptsRoot(List.of(receiptForBlock1)))
.thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + "12"));
Mockito.when(bodyValidator.receiptsRoot(List.of(receiptForBlock2)))
.thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + "22"));
Mockito.when(bodyValidator.receiptsRoot(List.of(receiptForBlock3)))
.thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + "32"));
Map<BlockHeader, List<TransactionReceipt>> resultMap = task.parseResponse(receiptsMessage);
Assertions.assertEquals(3, resultMap.size());
Assertions.assertEquals(List.of(receiptForBlock1), resultMap.get(blockHeader1));
Assertions.assertEquals(List.of(receiptForBlock2), resultMap.get(blockHeader2));
Assertions.assertEquals(List.of(receiptForBlock3), resultMap.get(blockHeader3));
}
private BlockHeader mockBlockHeader(final long blockNumber) {
BlockHeader blockHeader = Mockito.mock(BlockHeader.class);
Mockito.when(blockHeader.getNumber()).thenReturn(blockNumber);
// second to last hex digit indicates the blockNumber, last hex digit indicates the usage of the
// hash
Mockito.when(blockHeader.getHash())
.thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "1"));
Mockito.when(blockHeader.getReceiptsRoot())
.thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "2"));
return blockHeader;
}
}

@ -22,13 +22,20 @@ import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskFeatureToggleTestHelper;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
@ -44,8 +51,15 @@ import org.hyperledger.besu.metrics.SyncDurationMetrics;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
@ -55,12 +69,16 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.platform.commons.util.ReflectionUtils;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class CheckPointSyncChainDownloaderTest {
protected ProtocolSchedule protocolSchedule;
protected EthProtocolManager ethProtocolManager;
protected EthContext ethContext;
private PeerTaskExecutor peerTaskExecutor;
protected ProtocolContext protocolContext;
private SyncState syncState;
@ -100,6 +118,7 @@ public class CheckPointSyncChainDownloaderTest {
localBlockchain = localBlockchainSetup.getBlockchain();
otherBlockchainSetup = BlockchainSetupUtil.forTesting(dataStorageFormat);
otherBlockchain = otherBlockchainSetup.getBlockchain();
otherBlockchainSetup.importFirstBlocks(30);
protocolSchedule = localBlockchainSetup.getProtocolSchedule();
protocolContext = localBlockchainSetup.getProtocolContext();
ethProtocolManager =
@ -123,6 +142,57 @@ public class CheckPointSyncChainDownloaderTest {
ethContext.getEthPeers(),
true,
Optional.of(checkpoint));
peerTaskExecutor = mock(PeerTaskExecutor.class);
when(peerTaskExecutor.execute(any(GetReceiptsFromPeerTask.class)))
.thenAnswer(
new Answer<PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>>>() {
@Override
public PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> answer(
final InvocationOnMock invocationOnMock) throws Throwable {
GetReceiptsFromPeerTask task =
invocationOnMock.getArgument(0, GetReceiptsFromPeerTask.class);
return processTask(task);
}
});
when(peerTaskExecutor.executeAsync(any(GetReceiptsFromPeerTask.class)))
.thenAnswer(
new Answer<
CompletableFuture<
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>>>>() {
@Override
public CompletableFuture<
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>>>
answer(final InvocationOnMock invocationOnMock) throws Throwable {
GetReceiptsFromPeerTask task =
invocationOnMock.getArgument(0, GetReceiptsFromPeerTask.class);
return CompletableFuture.completedFuture(processTask(task));
}
});
}
@SuppressWarnings("unchecked")
private PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> processTask(
final GetReceiptsFromPeerTask task) throws IllegalAccessException {
Map<BlockHeader, List<TransactionReceipt>> getReceiptsFromPeerTaskResult = new HashMap<>();
List<Field> fields =
ReflectionUtils.findFields(
task.getClass(),
(field) -> field.getName().equals("blockHeaders"),
ReflectionUtils.HierarchyTraversalMode.TOP_DOWN);
fields.forEach((f) -> f.setAccessible(true));
Collection<BlockHeader> blockHeaders = (Collection<BlockHeader>) fields.getFirst().get(task);
blockHeaders.forEach(
(bh) ->
getReceiptsFromPeerTaskResult.put(
bh, otherBlockchain.getTxReceipts(bh.getHash()).get()));
return new PeerTaskExecutorResult<>(
getReceiptsFromPeerTaskResult, PeerTaskExecutorResponseCode.SUCCESS);
}
@AfterEach
@ -140,6 +210,7 @@ public class CheckPointSyncChainDownloaderTest {
protocolSchedule,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
new NoOpMetricsSystem(),
new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()),
@ -148,9 +219,10 @@ public class CheckPointSyncChainDownloaderTest {
@ParameterizedTest
@ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class)
public void shouldSyncToPivotBlockInMultipleSegments(final DataStorageFormat storageFormat) {
public void shouldSyncToPivotBlockInMultipleSegments(final DataStorageFormat storageFormat)
throws IllegalAccessException {
PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(false);
setup(storageFormat);
otherBlockchainSetup.importFirstBlocks(30);
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);
@ -184,9 +256,81 @@ public class CheckPointSyncChainDownloaderTest {
@ParameterizedTest
@ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class)
public void shouldSyncToPivotBlockInSingleSegment(final DataStorageFormat storageFormat) {
public void shouldSyncToPivotBlockInSingleSegment(final DataStorageFormat storageFormat)
throws IllegalAccessException {
PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(false);
setup(storageFormat);
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(otherBlockchain);
final long pivotBlockNumber = 10;
final SynchronizerConfiguration syncConfig = SynchronizerConfiguration.builder().build();
ethContext
.getEthPeers()
.streamAvailablePeers()
.forEach(
ethPeer -> {
ethPeer.setCheckpointHeader(
otherBlockchainSetup.getBlocks().get((int) checkpoint.blockNumber()).getHeader());
});
final ChainDownloader downloader = downloader(syncConfig, pivotBlockNumber);
final CompletableFuture<Void> result = downloader.start();
peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone());
assertThat(result).isCompleted();
assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber);
assertThat(localBlockchain.getChainHeadHeader())
.isEqualTo(otherBlockchain.getBlockHeader(pivotBlockNumber).get());
}
@ParameterizedTest
@ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class)
public void shouldSyncToPivotBlockInMultipleSegmentsWithPeerTaskSystem(
final DataStorageFormat storageFormat)
throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {
PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(true);
setup(storageFormat);
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);
final RespondingEthPeer.Responder responder =
RespondingEthPeer.blockchainResponder(otherBlockchain);
final SynchronizerConfiguration syncConfig =
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build();
final long pivotBlockNumber = 25;
ethContext
.getEthPeers()
.streamAvailablePeers()
.forEach(
ethPeer -> {
ethPeer.setCheckpointHeader(
otherBlockchainSetup.getBlocks().get((int) checkpoint.blockNumber()).getHeader());
});
final ChainDownloader downloader = downloader(syncConfig, pivotBlockNumber);
final CompletableFuture<Void> result = downloader.start();
peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone());
assertThat(result).isCompleted();
assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber);
assertThat(localBlockchain.getChainHeadHeader())
.isEqualTo(otherBlockchain.getBlockHeader(pivotBlockNumber).get());
}
@ParameterizedTest
@ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class)
public void shouldSyncToPivotBlockInSingleSegmentWithPeerTaskSystem(
final DataStorageFormat storageFormat) throws IllegalAccessException {
PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(true);
setup(storageFormat);
otherBlockchainSetup.importFirstBlocks(30);
final RespondingEthPeer peer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);

@ -30,22 +30,32 @@ import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskFeatureToggleTestHelper;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class DownloadReceiptsStepTest {
private static ProtocolContext protocolContext;
private static MutableBlockchain blockchain;
private PeerTaskExecutor peerTaskExecutor;
private EthProtocolManager ethProtocolManager;
private DownloadReceiptsStep downloadReceiptsStep;
@ -59,6 +69,7 @@ public class DownloadReceiptsStepTest {
@BeforeEach
public void setUp() {
peerTaskExecutor = mock(PeerTaskExecutor.class);
TransactionPool transactionPool = mock(TransactionPool.class);
ethProtocolManager =
EthProtocolManagerTestUtil.create(
@ -69,11 +80,13 @@ public class DownloadReceiptsStepTest {
transactionPool,
EthProtocolConfiguration.defaultConfig());
downloadReceiptsStep =
new DownloadReceiptsStep(ethProtocolManager.ethContext(), new NoOpMetricsSystem());
new DownloadReceiptsStep(
ethProtocolManager.ethContext(), peerTaskExecutor, new NoOpMetricsSystem());
}
@Test
public void shouldDownloadReceiptsForBlocks() {
public void shouldDownloadReceiptsForBlocks() throws IllegalAccessException {
PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(false);
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final List<Block> blocks = asList(block(1), block(2), block(3), block(4));
@ -90,6 +103,32 @@ public class DownloadReceiptsStepTest {
blockWithReceipts(4)));
}
@Test
public void shouldDownloadReceiptsForBlocksUsingPeerTaskSystem()
throws IllegalAccessException, ExecutionException, InterruptedException {
PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(true);
final List<Block> blocks = asList(mockBlock(), mockBlock(), mockBlock(), mockBlock());
Map<BlockHeader, List<TransactionReceipt>> receiptsMap = new HashMap<>();
blocks.forEach(
(b) -> receiptsMap.put(b.getHeader(), List.of(Mockito.mock(TransactionReceipt.class))));
PeerTaskExecutorResult<Map<BlockHeader, List<TransactionReceipt>>> peerTaskResult =
new PeerTaskExecutorResult<>(receiptsMap, PeerTaskExecutorResponseCode.SUCCESS);
Mockito.when(peerTaskExecutor.execute(Mockito.any(GetReceiptsFromPeerTask.class)))
.thenReturn(peerTaskResult);
final CompletableFuture<List<BlockWithReceipts>> result = downloadReceiptsStep.apply(blocks);
assertThat(result.get().get(0).getBlock()).isEqualTo(blocks.get(0));
assertThat(result.get().get(0).getReceipts().size()).isEqualTo(1);
assertThat(result.get().get(1).getBlock()).isEqualTo(blocks.get(1));
assertThat(result.get().get(1).getReceipts().size()).isEqualTo(1);
assertThat(result.get().get(2).getBlock()).isEqualTo(blocks.get(2));
assertThat(result.get().get(2).getReceipts().size()).isEqualTo(1);
assertThat(result.get().get(3).getBlock()).isEqualTo(blocks.get(3));
assertThat(result.get().get(3).getReceipts().size()).isEqualTo(1);
}
private Block block(final long number) {
final BlockHeader header = blockchain.getBlockHeader(number).get();
return new Block(header, blockchain.getBlockBody(header.getHash()).get());
@ -100,4 +139,11 @@ public class DownloadReceiptsStepTest {
final List<TransactionReceipt> receipts = blockchain.getTxReceipts(block.getHash()).get();
return new BlockWithReceipts(block, receipts);
}
private Block mockBlock() {
final Block block = Mockito.mock(Block.class);
final BlockHeader blockHeader = Mockito.mock(BlockHeader.class);
Mockito.when(block.getHeader()).thenAnswer((invocationOnMock) -> blockHeader);
return block;
}
}

@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -71,6 +72,7 @@ public class FastDownloaderFactoryTest {
@Mock private ProtocolContext protocolContext;
@Mock private MetricsSystem metricsSystem;
@Mock private EthContext ethContext;
@Mock private PeerTaskExecutor peerTaskExecutor;
@Mock private SyncState syncState;
@Mock private Clock clock;
@Mock private Path dataDirectory;
@ -114,6 +116,7 @@ public class FastDownloaderFactoryTest {
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
@ -139,6 +142,7 @@ public class FastDownloaderFactoryTest {
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
@ -167,6 +171,7 @@ public class FastDownloaderFactoryTest {
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
@ -202,6 +207,7 @@ public class FastDownloaderFactoryTest {
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
@ -239,6 +245,7 @@ public class FastDownloaderFactoryTest {
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,

@ -34,6 +34,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
@ -536,6 +537,7 @@ public class FastSyncActionsTest {
protocolSchedule,
protocolContext,
ethContext,
new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()),
new SyncState(blockchain, ethContext.getEthPeers(), true, Optional.empty()),
pivotBlockSelector,
new NoOpMetricsSystem());

@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
@ -110,6 +111,7 @@ public class FastSyncChainDownloaderTest {
protocolSchedule,
protocolContext,
ethContext,
new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()),
syncState,
new NoOpMetricsSystem(),
new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()),

@ -44,6 +44,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@ -196,7 +197,8 @@ public class TestNode implements Closeable {
Collections.emptyList(),
Optional.empty(),
syncConfig,
scheduler);
scheduler,
new PeerManager());
final NetworkRunner networkRunner =
NetworkRunner.builder()

@ -45,6 +45,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
@ -318,7 +319,8 @@ public class TransactionPoolFactoryTest {
Optional.empty(),
mock(SynchronizerConfiguration.class),
mock(EthScheduler.class),
mock(ForkIdManager.class));
mock(ForkIdManager.class),
new PeerManager());
}
@Test

Loading…
Cancel
Save