From e63f4730c6473bad15a8e01004ae3cf5882ebc19 Mon Sep 17 00:00:00 2001 From: Matilda Clerke Date: Fri, 4 Oct 2024 08:55:24 +1000 Subject: [PATCH] 7311: Make changes as discussed in walkthrough meeting Remove DefaultPeerSelector, make EthPeers implement PeerSelector interface, and add PeerTask.getPeerRequirementFilter Signed-off-by: Matilda Clerke --- .../besu/ethereum/eth/manager/EthPeers.java | 17 ++- .../manager/peertask/DefaultPeerSelector.java | 102 ------------- .../eth/manager/peertask/PeerSelector.java | 30 +--- .../eth/manager/peertask/PeerTask.java | 16 ++- .../manager/peertask/PeerTaskExecutor.java | 17 ++- .../peertask/DefaultPeerSelectorTest.java | 135 ------------------ .../peertask/PeerTaskExecutorTest.java | 21 ++- 7 files changed, 50 insertions(+), 288 deletions(-) delete mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java delete mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index d1a54d4d3d..1b413251be 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -17,6 +17,8 @@ package org.hyperledger.besu.ethereum.eth.manager; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback; +import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; import org.hyperledger.besu.ethereum.eth.sync.SnapServerChecker; @@ -26,6 +28,7 @@ import org.hyperledger.besu.ethereum.forkid.ForkId; import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.p2p.peers.Peer; +import org.hyperledger.besu.ethereum.p2p.peers.PeerId; import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; @@ -61,7 +64,7 @@ import org.apache.tuweni.bytes.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class EthPeers { +public class EthPeers implements PeerSelector { private static final Logger LOG = LoggerFactory.getLogger(EthPeers.class); public static final Comparator TOTAL_DIFFICULTY = Comparator.comparing((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty()); @@ -465,6 +468,18 @@ public class EthPeers { this.trailingPeerRequirementsSupplier = tprSupplier; } + // Part of the PeerSelector interface, to be split apart later + @Override + public EthPeer getPeer(final Predicate filter) { + return streamBestPeers().filter(filter).findFirst().orElseThrow(NoAvailablePeersException::new); + } + + // Part of the PeerSelector interface, to be split apart later + @Override + public Optional getPeerByPeerId(final PeerId peerId) { + return Optional.ofNullable(activeConnections.get(peerId.getId())); + } + @FunctionalInterface public interface ConnectCallback { void onPeerConnected(EthPeer newPeer); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java deleted file mode 100644 index 41d2e9b700..0000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright contributors to Hyperledger Besu. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.manager.peertask; - -import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; -import org.hyperledger.besu.ethereum.p2p.peers.PeerId; -import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; - -import java.util.Collection; -import java.util.Comparator; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a simple PeerSelector implementation that can be used the default implementation in most - * situations - */ -public class DefaultPeerSelector implements PeerSelector { - private static final Logger LOG = LoggerFactory.getLogger(DefaultPeerSelector.class); - - private final Supplier protocolSpecSupplier; - private final Map ethPeersByPeerId = new ConcurrentHashMap<>(); - - public DefaultPeerSelector(final Supplier protocolSpecSupplier) { - this.protocolSpecSupplier = protocolSpecSupplier; - } - - /** - * Gets the highest reputation peer matching the supplied filter - * - * @param filter a filter to match prospective peers with - * @return the highest reputation peer matching the supplies filter - * @throws NoAvailablePeerException If there are no suitable peers - */ - private EthPeer getPeer(final Predicate filter) throws NoAvailablePeerException { - LOG.trace("Finding peer from pool of {} peers", ethPeersByPeerId.size()); - return ethPeersByPeerId.values().stream() - .filter(filter) - .max(Comparator.naturalOrder()) - .orElseThrow(NoAvailablePeerException::new); - } - - @Override - public EthPeer getPeer( - final Collection usedEthPeers, - final long requiredPeerHeight, - final SubProtocol requiredSubProtocol) - throws NoAvailablePeerException { - return getPeer( - (candidatePeer) -> - isPeerUnused(candidatePeer, usedEthPeers) - && (protocolSpecSupplier.get().isPoS() - || isPeerHeightHighEnough(candidatePeer, requiredPeerHeight)) - && isPeerProtocolSuitable(candidatePeer, requiredSubProtocol)); - } - - @Override - public Optional getPeerByPeerId(final PeerId peerId) { - return Optional.ofNullable(ethPeersByPeerId.get(peerId)); - } - - @Override - public void addPeer(final EthPeer ethPeer) { - ethPeersByPeerId.put(ethPeer.getConnection().getPeer(), ethPeer); - } - - @Override - public void removePeer(final PeerId peerId) { - ethPeersByPeerId.remove(peerId); - } - - private boolean isPeerUnused(final EthPeer ethPeer, final Collection usedEthPeers) { - return !usedEthPeers.contains(ethPeer); - } - - private boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) { - return ethPeer.chainState().getEstimatedHeight() >= requiredHeight; - } - - private boolean isPeerProtocolSuitable(final EthPeer ethPeer, final SubProtocol protocol) { - return ethPeer.getProtocolName().equals(protocol.getName()); - } -} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java index 93d98a193b..0801f9f00e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java @@ -16,28 +16,20 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.p2p.peers.PeerId; -import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; -import java.util.Collection; import java.util.Optional; +import java.util.function.Predicate; /** Selects the EthPeers for the PeerTaskExecutor */ public interface PeerSelector { /** - * Gets a peer with the requiredPeerHeight (if not PoS), and with the requiredSubProtocol, and - * which is not in the supplied collection of usedEthPeers + * Gets a peer matching the supplied filter * - * @param usedEthPeers a collection of EthPeers to be excluded from selection because they have - * already been used - * @param requiredPeerHeight the minimum peer height required of the selected peer - * @param requiredSubProtocol the SubProtocol required of the peer + * @param filter a Predicate\ matching desirable peers * @return a peer matching the supplied conditions - * @throws NoAvailablePeerException If there are no suitable peers */ - EthPeer getPeer( - Collection usedEthPeers, long requiredPeerHeight, SubProtocol requiredSubProtocol) - throws NoAvailablePeerException; + EthPeer getPeer(final Predicate filter); /** * Attempts to get the EthPeer identified by peerId @@ -47,18 +39,4 @@ public interface PeerSelector { * PeerSelector, or empty otherwise */ Optional getPeerByPeerId(PeerId peerId); - - /** - * Add the supplied EthPeer to the PeerSelector - * - * @param ethPeer the EthPeer to be added to the PeerSelector - */ - void addPeer(EthPeer ethPeer); - - /** - * Remove the EthPeer identified by peerId from the PeerSelector - * - * @param peerId the PeerId of the EthPeer to be removed from the PeerSelector - */ - void removePeer(PeerId peerId); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java index 36bd03531b..1c5ee76c9a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java @@ -14,10 +14,12 @@ */ package org.hyperledger.besu.ethereum.eth.manager.peertask; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import java.util.Collection; +import java.util.function.Predicate; /** * Represents a task to be executed on an EthPeer by the PeerTaskExecutor @@ -32,13 +34,6 @@ public interface PeerTask { */ SubProtocol getSubProtocol(); - /** - * Gets the minimum required block number for a peer to have to successfully execute this task - * - * @return the minimum required block number for a peer to have to successfully execute this task - */ - long getRequiredBlockNumber(); - /** * Gets the request data to send to the EthPeer * @@ -61,4 +56,11 @@ public interface PeerTask { * @return the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor */ Collection getPeerTaskBehaviors(); + + /** + * Gets a Predicate that checks if an EthPeer is suitable for this PeerTask + * + * @return a Predicate that checks if an EthPeer is suitable for this PeerTask + */ + Predicate getPeerRequirementFilter(); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index 10c882e7e5..2d9a6edfce 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -15,6 +15,8 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.metrics.BesuMetricCategory; @@ -37,15 +39,18 @@ public class PeerTaskExecutor { public static final int NO_RETRIES = 1; private final PeerSelector peerSelector; private final PeerTaskRequestSender requestSender; + private final EthScheduler ethScheduler; private final LabelledMetric requestTimer; public PeerTaskExecutor( final PeerSelector peerSelector, final PeerTaskRequestSender requestSender, + final EthScheduler ethScheduler, final MetricsSystem metricsSystem) { this.peerSelector = peerSelector; this.requestSender = requestSender; + this.ethScheduler = ethScheduler; requestTimer = metricsSystem.createLabelledTimer( BesuMetricCategory.PEERS, @@ -66,10 +71,12 @@ public class PeerTaskExecutor { try { peer = peerSelector.getPeer( - usedEthPeers, peerTask.getRequiredBlockNumber(), peerTask.getSubProtocol()); + (candidatePeer) -> + peerTask.getPeerRequirementFilter().test(candidatePeer) + && !usedEthPeers.contains(candidatePeer)); usedEthPeers.add(peer); executorResult = executeAgainstPeer(peerTask, peer); - } catch (NoAvailablePeerException e) { + } catch (NoAvailablePeersException e) { executorResult = new PeerTaskExecutorResult<>( Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE); @@ -81,7 +88,8 @@ public class PeerTaskExecutor { } public CompletableFuture> executeAsync(final PeerTask peerTask) { - return CompletableFuture.supplyAsync(() -> execute(peerTask)); + return ethScheduler.scheduleSyncWorkerTask( + () -> CompletableFuture.completedFuture(execute(peerTask))); } public PeerTaskExecutorResult executeAgainstPeer( @@ -138,7 +146,8 @@ public class PeerTaskExecutor { public CompletableFuture> executeAgainstPeerAsync( final PeerTask peerTask, final EthPeer peer) { - return CompletableFuture.supplyAsync(() -> executeAgainstPeer(peerTask, peer)); + return ethScheduler.scheduleSyncWorkerTask( + () -> CompletableFuture.completedFuture(executeAgainstPeer(peerTask, peer))); } private boolean sleepBetweenRetries() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java deleted file mode 100644 index add2b1e612..0000000000 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright contributors to Hyperledger Besu. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.manager.peertask; - -import org.hyperledger.besu.ethereum.eth.EthProtocol; -import org.hyperledger.besu.ethereum.eth.SnapProtocol; -import org.hyperledger.besu.ethereum.eth.manager.ChainState; -import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.PeerReputation; -import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; -import org.hyperledger.besu.ethereum.p2p.peers.Peer; -import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; -import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; - -import java.util.HashSet; -import java.util.Set; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -public class DefaultPeerSelectorTest { - - public DefaultPeerSelector peerSelector; - - @BeforeEach - public void beforeTest() { - ProtocolSpec protocolSpec = Mockito.mock(ProtocolSpec.class); - Mockito.when(protocolSpec.isPoS()).thenReturn(false); - peerSelector = new DefaultPeerSelector(() -> protocolSpec); - } - - @Test - public void testGetPeer() throws NoAvailablePeerException { - EthPeer expectedPeer = createTestPeer(10, EthProtocol.get(), 5); - peerSelector.addPeer(expectedPeer); - EthPeer excludedForLowChainHeightPeer = createTestPeer(5, EthProtocol.get(), 50); - peerSelector.addPeer(excludedForLowChainHeightPeer); - EthPeer excludedForWrongProtocolPeer = createTestPeer(10, SnapProtocol.get(), 50); - peerSelector.addPeer(excludedForWrongProtocolPeer); - EthPeer excludedForLowReputationPeer = createTestPeer(10, EthProtocol.get(), 1); - peerSelector.addPeer(excludedForLowReputationPeer); - EthPeer excludedForBeingAlreadyUsedPeer = createTestPeer(10, EthProtocol.get(), 50); - peerSelector.addPeer(excludedForBeingAlreadyUsedPeer); - - Set usedEthPeers = new HashSet<>(); - usedEthPeers.add(excludedForBeingAlreadyUsedPeer); - - EthPeer result = peerSelector.getPeer(usedEthPeers, 10, EthProtocol.get()); - - Assertions.assertSame(expectedPeer, result); - } - - @Test - public void testGetPeerButNoPeerMatchesFilter() { - EthPeer expectedPeer = createTestPeer(10, EthProtocol.get(), 5); - peerSelector.addPeer(expectedPeer); - EthPeer excludedForLowChainHeightPeer = createTestPeer(5, EthProtocol.get(), 50); - peerSelector.addPeer(excludedForLowChainHeightPeer); - EthPeer excludedForWrongProtocolPeer = createTestPeer(10, SnapProtocol.get(), 50); - peerSelector.addPeer(excludedForWrongProtocolPeer); - EthPeer excludedForLowReputationPeer = createTestPeer(10, EthProtocol.get(), 1); - peerSelector.addPeer(excludedForLowReputationPeer); - EthPeer excludedForBeingAlreadyUsedPeer = createTestPeer(10, EthProtocol.get(), 50); - peerSelector.addPeer(excludedForBeingAlreadyUsedPeer); - - Set usedEthPeers = new HashSet<>(); - usedEthPeers.add(excludedForBeingAlreadyUsedPeer); - - Assertions.assertThrows( - NoAvailablePeerException.class, - () -> peerSelector.getPeer(usedEthPeers, 10, new MockSubProtocol())); - } - - private EthPeer createTestPeer( - final long chainHeight, final SubProtocol protocol, final int reputation) { - EthPeer ethPeer = Mockito.mock(EthPeer.class); - PeerConnection peerConnection = Mockito.mock(PeerConnection.class); - Peer peer = Mockito.mock(Peer.class); - ChainState chainState = Mockito.mock(ChainState.class); - PeerReputation peerReputation = Mockito.mock(PeerReputation.class); - - Mockito.when(ethPeer.getConnection()).thenReturn(peerConnection); - Mockito.when(peerConnection.getPeer()).thenReturn(peer); - Mockito.when(ethPeer.getProtocolName()).thenReturn(protocol.getName()); - Mockito.when(ethPeer.chainState()).thenReturn(chainState); - Mockito.when(chainState.getEstimatedHeight()).thenReturn(chainHeight); - Mockito.when(ethPeer.getReputation()).thenReturn(peerReputation); - Mockito.when(peerReputation.getScore()).thenReturn(reputation); - - Mockito.when(ethPeer.compareTo(Mockito.any(EthPeer.class))) - .thenAnswer( - (invocationOnMock) -> { - EthPeer otherPeer = invocationOnMock.getArgument(0, EthPeer.class); - return Integer.compare(reputation, otherPeer.getReputation().getScore()); - }); - return ethPeer; - } - - private static class MockSubProtocol implements SubProtocol { - - @Override - public String getName() { - return "Mock"; - } - - @Override - public int messageSpace(final int protocolVersion) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isValidMessageCode(final int protocolVersion, final int code) { - throw new UnsupportedOperationException(); - } - - @Override - public String messageName(final int protocolVersion, final int code) { - throw new UnsupportedOperationException(); - } - } -} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java index 15b1747bc7..98afe58b11 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -15,16 +15,17 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -37,6 +38,7 @@ import org.mockito.MockitoAnnotations; public class PeerTaskExecutorTest { private @Mock PeerSelector peerSelector; private @Mock PeerTaskRequestSender requestSender; + private @Mock EthScheduler ethScheduler; private @Mock PeerTask peerTask; private @Mock SubProtocol subprotocol; private @Mock MessageData requestMessageData; @@ -49,7 +51,8 @@ public class PeerTaskExecutorTest { @BeforeEach public void beforeTest() { mockCloser = MockitoAnnotations.openMocks(this); - peerTaskExecutor = new PeerTaskExecutor(peerSelector, requestSender, new NoOpMetricsSystem()); + peerTaskExecutor = + new PeerTaskExecutor(peerSelector, requestSender, ethScheduler, new NoOpMetricsSystem()); } @AfterEach @@ -201,14 +204,10 @@ public class PeerTaskExecutorTest { NoAvailablePeerException { Object responseObject = new Object(); - Mockito.when( - peerSelector.getPeer( - Mockito.any(Collection.class), Mockito.eq(10L), Mockito.eq(subprotocol))) - .thenReturn(ethPeer); + Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); - Mockito.when(peerTask.getRequiredBlockNumber()).thenReturn(10L); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -232,22 +231,18 @@ public class PeerTaskExecutorTest { ExecutionException, InterruptedException, TimeoutException, - InvalidPeerTaskResponseException, - NoAvailablePeerException { + InvalidPeerTaskResponseException { Object responseObject = new Object(); int requestMessageDataCode = 123; EthPeer peer2 = Mockito.mock(EthPeer.class); - Mockito.when( - peerSelector.getPeer( - Mockito.any(Collection.class), Mockito.eq(10L), Mockito.eq(subprotocol))) + Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))) .thenReturn(ethPeer) .thenReturn(peer2); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getPeerTaskBehaviors()) .thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS)); - Mockito.when(peerTask.getRequiredBlockNumber()).thenReturn(10L); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenThrow(new TimeoutException());