diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index 6bb3fb117c..bd10301a67 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -55,6 +55,9 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRequestSender; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.peervalidation.CheckpointBlocksPeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.ClassicForkPeerValidator; @@ -652,6 +655,11 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides } final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler); + final PeerManager peerManager = new PeerManager(); + ethPeers.streamAllPeers().forEach(peerManager::addPeer); + final PeerTaskExecutor peerTaskExecutor = + new PeerTaskExecutor( + peerManager, new PeerTaskRequestSender(), currentProtocolSpecSupplier, metricsSystem); final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode()); final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint); @@ -691,7 +699,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides scheduler, peerValidators, Optional.empty(), - forkIdManager); + forkIdManager, + peerManager); final PivotBlockSelector pivotBlockSelector = createPivotSelector( @@ -703,6 +712,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides worldStateStorageCoordinator, protocolContext, ethContext, + peerTaskExecutor, syncState, ethProtocolManager, pivotBlockSelector); @@ -835,6 +845,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides final WorldStateStorageCoordinator worldStateStorageCoordinator, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final EthProtocolManager ethProtocolManager, final PivotBlockSelector pivotBlockSelector) { @@ -846,6 +857,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides worldStateStorageCoordinator, ethProtocolManager.getBlockBroadcaster(), ethContext, + peerTaskExecutor, syncState, dataDirectory, storageProvider, @@ -1035,7 +1047,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides final EthScheduler scheduler, final List peerValidators, final Optional mergePeerFilter, - final ForkIdManager forkIdManager) { + final ForkIdManager forkIdManager, + final PeerManager peerManager) { return new EthProtocolManager( protocolContext.getBlockchain(), networkId, @@ -1049,7 +1062,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides mergePeerFilter, synchronizerConfiguration, scheduler, - forkIdManager); + forkIdManager, + peerManager); } /** diff --git a/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java index 0d0f8d2fd8..897f2e3ace 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java @@ -42,6 +42,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -243,7 +244,8 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde final EthScheduler scheduler, final List peerValidators, final Optional mergePeerFilter, - final ForkIdManager forkIdManager) { + final ForkIdManager forkIdManager, + final PeerManager peerManager) { return besuControllerBuilderSchedule .get(0L) .createEthProtocolManager( @@ -257,7 +259,8 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde scheduler, peerValidators, mergePeerFilter, - forkIdManager); + forkIdManager, + peerManager); } @Override diff --git a/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java index f5fc75959e..f860f399da 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java @@ -34,6 +34,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -99,7 +100,8 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder { final EthScheduler scheduler, final List peerValidators, final Optional mergePeerFilter, - final ForkIdManager forkIdManager) { + final ForkIdManager forkIdManager, + final PeerManager peerManager) { var mergeContext = protocolContext.getConsensusContext(MergeContext.class); @@ -129,7 +131,8 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder { scheduler, peerValidators, filterToUse, - forkIdManager); + forkIdManager, + peerManager); return ethProtocolManager; } diff --git a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java index ee2611d9c8..9b71e0bdc9 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java @@ -40,6 +40,8 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; @@ -163,7 +165,8 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder { final EthScheduler scheduler, final List peerValidators, final Optional mergePeerFilter, - final ForkIdManager forkIdManager) { + final ForkIdManager forkIdManager, + final PeerManager peerManager) { return mergeBesuControllerBuilder.createEthProtocolManager( protocolContext, synchronizerConfiguration, @@ -175,7 +178,8 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder { scheduler, peerValidators, mergePeerFilter, - forkIdManager); + forkIdManager, + peerManager); } @Override @@ -225,6 +229,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder { final WorldStateStorageCoordinator worldStateStorageCoordinator, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final EthProtocolManager ethProtocolManager, final PivotBlockSelector pivotBlockSelector) { @@ -235,6 +240,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder { worldStateStorageCoordinator, protocolContext, ethContext, + peerTaskExecutor, syncState, ethProtocolManager, pivotBlockSelector); diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/BodyValidator.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/BodyValidator.java new file mode 100644 index 0000000000..6c78337281 --- /dev/null +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/BodyValidator.java @@ -0,0 +1,89 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.mainnet; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.Request; +import org.hyperledger.besu.ethereum.core.Transaction; +import org.hyperledger.besu.ethereum.core.TransactionReceipt; +import org.hyperledger.besu.ethereum.core.Withdrawal; +import org.hyperledger.besu.evm.log.LogsBloomFilter; + +import java.util.List; + +/** A utility class for body validation tasks. Implemented utilising BodyValidation */ +public class BodyValidator { + + /** + * Generates the transaction root for a list of transactions + * + * @param transactions the transactions + * @return the transaction root + */ + public Hash transactionsRoot(final List transactions) { + return BodyValidation.transactionsRoot(transactions); + } + + /** + * Generates the withdrawals root for a list of withdrawals + * + * @param withdrawals the transactions + * @return the transaction root + */ + public Hash withdrawalsRoot(final List withdrawals) { + return BodyValidation.withdrawalsRoot(withdrawals); + } + + /** + * Generates the requests root for a list of requests + * + * @param requests list of request + * @return the requests root + */ + public Hash requestsRoot(final List requests) { + return BodyValidation.requestsRoot(requests); + } + + /** + * Generates the receipt root for a list of receipts + * + * @param receipts the receipts + * @return the receipt root + */ + public Hash receiptsRoot(final List receipts) { + return BodyValidation.receiptsRoot(receipts); + } + + /** + * Generates the ommers hash for a list of ommer block headers + * + * @param ommers the ommer block headers + * @return the ommers hash + */ + public Hash ommersHash(final List ommers) { + return BodyValidation.ommersHash(ommers); + } + + /** + * Generates the logs bloom filter for a list of transaction receipts + * + * @param receipts the transaction receipts + * @return the logs bloom filter + */ + public LogsBloomFilter logsBloom(final List receipts) { + return BodyValidation.logsBloom(receipts); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 58318f9611..d69910d59c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -23,6 +23,7 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.StatusMessage; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; @@ -69,6 +70,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { private final Hash genesisHash; private final ForkIdManager forkIdManager; + private final PeerManager peerManager; private final BigInteger networkId; private final EthPeers ethPeers; private final EthMessages ethMessages; @@ -92,7 +94,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { final Optional mergePeerFilter, final SynchronizerConfiguration synchronizerConfiguration, final EthScheduler scheduler, - final ForkIdManager forkIdManager) { + final ForkIdManager forkIdManager, + final PeerManager peerManager) { this.networkId = networkId; this.peerValidators = peerValidators; this.scheduler = scheduler; @@ -102,6 +105,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { this.genesisHash = blockchain.getBlockHashByNumber(0L).orElse(Hash.ZERO); this.forkIdManager = forkIdManager; + this.peerManager = peerManager; this.ethPeers = ethPeers; this.ethMessages = ethMessages; @@ -140,7 +144,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { final List peerValidators, final Optional mergePeerFilter, final SynchronizerConfiguration synchronizerConfiguration, - final EthScheduler scheduler) { + final EthScheduler scheduler, + final PeerManager peerManager) { this( blockchain, networkId, @@ -158,7 +163,8 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { blockchain, Collections.emptyList(), Collections.emptyList(), - ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled())); + ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()), + peerManager); } public EthContext ethContext() { @@ -337,7 +343,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { public void handleNewConnection(final PeerConnection connection) { ethPeers.registerNewConnection(connection, peerValidators); final EthPeer peer = ethPeers.peer(connection); - + peerManager.addPeer(peer); final Capability cap = connection.capability(getSupportedProtocol()); final ForkId latestForkId = cap.getVersion() >= 64 ? forkIdManager.getForkIdForChainHead() : null; @@ -369,6 +375,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { final DisconnectReason reason, final boolean initiatedByPeer) { final boolean wasActiveConnection = ethPeers.registerDisconnect(connection); + peerManager.removePeer(connection.getPeer()); LOG.atDebug() .setMessage("Disconnect - active Connection? {} - {} - {} - {} {} - {} peers left") .addArgument(wasActiveConnection) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java index f99fb3e8a2..f4760de884 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java @@ -23,7 +23,7 @@ import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskBehavior; import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage; import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage; -import org.hyperledger.besu.ethereum.mainnet.BodyValidation; +import org.hyperledger.besu.ethereum.mainnet.BodyValidator; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import java.util.ArrayList; @@ -33,71 +33,75 @@ import java.util.List; import java.util.Map; public class GetReceiptsFromPeerTask - implements PeerTask>> { + implements PeerTask>> { - private final Collection blockHeaders; - private final Map> headersByReceiptsRoot = new HashMap<>(); + private final Collection blockHeaders; + private final BodyValidator bodyValidator; + private final Map> headersByReceiptsRoot = new HashMap<>(); - public GetReceiptsFromPeerTask(final Collection blockHeaders) { - this.blockHeaders = blockHeaders; - blockHeaders.forEach( - header -> - headersByReceiptsRoot - .computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>()) - .add(header)); - } + public GetReceiptsFromPeerTask( + final Collection blockHeaders, final BodyValidator bodyValidator) { + this.blockHeaders = blockHeaders; + this.bodyValidator = bodyValidator; - @Override - public String getSubProtocol() { - return EthProtocol.NAME; - } + blockHeaders.forEach( + header -> + headersByReceiptsRoot + .computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>()) + .add(header)); + } - @Override - public long getRequiredBlockNumber() { - return blockHeaders.stream() - .mapToLong(BlockHeader::getNumber) - .max() - .orElse(BlockHeader.GENESIS_BLOCK_NUMBER); - } + @Override + public String getSubProtocol() { + return EthProtocol.NAME; + } - @Override - public MessageData getRequestMessage() { - // Since we have to match up the data by receipt root, we only need to request receipts - // for one of the headers with each unique receipt root. - final List blockHashes = - headersByReceiptsRoot.values().stream() - .map(headers -> headers.getFirst().getHash()) - .toList(); - return GetReceiptsMessage.create(blockHashes); - } + @Override + public long getRequiredBlockNumber() { + return blockHeaders.stream() + .mapToLong(BlockHeader::getNumber) + .max() + .orElse(BlockHeader.GENESIS_BLOCK_NUMBER); + } - @Override - public Map> parseResponse(final MessageData messageData) - throws InvalidPeerTaskResponseException { - if (messageData == null) { - throw new InvalidPeerTaskResponseException(); - } - final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(messageData); - final List> receiptsByBlock = receiptsMessage.receipts(); - if (receiptsByBlock.isEmpty() || receiptsByBlock.size() > blockHeaders.size()) { - throw new InvalidPeerTaskResponseException(); - } + @Override + public MessageData getRequestMessage() { + // Since we have to match up the data by receipt root, we only need to request receipts + // for one of the headers with each unique receipt root. + final List blockHashes = + headersByReceiptsRoot.values().stream() + .map(headers -> headers.getFirst().getHash()) + .toList(); + return GetReceiptsMessage.create(blockHashes); + } - final Map> receiptsByHeader = new HashMap<>(); - for (final List receiptsInBlock : receiptsByBlock) { - final List blockHeaders = - headersByReceiptsRoot.get(BodyValidation.receiptsRoot(receiptsInBlock)); - if (blockHeaders == null) { - // Contains receipts that we didn't request, so mustn't be the response we're looking for. - throw new InvalidPeerTaskResponseException(); - } - blockHeaders.forEach(header -> receiptsByHeader.put(header, receiptsInBlock)); - } - return receiptsByHeader; + @Override + public Map> parseResponse(final MessageData messageData) + throws InvalidPeerTaskResponseException { + if (messageData == null) { + throw new InvalidPeerTaskResponseException(); + } + final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(messageData); + final List> receiptsByBlock = receiptsMessage.receipts(); + if (receiptsByBlock.isEmpty() || receiptsByBlock.size() > blockHeaders.size()) { + throw new InvalidPeerTaskResponseException(); } - @Override - public Collection getPeerTaskBehaviors() { - return List.of(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS, PeerTaskBehavior.RETRY_WITH_SAME_PEER); + final Map> receiptsByHeader = new HashMap<>(); + for (final List receiptsInBlock : receiptsByBlock) { + final List blockHeaders = + headersByReceiptsRoot.get(bodyValidator.receiptsRoot(receiptsInBlock)); + if (blockHeaders == null) { + // Contains receipts that we didn't request, so mustn't be the response we're looking for. + throw new InvalidPeerTaskResponseException(); + } + blockHeaders.forEach(header -> receiptsByHeader.put(header, receiptsInBlock)); } -} \ No newline at end of file + return receiptsByHeader; + } + + @Override + public Collection getPeerTaskBehaviors() { + return List.of(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS, PeerTaskBehavior.RETRY_WITH_SAME_PEER); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index b7dc2adb16..f012bbd7ae 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -22,6 +22,7 @@ import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.checkpointsync.CheckpointDownloaderFactory; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; @@ -82,6 +83,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi final WorldStateStorageCoordinator worldStateStorageCoordinator, final BlockBroadcaster blockBroadcaster, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final Path dataDirectory, final StorageProvider storageProvider, @@ -147,6 +149,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -163,6 +166,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -179,6 +183,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java index b4bdf58541..f255ec69d9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloadBlockStep.java @@ -16,17 +16,24 @@ package org.hyperledger.besu.ethereum.eth.sync.checkpointsync; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskFeatureToggle; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import org.hyperledger.besu.ethereum.eth.manager.task.GetBlockFromPeerTask; -import org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask; import org.hyperledger.besu.ethereum.eth.sync.fastsync.checkpoint.Checkpoint; +import org.hyperledger.besu.ethereum.mainnet.BodyValidator; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -34,16 +41,19 @@ public class CheckpointDownloadBlockStep { private final ProtocolSchedule protocolSchedule; private final EthContext ethContext; + private final PeerTaskExecutor peerTaskExecutor; private final Checkpoint checkpoint; private final MetricsSystem metricsSystem; public CheckpointDownloadBlockStep( final ProtocolSchedule protocolSchedule, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final Checkpoint checkpoint, final MetricsSystem metricsSystem) { this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; + this.peerTaskExecutor = peerTaskExecutor; this.checkpoint = checkpoint; this.metricsSystem = metricsSystem; } @@ -65,17 +75,43 @@ public class CheckpointDownloadBlockStep { private CompletableFuture> downloadReceipts( final PeerTaskResult peerTaskResult) { final Block block = peerTaskResult.getResult(); - final GetReceiptsFromPeerTask getReceiptsFromPeerTask = - GetReceiptsFromPeerTask.forHeaders(ethContext, List.of(block.getHeader()), metricsSystem); - return getReceiptsFromPeerTask - .run() - .thenCompose( - receiptTaskResult -> { - final Optional> transactionReceipts = - Optional.ofNullable(receiptTaskResult.getResult().get(block.getHeader())); - return CompletableFuture.completedFuture( - transactionReceipts.map(receipts -> new BlockWithReceipts(block, receipts))); - }) - .exceptionally(throwable -> Optional.empty()); + if (PeerTaskFeatureToggle.usePeerTaskSystem()) { + CompletableFuture> futureReceipts = new CompletableFuture<>(); + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask(List.of(block.getHeader()), new BodyValidator()); + PeerTaskExecutorResult>> executorResult = + peerTaskExecutor.execute(task); + + if (executorResult.getResponseCode() == PeerTaskExecutorResponseCode.SUCCESS) { + List transactionReceipts = + executorResult + .getResult() + .map((map) -> map.get(block.getHeader())) + .orElseThrow( + () -> + new IllegalStateException("PeerTask response code was success, but empty")); + BlockWithReceipts blockWithReceipts = new BlockWithReceipts(block, transactionReceipts); + futureReceipts.complete(Optional.of(blockWithReceipts)); + } else { + futureReceipts.complete(Optional.empty()); + } + return futureReceipts; + + } else { + final org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask + getReceiptsFromPeerTask = + org.hyperledger.besu.ethereum.eth.manager.task.GetReceiptsFromPeerTask.forHeaders( + ethContext, List.of(block.getHeader()), metricsSystem); + return getReceiptsFromPeerTask + .run() + .thenCompose( + receiptTaskResult -> { + final Optional> transactionReceipts = + Optional.ofNullable(receiptTaskResult.getResult().get(block.getHeader())); + return CompletableFuture.completedFuture( + transactionReceipts.map(receipts -> new BlockWithReceipts(block, receipts))); + }) + .exceptionally(throwable -> Optional.empty()); + } } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java index 03df47e440..30134d9f6c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java @@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.checkpointsync; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -61,6 +62,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory { final ProtocolContext protocolContext, final MetricsSystem metricsSystem, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final WorldStateStorageCoordinator worldStateStorageCoordinator, final SyncState syncState, final Clock clock, @@ -110,6 +112,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory { protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, pivotBlockSelector, metricsSystem); @@ -127,6 +130,7 @@ public class CheckpointDownloaderFactory extends SnapDownloaderFactory { protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, pivotBlockSelector, metricsSystem); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java index 5096b74e24..61b997e6c5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java @@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync.checkpointsync; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -34,6 +35,7 @@ public class CheckpointSyncActions extends FastSyncActions { final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final PivotBlockSelector pivotBlockSelector, final MetricsSystem metricsSystem) { @@ -43,6 +45,7 @@ public class CheckpointSyncActions extends FastSyncActions { protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, pivotBlockSelector, metricsSystem); @@ -57,6 +60,7 @@ public class CheckpointSyncActions extends FastSyncActions { protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, metricsSystem, currentState, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java index 5450b9e5a4..2590e4736a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncChainDownloader.java @@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync.checkpointsync; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -36,6 +37,7 @@ public class CheckpointSyncChainDownloader extends FastSyncChainDownloader { final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final MetricsSystem metricsSystem, final FastSyncState fastSyncState, @@ -55,7 +57,13 @@ public class CheckpointSyncChainDownloader extends FastSyncChainDownloader { syncState, syncTargetManager, new CheckpointSyncDownloadPipelineFactory( - config, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem), + config, + protocolSchedule, + protocolContext, + ethContext, + peerTaskExecutor, + fastSyncState, + metricsSystem), ethContext.getScheduler(), metricsSystem, syncDurationMetrics); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java index 45f3f243d8..714dfcb4af 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncDownloadPipelineFactory.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloadPipelineFactory; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; @@ -40,9 +41,17 @@ public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipel final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final FastSyncState fastSyncState, final MetricsSystem metricsSystem) { - super(syncConfig, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem); + super( + syncConfig, + protocolSchedule, + protocolContext, + ethContext, + peerTaskExecutor, + fastSyncState, + metricsSystem); } @Override @@ -76,7 +85,8 @@ public class CheckpointSyncDownloadPipelineFactory extends FastSyncDownloadPipel checkPointSource, checkpoint, protocolContext.getBlockchain()); final CheckpointDownloadBlockStep checkPointDownloadBlockStep = - new CheckpointDownloadBlockStep(protocolSchedule, ethContext, checkpoint, metricsSystem); + new CheckpointDownloadBlockStep( + protocolSchedule, ethContext, peerTaskExecutor, checkpoint, metricsSystem); return PipelineBuilder.createPipelineFrom( "fetchCheckpoints", diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java index cd57de371d..bd8b0a8a42 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java @@ -22,10 +22,17 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskFeatureToggle; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask; import org.hyperledger.besu.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask; +import org.hyperledger.besu.ethereum.mainnet.BodyValidator; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.util.FutureUtils; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -34,18 +41,36 @@ import java.util.function.Function; public class DownloadReceiptsStep implements Function, CompletableFuture>> { private final EthContext ethContext; + private final PeerTaskExecutor peerTaskExecutor; private final MetricsSystem metricsSystem; - public DownloadReceiptsStep(final EthContext ethContext, final MetricsSystem metricsSystem) { + public DownloadReceiptsStep( + final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, + final MetricsSystem metricsSystem) { this.ethContext = ethContext; + this.peerTaskExecutor = peerTaskExecutor; this.metricsSystem = metricsSystem; } @Override public CompletableFuture> apply(final List blocks) { final List headers = blocks.stream().map(Block::getHeader).collect(toList()); - final CompletableFuture>> getReceipts = - GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem).run(); + final CompletableFuture>> getReceipts; + if (PeerTaskFeatureToggle.usePeerTaskSystem()) { + GetReceiptsFromPeerTask getReceiptsFromPeerTask = + new GetReceiptsFromPeerTask(headers, new BodyValidator()); + PeerTaskExecutorResult>> getReceiptsResult = + peerTaskExecutor.execute(getReceiptsFromPeerTask); + if (getReceiptsResult.getResponseCode() == PeerTaskExecutorResponseCode.SUCCESS + && getReceiptsResult.getResult().isPresent()) { + getReceipts = CompletableFuture.completedFuture(getReceiptsResult.getResult().get()); + } else { + getReceipts = CompletableFuture.completedFuture(Collections.emptyMap()); + } + } else { + getReceipts = GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem).run(); + } final CompletableFuture> combineWithBlocks = getReceipts.thenApply( receiptsByHeader -> combineBlocksAndReceipts(blocks, receiptsByHeader)); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java index 7f6bbae3f3..58a64bd562 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java @@ -19,6 +19,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; @@ -48,6 +49,7 @@ public class FastSyncActions { protected final ProtocolSchedule protocolSchedule; protected final ProtocolContext protocolContext; protected final EthContext ethContext; + protected final PeerTaskExecutor peerTaskExecutor; protected final SyncState syncState; protected final PivotBlockSelector pivotBlockSelector; protected final MetricsSystem metricsSystem; @@ -60,6 +62,7 @@ public class FastSyncActions { final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final PivotBlockSelector pivotBlockSelector, final MetricsSystem metricsSystem) { @@ -68,6 +71,7 @@ public class FastSyncActions { this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; + this.peerTaskExecutor = peerTaskExecutor; this.syncState = syncState; this.pivotBlockSelector = pivotBlockSelector; this.metricsSystem = metricsSystem; @@ -164,6 +168,7 @@ public class FastSyncActions { protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, metricsSystem, currentState, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java index c36ff7cb48..1bf55a3811 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloader.java @@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -35,6 +36,7 @@ public class FastSyncChainDownloader { final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final SyncState syncState, final MetricsSystem metricsSystem, final FastSyncState fastSyncState, @@ -53,7 +55,13 @@ public class FastSyncChainDownloader { syncState, syncTargetManager, new FastSyncDownloadPipelineFactory( - config, protocolSchedule, protocolContext, ethContext, fastSyncState, metricsSystem), + config, + protocolSchedule, + protocolContext, + ethContext, + peerTaskExecutor, + fastSyncState, + metricsSystem), ethContext.getScheduler(), metricsSystem, syncDurationMetrics); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java index 07e964426c..63df4ee0cb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java @@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.DownloadBodiesStep; import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep; import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory; @@ -57,6 +58,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory protected final ProtocolSchedule protocolSchedule; protected final ProtocolContext protocolContext; protected final EthContext ethContext; + protected final PeerTaskExecutor peerTaskExecutor; protected final FastSyncState fastSyncState; protected final MetricsSystem metricsSystem; protected final FastSyncValidationPolicy attachedValidationPolicy; @@ -68,12 +70,14 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final FastSyncState fastSyncState, final MetricsSystem metricsSystem) { this.syncConfig = syncConfig; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; + this.peerTaskExecutor = peerTaskExecutor; this.fastSyncState = fastSyncState; this.metricsSystem = metricsSystem; final LabelledMetric fastSyncValidationCounter = @@ -140,7 +144,7 @@ public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory final DownloadBodiesStep downloadBodiesStep = new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem); final DownloadReceiptsStep downloadReceiptsStep = - new DownloadReceiptsStep(ethContext, metricsSystem); + new DownloadReceiptsStep(ethContext, peerTaskExecutor, metricsSystem); final ImportBlocksStep importBlockStep = new ImportBlocksStep( protocolSchedule, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index 8b71a57885..1d775cc80f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -59,6 +60,7 @@ public class FastDownloaderFactory { final ProtocolContext protocolContext, final MetricsSystem metricsSystem, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final WorldStateStorageCoordinator worldStateStorageCoordinator, final SyncState syncState, final Clock clock, @@ -126,6 +128,7 @@ public class FastDownloaderFactory { protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, pivotBlockSelector, metricsSystem), diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java index 5de8ceb984..6c5ce0b04e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java @@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -57,6 +58,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory { final ProtocolContext protocolContext, final MetricsSystem metricsSystem, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final WorldStateStorageCoordinator worldStateStorageCoordinator, final SyncState syncState, final Clock clock, @@ -121,6 +123,7 @@ public class SnapDownloaderFactory extends FastDownloaderFactory { protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, pivotBlockSelector, metricsSystem), diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java index 3a3331b568..762600bdd0 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java @@ -44,6 +44,7 @@ import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.EthProtocolVersion; import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection.PeerSendHandler; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager; import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage; import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; @@ -1243,7 +1244,8 @@ public final class EthProtocolManagerTest { Optional.empty(), syncConfig, mock(EthScheduler.class), - mock(ForkIdManager.class))) { + mock(ForkIdManager.class), + new PeerManager())) { return ethManager; } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java index 0b0bd1e3eb..d8b5029d95 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -30,6 +30,7 @@ import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture; import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; @@ -117,7 +118,8 @@ public class EthProtocolManagerTestUtil { mergePeerFilter, mock(SynchronizerConfiguration.class), ethScheduler, - new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false)); + new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false), + new PeerManager()); } public static EthProtocolManager create( @@ -168,7 +170,8 @@ public class EthProtocolManagerTestUtil { Optional.empty(), mock(SynchronizerConfiguration.class), ethScheduler, - forkIdManager); + forkIdManager, + new PeerManager()); } public static EthProtocolManager create(final Blockchain blockchain) { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskFeatureToggleTestHelper.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskFeatureToggleTestHelper.java new file mode 100644 index 0000000000..4410ebc40e --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskFeatureToggleTestHelper.java @@ -0,0 +1,34 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import java.lang.reflect.Field; + +import org.junit.platform.commons.util.ReflectionUtils; + +public class PeerTaskFeatureToggleTestHelper { + + public static void setPeerTaskFeatureToggle(final boolean usePeerTaskSystem) + throws IllegalAccessException { + Field usePeerTaskSystemField = + ReflectionUtils.findFields( + PeerTaskFeatureToggle.class, + (f) -> f.getName().equals("USE_PEER_TASK_SYSTEM"), + ReflectionUtils.HierarchyTraversalMode.TOP_DOWN) + .getFirst(); + usePeerTaskSystemField.setAccessible(true); + usePeerTaskSystemField.set(null, usePeerTaskSystem); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java new file mode 100644 index 0000000000..131026a276 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java @@ -0,0 +1,152 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask.task; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.TransactionReceipt; +import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; +import org.hyperledger.besu.ethereum.eth.messages.EthPV63; +import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage; +import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage; +import org.hyperledger.besu.ethereum.mainnet.BodyValidator; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class GetReceiptsFromPeerTaskTest { + + @Test + public void testGetSubProtocol() { + GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null); + Assertions.assertEquals(EthProtocol.NAME, task.getSubProtocol()); + } + + @Test + public void testGetRequiredBlockNumber() { + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask( + List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null); + Assertions.assertEquals(3, task.getRequiredBlockNumber()); + } + + @Test + public void testGetRequestMessage() { + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask( + List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null); + + MessageData messageData = task.getRequestMessage(); + GetReceiptsMessage getReceiptsMessage = GetReceiptsMessage.readFrom(messageData); + + Assertions.assertEquals(EthPV63.GET_RECEIPTS, getReceiptsMessage.getCode()); + Iterable hashesInMessage = getReceiptsMessage.hashes(); + List expectedHashes = + List.of( + Hash.fromHexString(StringUtils.repeat("00", 31) + "11"), + Hash.fromHexString(StringUtils.repeat("00", 31) + "21"), + Hash.fromHexString(StringUtils.repeat("00", 31) + "31")); + List actualHashes = new ArrayList<>(); + hashesInMessage.forEach(actualHashes::add); + + Assertions.assertEquals(3, actualHashes.size()); + Assertions.assertEquals( + expectedHashes.stream().sorted().toList(), actualHashes.stream().sorted().toList()); + } + + @Test + public void testParseResponseWithNullResponseMessage() { + GetReceiptsFromPeerTask task = new GetReceiptsFromPeerTask(Collections.emptyList(), null); + Assertions.assertThrows(InvalidPeerTaskResponseException.class, () -> task.parseResponse(null)); + } + + @Test + public void testParseResponseForInvalidResponse() throws InvalidPeerTaskResponseException { + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask( + List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null); + ReceiptsMessage receiptsMessage = + ReceiptsMessage.create( + List.of( + List.of(new TransactionReceipt(1, 123, Collections.emptyList(), Optional.empty())), + List.of(new TransactionReceipt(1, 456, Collections.emptyList(), Optional.empty())), + List.of(new TransactionReceipt(1, 789, Collections.emptyList(), Optional.empty())), + List.of( + new TransactionReceipt(1, 101112, Collections.emptyList(), Optional.empty())))); + + Assertions.assertThrows( + InvalidPeerTaskResponseException.class, () -> task.parseResponse(receiptsMessage)); + } + + @Test + public void testParseResponse() throws InvalidPeerTaskResponseException { + BodyValidator bodyValidator = Mockito.mock(BodyValidator.class); + BlockHeader blockHeader1 = mockBlockHeader(1); + BlockHeader blockHeader2 = mockBlockHeader(2); + BlockHeader blockHeader3 = mockBlockHeader(3); + + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask( + List.of(blockHeader1, blockHeader2, blockHeader3), bodyValidator); + + TransactionReceipt receiptForBlock1 = + new TransactionReceipt(1, 123, Collections.emptyList(), Optional.empty()); + TransactionReceipt receiptForBlock2 = + new TransactionReceipt(1, 456, Collections.emptyList(), Optional.empty()); + TransactionReceipt receiptForBlock3 = + new TransactionReceipt(1, 789, Collections.emptyList(), Optional.empty()); + ReceiptsMessage receiptsMessage = + ReceiptsMessage.create( + List.of( + List.of(receiptForBlock1), List.of(receiptForBlock2), List.of(receiptForBlock3))); + + Mockito.when(bodyValidator.receiptsRoot(List.of(receiptForBlock1))) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + "12")); + Mockito.when(bodyValidator.receiptsRoot(List.of(receiptForBlock2))) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + "22")); + Mockito.when(bodyValidator.receiptsRoot(List.of(receiptForBlock3))) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + "32")); + + Map> resultMap = task.parseResponse(receiptsMessage); + + Assertions.assertEquals(3, resultMap.size()); + Assertions.assertEquals(List.of(receiptForBlock1), resultMap.get(blockHeader1)); + Assertions.assertEquals(List.of(receiptForBlock2), resultMap.get(blockHeader2)); + Assertions.assertEquals(List.of(receiptForBlock3), resultMap.get(blockHeader3)); + } + + private BlockHeader mockBlockHeader(final long blockNumber) { + BlockHeader blockHeader = Mockito.mock(BlockHeader.class); + Mockito.when(blockHeader.getNumber()).thenReturn(blockNumber); + // second to last hex digit indicates the blockNumber, last hex digit indicates the usage of the + // hash + Mockito.when(blockHeader.getHash()) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "1")); + Mockito.when(blockHeader.getReceiptsRoot()) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "2")); + + return blockHeader; + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java index 43f03100a7..f6b22aa967 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckPointSyncChainDownloaderTest.java @@ -22,13 +22,20 @@ import static org.mockito.Mockito.when; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; import org.hyperledger.besu.ethereum.core.Difficulty; +import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskFeatureToggleTestHelper; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; @@ -44,8 +51,15 @@ import org.hyperledger.besu.metrics.SyncDurationMetrics; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.plugin.services.storage.DataStorageFormat; +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; @@ -55,12 +69,16 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.platform.commons.util.ReflectionUtils; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class CheckPointSyncChainDownloaderTest { protected ProtocolSchedule protocolSchedule; protected EthProtocolManager ethProtocolManager; protected EthContext ethContext; + private PeerTaskExecutor peerTaskExecutor; protected ProtocolContext protocolContext; private SyncState syncState; @@ -100,6 +118,7 @@ public class CheckPointSyncChainDownloaderTest { localBlockchain = localBlockchainSetup.getBlockchain(); otherBlockchainSetup = BlockchainSetupUtil.forTesting(dataStorageFormat); otherBlockchain = otherBlockchainSetup.getBlockchain(); + otherBlockchainSetup.importFirstBlocks(30); protocolSchedule = localBlockchainSetup.getProtocolSchedule(); protocolContext = localBlockchainSetup.getProtocolContext(); ethProtocolManager = @@ -123,6 +142,57 @@ public class CheckPointSyncChainDownloaderTest { ethContext.getEthPeers(), true, Optional.of(checkpoint)); + + peerTaskExecutor = mock(PeerTaskExecutor.class); + + when(peerTaskExecutor.execute(any(GetReceiptsFromPeerTask.class))) + .thenAnswer( + new Answer>>>() { + @Override + public PeerTaskExecutorResult>> answer( + final InvocationOnMock invocationOnMock) throws Throwable { + GetReceiptsFromPeerTask task = + invocationOnMock.getArgument(0, GetReceiptsFromPeerTask.class); + + return processTask(task); + } + }); + + when(peerTaskExecutor.executeAsync(any(GetReceiptsFromPeerTask.class))) + .thenAnswer( + new Answer< + CompletableFuture< + PeerTaskExecutorResult>>>>() { + @Override + public CompletableFuture< + PeerTaskExecutorResult>>> + answer(final InvocationOnMock invocationOnMock) throws Throwable { + GetReceiptsFromPeerTask task = + invocationOnMock.getArgument(0, GetReceiptsFromPeerTask.class); + + return CompletableFuture.completedFuture(processTask(task)); + } + }); + } + + @SuppressWarnings("unchecked") + private PeerTaskExecutorResult>> processTask( + final GetReceiptsFromPeerTask task) throws IllegalAccessException { + Map> getReceiptsFromPeerTaskResult = new HashMap<>(); + List fields = + ReflectionUtils.findFields( + task.getClass(), + (field) -> field.getName().equals("blockHeaders"), + ReflectionUtils.HierarchyTraversalMode.TOP_DOWN); + fields.forEach((f) -> f.setAccessible(true)); + Collection blockHeaders = (Collection) fields.getFirst().get(task); + blockHeaders.forEach( + (bh) -> + getReceiptsFromPeerTaskResult.put( + bh, otherBlockchain.getTxReceipts(bh.getHash()).get())); + + return new PeerTaskExecutorResult<>( + getReceiptsFromPeerTaskResult, PeerTaskExecutorResponseCode.SUCCESS); } @AfterEach @@ -140,6 +210,7 @@ public class CheckPointSyncChainDownloaderTest { protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, syncState, new NoOpMetricsSystem(), new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()), @@ -148,9 +219,10 @@ public class CheckPointSyncChainDownloaderTest { @ParameterizedTest @ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class) - public void shouldSyncToPivotBlockInMultipleSegments(final DataStorageFormat storageFormat) { + public void shouldSyncToPivotBlockInMultipleSegments(final DataStorageFormat storageFormat) + throws IllegalAccessException { + PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(false); setup(storageFormat); - otherBlockchainSetup.importFirstBlocks(30); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); @@ -184,9 +256,81 @@ public class CheckPointSyncChainDownloaderTest { @ParameterizedTest @ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class) - public void shouldSyncToPivotBlockInSingleSegment(final DataStorageFormat storageFormat) { + public void shouldSyncToPivotBlockInSingleSegment(final DataStorageFormat storageFormat) + throws IllegalAccessException { + PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(false); + setup(storageFormat); + + final RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); + final RespondingEthPeer.Responder responder = + RespondingEthPeer.blockchainResponder(otherBlockchain); + + final long pivotBlockNumber = 10; + final SynchronizerConfiguration syncConfig = SynchronizerConfiguration.builder().build(); + ethContext + .getEthPeers() + .streamAvailablePeers() + .forEach( + ethPeer -> { + ethPeer.setCheckpointHeader( + otherBlockchainSetup.getBlocks().get((int) checkpoint.blockNumber()).getHeader()); + }); + final ChainDownloader downloader = downloader(syncConfig, pivotBlockNumber); + final CompletableFuture result = downloader.start(); + + peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone()); + + assertThat(result).isCompleted(); + assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber); + assertThat(localBlockchain.getChainHeadHeader()) + .isEqualTo(otherBlockchain.getBlockHeader(pivotBlockNumber).get()); + } + + @ParameterizedTest + @ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class) + public void shouldSyncToPivotBlockInMultipleSegmentsWithPeerTaskSystem( + final DataStorageFormat storageFormat) + throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException { + PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(true); + setup(storageFormat); + + final RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); + final RespondingEthPeer.Responder responder = + RespondingEthPeer.blockchainResponder(otherBlockchain); + + final SynchronizerConfiguration syncConfig = + SynchronizerConfiguration.builder() + .downloaderChainSegmentSize(5) + .downloaderHeadersRequestSize(3) + .build(); + final long pivotBlockNumber = 25; + ethContext + .getEthPeers() + .streamAvailablePeers() + .forEach( + ethPeer -> { + ethPeer.setCheckpointHeader( + otherBlockchainSetup.getBlocks().get((int) checkpoint.blockNumber()).getHeader()); + }); + final ChainDownloader downloader = downloader(syncConfig, pivotBlockNumber); + final CompletableFuture result = downloader.start(); + + peer.respondWhileOtherThreadsWork(responder, () -> !result.isDone()); + + assertThat(result).isCompleted(); + assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(pivotBlockNumber); + assertThat(localBlockchain.getChainHeadHeader()) + .isEqualTo(otherBlockchain.getBlockHeader(pivotBlockNumber).get()); + } + + @ParameterizedTest + @ArgumentsSource(CheckPointSyncChainDownloaderTestArguments.class) + public void shouldSyncToPivotBlockInSingleSegmentWithPeerTaskSystem( + final DataStorageFormat storageFormat) throws IllegalAccessException { + PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(true); setup(storageFormat); - otherBlockchainSetup.importFirstBlocks(30); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java index c9cfeda119..1fb86a82f7 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java @@ -30,22 +30,32 @@ import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskFeatureToggleTestHelper; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetReceiptsFromPeerTask; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.plugin.services.storage.DataStorageFormat; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class DownloadReceiptsStepTest { private static ProtocolContext protocolContext; private static MutableBlockchain blockchain; + private PeerTaskExecutor peerTaskExecutor; private EthProtocolManager ethProtocolManager; private DownloadReceiptsStep downloadReceiptsStep; @@ -59,6 +69,7 @@ public class DownloadReceiptsStepTest { @BeforeEach public void setUp() { + peerTaskExecutor = mock(PeerTaskExecutor.class); TransactionPool transactionPool = mock(TransactionPool.class); ethProtocolManager = EthProtocolManagerTestUtil.create( @@ -69,11 +80,13 @@ public class DownloadReceiptsStepTest { transactionPool, EthProtocolConfiguration.defaultConfig()); downloadReceiptsStep = - new DownloadReceiptsStep(ethProtocolManager.ethContext(), new NoOpMetricsSystem()); + new DownloadReceiptsStep( + ethProtocolManager.ethContext(), peerTaskExecutor, new NoOpMetricsSystem()); } @Test - public void shouldDownloadReceiptsForBlocks() { + public void shouldDownloadReceiptsForBlocks() throws IllegalAccessException { + PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(false); final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); final List blocks = asList(block(1), block(2), block(3), block(4)); @@ -90,6 +103,32 @@ public class DownloadReceiptsStepTest { blockWithReceipts(4))); } + @Test + public void shouldDownloadReceiptsForBlocksUsingPeerTaskSystem() + throws IllegalAccessException, ExecutionException, InterruptedException { + PeerTaskFeatureToggleTestHelper.setPeerTaskFeatureToggle(true); + + final List blocks = asList(mockBlock(), mockBlock(), mockBlock(), mockBlock()); + Map> receiptsMap = new HashMap<>(); + blocks.forEach( + (b) -> receiptsMap.put(b.getHeader(), List.of(Mockito.mock(TransactionReceipt.class)))); + PeerTaskExecutorResult>> peerTaskResult = + new PeerTaskExecutorResult<>(receiptsMap, PeerTaskExecutorResponseCode.SUCCESS); + Mockito.when(peerTaskExecutor.execute(Mockito.any(GetReceiptsFromPeerTask.class))) + .thenReturn(peerTaskResult); + + final CompletableFuture> result = downloadReceiptsStep.apply(blocks); + + assertThat(result.get().get(0).getBlock()).isEqualTo(blocks.get(0)); + assertThat(result.get().get(0).getReceipts().size()).isEqualTo(1); + assertThat(result.get().get(1).getBlock()).isEqualTo(blocks.get(1)); + assertThat(result.get().get(1).getReceipts().size()).isEqualTo(1); + assertThat(result.get().get(2).getBlock()).isEqualTo(blocks.get(2)); + assertThat(result.get().get(2).getReceipts().size()).isEqualTo(1); + assertThat(result.get().get(3).getBlock()).isEqualTo(blocks.get(3)); + assertThat(result.get().get(3).getReceipts().size()).isEqualTo(1); + } + private Block block(final long number) { final BlockHeader header = blockchain.getBlockHeader(number).get(); return new Block(header, blockchain.getBlockBody(header.getHash()).get()); @@ -100,4 +139,11 @@ public class DownloadReceiptsStepTest { final List receipts = blockchain.getTxReceipts(block.getHash()).get(); return new BlockWithReceipts(block, receipts); } + + private Block mockBlock() { + final Block block = Mockito.mock(Block.class); + final BlockHeader blockHeader = Mockito.mock(BlockHeader.class); + Mockito.when(block.getHeader()).thenAnswer((invocationOnMock) -> blockHeader); + return block; + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java index 37ca5be2e9..bc493ebd03 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -71,6 +72,7 @@ public class FastDownloaderFactoryTest { @Mock private ProtocolContext protocolContext; @Mock private MetricsSystem metricsSystem; @Mock private EthContext ethContext; + @Mock private PeerTaskExecutor peerTaskExecutor; @Mock private SyncState syncState; @Mock private Clock clock; @Mock private Path dataDirectory; @@ -114,6 +116,7 @@ public class FastDownloaderFactoryTest { protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -139,6 +142,7 @@ public class FastDownloaderFactoryTest { protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -167,6 +171,7 @@ public class FastDownloaderFactoryTest { protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -202,6 +207,7 @@ public class FastDownloaderFactoryTest { protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, @@ -239,6 +245,7 @@ public class FastDownloaderFactoryTest { protocolContext, metricsSystem, ethContext, + peerTaskExecutor, worldStateStorageCoordinator, syncState, clock, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java index 68caf2182c..6d194cb8e0 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -34,6 +34,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; @@ -536,6 +537,7 @@ public class FastSyncActionsTest { protocolSchedule, protocolContext, ethContext, + new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()), new SyncState(blockchain, ethContext.getEthPeers(), true, Optional.empty()), pivotBlockSelector, new NoOpMetricsSystem()); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java index 34014246d2..da82034eaa 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java @@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; @@ -110,6 +111,7 @@ public class FastSyncChainDownloaderTest { protocolSchedule, protocolContext, ethContext, + new PeerTaskExecutor(null, null, null, new NoOpMetricsSystem()), syncState, new NoOpMetricsSystem(), new FastSyncState(otherBlockchain.getBlockHeader(pivotBlockNumber).get()), diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java index c679183b0f..7f1a342311 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java @@ -44,6 +44,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager; import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -196,7 +197,8 @@ public class TestNode implements Closeable { Collections.emptyList(), Optional.empty(), syncConfig, - scheduler); + scheduler, + new PeerManager()); final NetworkRunner networkRunner = NetworkRunner.builder() diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java index 5742637c3e..c9ef635c0f 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java @@ -45,6 +45,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; @@ -318,7 +319,8 @@ public class TransactionPoolFactoryTest { Optional.empty(), mock(SynchronizerConfiguration.class), mock(EthScheduler.class), - mock(ForkIdManager.class)); + mock(ForkIdManager.class), + new PeerManager()); } @Test