diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index b85ad75736..61c6d057c8 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -55,8 +55,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors; -import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector; -import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRequestSender; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; @@ -656,10 +654,8 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides } final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler); - final PeerSelector peerSelector = new DefaultPeerSelector(currentProtocolSpecSupplier); - ethPeers.streamAllPeers().forEach(peerSelector::addPeer); final PeerTaskExecutor peerTaskExecutor = - new PeerTaskExecutor(peerSelector, new PeerTaskRequestSender(), scheduler, metricsSystem); + new PeerTaskExecutor(ethPeers, new PeerTaskRequestSender(), scheduler, metricsSystem); final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode()); final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint); @@ -699,8 +695,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides scheduler, peerValidators, Optional.empty(), - forkIdManager, - peerSelector); + forkIdManager); final PivotBlockSelector pivotBlockSelector = createPivotSelector( @@ -1035,7 +1030,6 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides * @param peerValidators the peer validators * @param mergePeerFilter the merge peer filter * @param forkIdManager the fork id manager - * @param peerSelector the PeerSelector * @return the eth protocol manager */ protected EthProtocolManager createEthProtocolManager( @@ -1049,8 +1043,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides final EthScheduler scheduler, final List peerValidators, final Optional mergePeerFilter, - final ForkIdManager forkIdManager, - final PeerSelector peerSelector) { + final ForkIdManager forkIdManager) { return new EthProtocolManager( protocolContext.getBlockchain(), networkId, @@ -1064,8 +1057,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides mergePeerFilter, synchronizerConfiguration, scheduler, - forkIdManager, - peerSelector); + forkIdManager); } /** diff --git a/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java index 2e28a3179c..0d0f8d2fd8 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/ConsensusScheduleBesuControllerBuilder.java @@ -42,7 +42,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; -import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -244,8 +243,7 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde final EthScheduler scheduler, final List peerValidators, final Optional mergePeerFilter, - final ForkIdManager forkIdManager, - final PeerSelector peerSelector) { + final ForkIdManager forkIdManager) { return besuControllerBuilderSchedule .get(0L) .createEthProtocolManager( @@ -259,8 +257,7 @@ public class ConsensusScheduleBesuControllerBuilder extends BesuControllerBuilde scheduler, peerValidators, mergePeerFilter, - forkIdManager, - peerSelector); + forkIdManager); } @Override diff --git a/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java index 88629a7441..f5fc75959e 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java @@ -34,7 +34,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; -import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -100,8 +99,7 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder { final EthScheduler scheduler, final List peerValidators, final Optional mergePeerFilter, - final ForkIdManager forkIdManager, - final PeerSelector peerSelector) { + final ForkIdManager forkIdManager) { var mergeContext = protocolContext.getConsensusContext(MergeContext.class); @@ -131,8 +129,7 @@ public class MergeBesuControllerBuilder extends BesuControllerBuilder { scheduler, peerValidators, filterToUse, - forkIdManager, - peerSelector); + forkIdManager); return ethProtocolManager; } diff --git a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java index 0f56d0c618..703592f90a 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java @@ -40,7 +40,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; -import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer; @@ -165,8 +164,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder { final EthScheduler scheduler, final List peerValidators, final Optional mergePeerFilter, - final ForkIdManager forkIdManager, - final PeerSelector peerSelector) { + final ForkIdManager forkIdManager) { return mergeBesuControllerBuilder.createEthProtocolManager( protocolContext, synchronizerConfiguration, @@ -178,8 +176,7 @@ public class TransitionBesuControllerBuilder extends BesuControllerBuilder { scheduler, peerValidators, mergePeerFilter, - forkIdManager, - peerSelector); + forkIdManager); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 97e91f6bad..30cd03c15c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -23,7 +23,6 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; -import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.StatusMessage; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; @@ -70,7 +69,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { private final Hash genesisHash; private final ForkIdManager forkIdManager; - private final PeerSelector peerSelector; private final BigInteger networkId; private final EthPeers ethPeers; private final EthMessages ethMessages; @@ -94,8 +92,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { final Optional mergePeerFilter, final SynchronizerConfiguration synchronizerConfiguration, final EthScheduler scheduler, - final ForkIdManager forkIdManager, - final PeerSelector peerSelector) { + final ForkIdManager forkIdManager) { this.networkId = networkId; this.peerValidators = peerValidators; this.scheduler = scheduler; @@ -105,7 +102,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { this.genesisHash = blockchain.getBlockHashByNumber(0L).orElse(Hash.ZERO); this.forkIdManager = forkIdManager; - this.peerSelector = peerSelector; this.ethPeers = ethPeers; this.ethMessages = ethMessages; @@ -144,8 +140,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { final List peerValidators, final Optional mergePeerFilter, final SynchronizerConfiguration synchronizerConfiguration, - final EthScheduler scheduler, - final PeerSelector peerSelector) { + final EthScheduler scheduler) { this( blockchain, networkId, @@ -163,8 +158,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { blockchain, Collections.emptyList(), Collections.emptyList(), - ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled()), - peerSelector); + ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled())); } public EthContext ethContext() { @@ -343,7 +337,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { public void handleNewConnection(final PeerConnection connection) { ethPeers.registerNewConnection(connection, peerValidators); final EthPeer peer = ethPeers.peer(connection); - peerSelector.addPeer(peer); final Capability cap = connection.capability(getSupportedProtocol()); final ForkId latestForkId = cap.getVersion() >= 64 ? forkIdManager.getForkIdForChainHead() : null; @@ -375,7 +368,6 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { final DisconnectReason reason, final boolean initiatedByPeer) { final boolean wasActiveConnection = ethPeers.registerDisconnect(connection); - peerSelector.removePeer(connection.getPeer()); LOG.atDebug() .setMessage("Disconnect - active Connection? {} - {} - {} - {} {} - {} peers left") .addArgument(wasActiveConnection) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java index 75263f460e..cb84f82033 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTask.java @@ -18,6 +18,7 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRetryBehavior; @@ -32,6 +33,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; public class GetReceiptsFromPeerTask implements PeerTask>> { @@ -39,6 +41,7 @@ public class GetReceiptsFromPeerTask private final Collection blockHeaders; private final BodyValidator bodyValidator; private final Map> headersByReceiptsRoot = new HashMap<>(); + private final long requiredBlockchainHeight; public GetReceiptsFromPeerTask( final Collection blockHeaders, final BodyValidator bodyValidator) { @@ -50,6 +53,12 @@ public class GetReceiptsFromPeerTask headersByReceiptsRoot .computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>()) .add(header)); + + requiredBlockchainHeight = + blockHeaders.stream() + .mapToLong(BlockHeader::getNumber) + .max() + .orElse(BlockHeader.GENESIS_BLOCK_NUMBER); } @Override @@ -57,14 +66,6 @@ public class GetReceiptsFromPeerTask return EthProtocol.get(); } - @Override - public long getRequiredBlockNumber() { - return blockHeaders.stream() - .mapToLong(BlockHeader::getNumber) - .max() - .orElse(BlockHeader.GENESIS_BLOCK_NUMBER); - } - @Override public MessageData getRequestMessage() { // Since we have to match up the data by receipt root, we only need to request receipts @@ -106,4 +107,11 @@ public class GetReceiptsFromPeerTask return List.of( PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS, PeerTaskRetryBehavior.RETRY_WITH_SAME_PEER); } + + @Override + public Predicate getPeerRequirementFilter() { + return (ethPeer) -> + ethPeer.getProtocolName().equals(getSubProtocol().getName()) + && ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight; + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java index c3741292a0..86b2f31fef 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStep.java @@ -22,7 +22,6 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockWithReceipts; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; @@ -32,18 +31,15 @@ import org.hyperledger.besu.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask; import org.hyperledger.besu.ethereum.mainnet.BodyValidator; import org.hyperledger.besu.plugin.services.MetricsSystem; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; -import com.google.common.collect.Lists; - public class DownloadReceiptsStep implements Function, CompletableFuture>> { + private final EthContext ethContext; private final PeerTaskExecutor peerTaskExecutor; private final SynchronizerConfiguration synchronizerConfiguration; @@ -64,49 +60,43 @@ public class DownloadReceiptsStep public CompletableFuture> apply(final List blocks) { final List headers = blocks.stream().map(Block::getHeader).collect(toList()); if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { - return CompletableFuture.supplyAsync( - () -> { - Map> getReceipts = new ConcurrentHashMap<>(); - do { - List> blockHeaderSubLists = Lists.partition(headers, 20); - List>>> tasks = new ArrayList<>(); - for (List blockHeaderSubList : blockHeaderSubLists) { - tasks.add(new GetReceiptsFromPeerTask(blockHeaderSubList, new BodyValidator())); - } - Collection< - CompletableFuture< - PeerTaskExecutorResult>>>> - taskExecutions = peerTaskExecutor.executeBatchAsync(tasks); - for (CompletableFuture< - PeerTaskExecutorResult>>> - taskExecution : taskExecutions) { - taskExecution.thenAccept( - (getReceiptsResult) -> { - if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS - && getReceiptsResult.result().isPresent()) { - Map> taskResult = - getReceiptsResult.result().get(); - taskResult - .keySet() - .forEach( - (blockHeader) -> - getReceipts.merge( - blockHeader, - taskResult.get(blockHeader), - (initialReceipts, newReceipts) -> { - throw new IllegalStateException( - "Unexpectedly got receipts for block header already populated!"); - })); - } - }); - } - taskExecutions.forEach(CompletableFuture::join); - // remove all the headers we found receipts for - headers.removeAll(getReceipts.keySet()); - // repeat until all headers have receipts - } while (!headers.isEmpty()); - return combineBlocksAndReceipts(blocks, getReceipts); - }); + return ethContext + .getScheduler() + .scheduleSyncWorkerTask( + () -> { + Map> getReceipts = new ConcurrentHashMap<>(); + do { + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask(headers, new BodyValidator()); + PeerTaskExecutorResult>> + getReceiptsResult = peerTaskExecutor.execute(task); + if (getReceiptsResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS + && getReceiptsResult.result().isPresent()) { + Map> taskResult = + getReceiptsResult.result().get(); + taskResult + .keySet() + .forEach( + (blockHeader) -> + getReceipts.merge( + blockHeader, + taskResult.get(blockHeader), + (initialReceipts, newReceipts) -> { + throw new IllegalStateException( + "Unexpectedly got receipts for block header already populated!"); + })); + } else if (getReceiptsResult.responseCode() + == PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE) { + throw new RuntimeException( + "No peer available, unable to complete DownloadReceiptsStep"); + } + // remove all the headers we found receipts for + headers.removeAll(getReceipts.keySet()); + // repeat until all headers have receipts + } while (!headers.isEmpty()); + return CompletableFuture.completedFuture( + combineBlocksAndReceipts(blocks, getReceipts)); + }); } else { return GetReceiptsForHeadersTask.forHeaders(ethContext, headers, metricsSystem) diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java index 95f3d76222..3a3331b568 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTest.java @@ -44,7 +44,6 @@ import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.EthProtocolVersion; import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection.PeerSendHandler; -import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector; import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage; import org.hyperledger.besu.ethereum.eth.messages.BlockHeadersMessage; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; @@ -1244,8 +1243,7 @@ public final class EthProtocolManagerTest { Optional.empty(), syncConfig, mock(EthScheduler.class), - mock(ForkIdManager.class), - new DefaultPeerSelector(() -> null))) { + mock(ForkIdManager.class))) { return ethManager; } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java index 16199571e6..0b0bd1e3eb 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -30,7 +30,6 @@ import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.core.ProtocolScheduleFixture; import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; -import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; @@ -118,8 +117,7 @@ public class EthProtocolManagerTestUtil { mergePeerFilter, mock(SynchronizerConfiguration.class), ethScheduler, - new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false), - new DefaultPeerSelector(() -> null)); + new ForkIdManager(blockchain, Collections.emptyList(), Collections.emptyList(), false)); } public static EthProtocolManager create( @@ -170,8 +168,7 @@ public class EthProtocolManagerTestUtil { Optional.empty(), mock(SynchronizerConfiguration.class), ethScheduler, - forkIdManager, - new DefaultPeerSelector(() -> null)); + forkIdManager); } public static EthProtocolManager create(final Blockchain blockchain) { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java index f8f1c88e1e..4529d9d396 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetReceiptsFromPeerTaskTest.java @@ -18,6 +18,8 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.manager.ChainState; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; import org.hyperledger.besu.ethereum.eth.messages.EthPV63; import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage; @@ -44,14 +46,6 @@ public class GetReceiptsFromPeerTaskTest { Assertions.assertEquals(EthProtocol.get(), task.getSubProtocol()); } - @Test - public void testGetRequiredBlockNumber() { - GetReceiptsFromPeerTask task = - new GetReceiptsFromPeerTask( - List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), null); - Assertions.assertEquals(3, task.getRequiredBlockNumber()); - } - @Test public void testGetRequestMessage() { GetReceiptsFromPeerTask task = @@ -137,6 +131,24 @@ public class GetReceiptsFromPeerTaskTest { Assertions.assertEquals(List.of(receiptForBlock3), resultMap.get(blockHeader3)); } + @Test + public void testGetPeerRequirementFilter() { + BlockHeader blockHeader1 = mockBlockHeader(1); + BlockHeader blockHeader2 = mockBlockHeader(2); + BlockHeader blockHeader3 = mockBlockHeader(3); + + GetReceiptsFromPeerTask task = + new GetReceiptsFromPeerTask(List.of(blockHeader1, blockHeader2, blockHeader3), null); + + EthPeer failForIncorrectProtocol = mockPeer("incorrectProtocol", 5); + EthPeer failForShortChainHeight = mockPeer("incorrectProtocol", 1); + EthPeer successfulCandidate = mockPeer(EthProtocol.NAME, 5); + + Assertions.assertFalse(task.getPeerRequirementFilter().test(failForIncorrectProtocol)); + Assertions.assertFalse(task.getPeerRequirementFilter().test(failForShortChainHeight)); + Assertions.assertTrue(task.getPeerRequirementFilter().test(successfulCandidate)); + } + private BlockHeader mockBlockHeader(final long blockNumber) { BlockHeader blockHeader = Mockito.mock(BlockHeader.class); Mockito.when(blockHeader.getNumber()).thenReturn(blockNumber); @@ -149,4 +161,15 @@ public class GetReceiptsFromPeerTaskTest { return blockHeader; } + + private EthPeer mockPeer(final String protocol, final long chainHeight) { + EthPeer ethPeer = Mockito.mock(EthPeer.class); + ChainState chainState = Mockito.mock(ChainState.class); + + Mockito.when(ethPeer.getProtocolName()).thenReturn(protocol); + Mockito.when(ethPeer.chainState()).thenReturn(chainState); + Mockito.when(chainState.getEstimatedHeight()).thenReturn(chainHeight); + + return ethPeer; + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java index 554e09c3e9..c349df1c38 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/DownloadReceiptsStepTest.java @@ -124,8 +124,8 @@ public class DownloadReceiptsStepTest { PeerTaskExecutorResult>> peerTaskResult = new PeerTaskExecutorResult<>( Optional.of(receiptsMap), PeerTaskExecutorResponseCode.SUCCESS); - Mockito.when(peerTaskExecutor.execute(Mockito.any(GetReceiptsFromPeerTask.class))) - .thenReturn(peerTaskResult); + Mockito.when(peerTaskExecutor.executeAsync(Mockito.any(GetReceiptsFromPeerTask.class))) + .thenReturn(CompletableFuture.completedFuture(peerTaskResult)); final CompletableFuture> result = downloadReceiptsStep.apply(blocks); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java index c41e326fac..c679183b0f 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TestNode.java @@ -44,7 +44,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; -import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector; import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -197,8 +196,7 @@ public class TestNode implements Closeable { Collections.emptyList(), Optional.empty(), syncConfig, - scheduler, - new DefaultPeerSelector(() -> null)); + scheduler); final NetworkRunner networkRunner = NetworkRunner.builder() diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java index f4dac8a244..5742637c3e 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java @@ -45,7 +45,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; -import org.hyperledger.besu.ethereum.eth.manager.peertask.DefaultPeerSelector; import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; @@ -319,8 +318,7 @@ public class TransactionPoolFactoryTest { Optional.empty(), mock(SynchronizerConfiguration.class), mock(EthScheduler.class), - mock(ForkIdManager.class), - new DefaultPeerSelector(() -> null)); + mock(ForkIdManager.class)); } @Test