diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java index c334773d16..41d2e9b700 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java @@ -15,13 +15,17 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.p2p.peers.PeerId; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; +import java.util.Collection; import java.util.Comparator; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +37,13 @@ import org.slf4j.LoggerFactory; public class DefaultPeerSelector implements PeerSelector { private static final Logger LOG = LoggerFactory.getLogger(DefaultPeerSelector.class); + private final Supplier protocolSpecSupplier; private final Map ethPeersByPeerId = new ConcurrentHashMap<>(); + public DefaultPeerSelector(final Supplier protocolSpecSupplier) { + this.protocolSpecSupplier = protocolSpecSupplier; + } + /** * Gets the highest reputation peer matching the supplied filter * @@ -42,8 +51,7 @@ public class DefaultPeerSelector implements PeerSelector { * @return the highest reputation peer matching the supplies filter * @throws NoAvailablePeerException If there are no suitable peers */ - @Override - public EthPeer getPeer(final Predicate filter) throws NoAvailablePeerException { + private EthPeer getPeer(final Predicate filter) throws NoAvailablePeerException { LOG.trace("Finding peer from pool of {} peers", ethPeersByPeerId.size()); return ethPeersByPeerId.values().stream() .filter(filter) @@ -51,6 +59,20 @@ public class DefaultPeerSelector implements PeerSelector { .orElseThrow(NoAvailablePeerException::new); } + @Override + public EthPeer getPeer( + final Collection usedEthPeers, + final long requiredPeerHeight, + final SubProtocol requiredSubProtocol) + throws NoAvailablePeerException { + return getPeer( + (candidatePeer) -> + isPeerUnused(candidatePeer, usedEthPeers) + && (protocolSpecSupplier.get().isPoS() + || isPeerHeightHighEnough(candidatePeer, requiredPeerHeight)) + && isPeerProtocolSuitable(candidatePeer, requiredSubProtocol)); + } + @Override public Optional getPeerByPeerId(final PeerId peerId) { return Optional.ofNullable(ethPeersByPeerId.get(peerId)); @@ -65,4 +87,16 @@ public class DefaultPeerSelector implements PeerSelector { public void removePeer(final PeerId peerId) { ethPeersByPeerId.remove(peerId); } + + private boolean isPeerUnused(final EthPeer ethPeer, final Collection usedEthPeers) { + return !usedEthPeers.contains(ethPeer); + } + + private boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) { + return ethPeer.chainState().getEstimatedHeight() >= requiredHeight; + } + + private boolean isPeerProtocolSuitable(final EthPeer ethPeer, final SubProtocol protocol) { + return ethPeer.getProtocolName().equals(protocol.getName()); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java index 3f5589f93b..93d98a193b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java @@ -16,21 +16,28 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.p2p.peers.PeerId; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; +import java.util.Collection; import java.util.Optional; -import java.util.function.Predicate; /** Selects the EthPeers for the PeerTaskExecutor */ public interface PeerSelector { /** - * Gets a peer matching the supplied filter + * Gets a peer with the requiredPeerHeight (if not PoS), and with the requiredSubProtocol, and + * which is not in the supplied collection of usedEthPeers * - * @param filter a filter to match prospective peers with - * @return a peer matching the supplied filter + * @param usedEthPeers a collection of EthPeers to be excluded from selection because they have + * already been used + * @param requiredPeerHeight the minimum peer height required of the selected peer + * @param requiredSubProtocol the SubProtocol required of the peer + * @return a peer matching the supplied conditions * @throws NoAvailablePeerException If there are no suitable peers */ - EthPeer getPeer(final Predicate filter) throws NoAvailablePeerException; + EthPeer getPeer( + Collection usedEthPeers, long requiredPeerHeight, SubProtocol requiredSubProtocol) + throws NoAvailablePeerException; /** * Attempts to get the EthPeer identified by peerId @@ -39,19 +46,19 @@ public interface PeerSelector { * @return An Optional\ containing the EthPeer identified by peerId if present in the * PeerSelector, or empty otherwise */ - Optional getPeerByPeerId(final PeerId peerId); + Optional getPeerByPeerId(PeerId peerId); /** * Add the supplied EthPeer to the PeerSelector * * @param ethPeer the EthPeer to be added to the PeerSelector */ - void addPeer(final EthPeer ethPeer); + void addPeer(EthPeer ethPeer); /** * Remove the EthPeer identified by peerId from the PeerSelector * * @param peerId the PeerId of the EthPeer to be removed from the PeerSelector */ - void removePeer(final PeerId peerId); + void removePeer(PeerId peerId); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index caef72ba76..5a71a8f260 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -18,14 +18,13 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; -import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import org.hyperledger.besu.plugin.services.metrics.OperationTimer; -import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -40,17 +39,15 @@ public class PeerTaskExecutor { public static final int NO_RETRIES = 1; private final PeerSelector peerSelector; private final PeerTaskRequestSender requestSender; - private final Supplier protocolSpecSupplier; + private final LabelledMetric requestTimer; public PeerTaskExecutor( final PeerSelector peerSelector, final PeerTaskRequestSender requestSender, - final Supplier protocolSpecSupplier, final MetricsSystem metricsSystem) { this.peerSelector = peerSelector; this.requestSender = requestSender; - this.protocolSpecSupplier = protocolSpecSupplier; requestTimer = metricsSystem.createLabelledTimer( BesuMetricCategory.PEERS, @@ -65,23 +62,19 @@ public class PeerTaskExecutor { peerTask.getPeerTaskBehaviors().contains(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS) ? RETRIES_WITH_OTHER_PEER : NO_RETRIES; - final Collection usedEthPeers = new ArrayList<>(); + final Collection usedEthPeers = new HashSet<>(); do { EthPeer peer; try { peer = peerSelector.getPeer( - (candidatePeer) -> - isPeerUnused(candidatePeer, usedEthPeers) - && (protocolSpecSupplier.get().isPoS() - || isPeerHeightHighEnough( - candidatePeer, peerTask.getRequiredBlockNumber())) - && isPeerProtocolSuitable(candidatePeer, peerTask.getSubProtocol())); + usedEthPeers, peerTask.getRequiredBlockNumber(), peerTask.getSubProtocol()); usedEthPeers.add(peer); executorResult = executeAgainstPeer(peerTask, peer); } catch (NoAvailablePeerException e) { executorResult = - new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE); + new PeerTaskExecutorResult<>( + Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE); } } while (--triesRemaining > 0 && executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS); @@ -103,7 +96,6 @@ public class PeerTaskExecutor { : NO_RETRIES; do { try { - T result; try (final OperationTimer.TimingContext ignored = requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) { @@ -113,24 +105,30 @@ public class PeerTaskExecutor { result = peerTask.parseResponse(responseMessageData); } peer.recordUsefulResponse(); - executorResult = new PeerTaskExecutorResult<>(Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS); + executorResult = + new PeerTaskExecutorResult<>( + Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS); } catch (PeerConnection.PeerNotConnected e) { executorResult = - new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED); + new PeerTaskExecutorResult<>( + Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED); } catch (InterruptedException | TimeoutException e) { peer.recordRequestTimeout(requestMessageData.getCode()); - executorResult = new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT); + executorResult = + new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT); } catch (InvalidPeerTaskResponseException e) { peer.recordUselessResponse(e.getMessage()); executorResult = - new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE); + new PeerTaskExecutorResult<>( + Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE); } catch (ExecutionException e) { executorResult = - new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR); + new PeerTaskExecutorResult<>( + Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR); } } while (--triesRemaining > 0 && executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS @@ -154,17 +152,4 @@ public class PeerTaskExecutor { return false; } } - - private static boolean isPeerUnused( - final EthPeer ethPeer, final Collection usedEthPeers) { - return !usedEthPeers.contains(ethPeer); - } - - private static boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) { - return ethPeer.chainState().getEstimatedHeight() >= requiredHeight; - } - - private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final SubProtocol protocol) { - return ethPeer.getProtocolName().equals(protocol.getName()); - } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java index ef528cd2ae..86dec85c29 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java @@ -16,8 +16,5 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import java.util.Optional; -public record PeerTaskExecutorResult ( - Optional result, - PeerTaskExecutorResponseCode responseCode -) -{} +public record PeerTaskExecutorResult( + Optional result, PeerTaskExecutorResponseCode responseCode) {} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java index 8913b80a45..add2b1e612 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java @@ -14,19 +14,23 @@ */ package org.hyperledger.besu.ethereum.eth.manager.peertask; +import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.SnapProtocol; +import org.hyperledger.besu.ethereum.eth.manager.ChainState; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection; +import org.hyperledger.besu.ethereum.eth.manager.PeerReputation; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; -import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; -import java.time.Clock; -import java.util.Collections; +import java.util.HashSet; import java.util.Set; -import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class DefaultPeerSelectorTest { @@ -34,67 +38,98 @@ public class DefaultPeerSelectorTest { @BeforeEach public void beforeTest() { - peerSelector = new DefaultPeerSelector(); + ProtocolSpec protocolSpec = Mockito.mock(ProtocolSpec.class); + Mockito.when(protocolSpec.isPoS()).thenReturn(false); + peerSelector = new DefaultPeerSelector(() -> protocolSpec); } @Test public void testGetPeer() throws NoAvailablePeerException { - EthPeer protocol1With5ReputationPeer = - createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5); - peerSelector.addPeer(protocol1With5ReputationPeer); - EthPeer protocol1With4ReputationPeer = - createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4); - peerSelector.addPeer(protocol1With4ReputationPeer); - EthPeer protocol2With50ReputationPeer = - createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50); - peerSelector.addPeer(protocol2With50ReputationPeer); - EthPeer protocol2With4ReputationPeer = - createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4); - peerSelector.addPeer(protocol2With4ReputationPeer); - - EthPeer result = peerSelector.getPeer((p) -> p.getProtocolName().equals("protocol1")); - - Assertions.assertSame(protocol1With5ReputationPeer, result); + EthPeer expectedPeer = createTestPeer(10, EthProtocol.get(), 5); + peerSelector.addPeer(expectedPeer); + EthPeer excludedForLowChainHeightPeer = createTestPeer(5, EthProtocol.get(), 50); + peerSelector.addPeer(excludedForLowChainHeightPeer); + EthPeer excludedForWrongProtocolPeer = createTestPeer(10, SnapProtocol.get(), 50); + peerSelector.addPeer(excludedForWrongProtocolPeer); + EthPeer excludedForLowReputationPeer = createTestPeer(10, EthProtocol.get(), 1); + peerSelector.addPeer(excludedForLowReputationPeer); + EthPeer excludedForBeingAlreadyUsedPeer = createTestPeer(10, EthProtocol.get(), 50); + peerSelector.addPeer(excludedForBeingAlreadyUsedPeer); + + Set usedEthPeers = new HashSet<>(); + usedEthPeers.add(excludedForBeingAlreadyUsedPeer); + + EthPeer result = peerSelector.getPeer(usedEthPeers, 10, EthProtocol.get()); + + Assertions.assertSame(expectedPeer, result); } @Test public void testGetPeerButNoPeerMatchesFilter() { - EthPeer protocol1With5ReputationPeer = - createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5); - peerSelector.addPeer(protocol1With5ReputationPeer); - EthPeer protocol1With4ReputationPeer = - createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4); - peerSelector.addPeer(protocol1With4ReputationPeer); - EthPeer protocol2With50ReputationPeer = - createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50); - peerSelector.addPeer(protocol2With50ReputationPeer); - EthPeer protocol2With4ReputationPeer = - createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4); - peerSelector.addPeer(protocol2With4ReputationPeer); + EthPeer expectedPeer = createTestPeer(10, EthProtocol.get(), 5); + peerSelector.addPeer(expectedPeer); + EthPeer excludedForLowChainHeightPeer = createTestPeer(5, EthProtocol.get(), 50); + peerSelector.addPeer(excludedForLowChainHeightPeer); + EthPeer excludedForWrongProtocolPeer = createTestPeer(10, SnapProtocol.get(), 50); + peerSelector.addPeer(excludedForWrongProtocolPeer); + EthPeer excludedForLowReputationPeer = createTestPeer(10, EthProtocol.get(), 1); + peerSelector.addPeer(excludedForLowReputationPeer); + EthPeer excludedForBeingAlreadyUsedPeer = createTestPeer(10, EthProtocol.get(), 50); + peerSelector.addPeer(excludedForBeingAlreadyUsedPeer); + + Set usedEthPeers = new HashSet<>(); + usedEthPeers.add(excludedForBeingAlreadyUsedPeer); Assertions.assertThrows( NoAvailablePeerException.class, - () -> peerSelector.getPeer((p) -> p.getProtocolName().equals("fake protocol"))); + () -> peerSelector.getPeer(usedEthPeers, 10, new MockSubProtocol())); } private EthPeer createTestPeer( - final Set connectionCapabilities, - final String protocolName, - final int reputationAdjustment) { - PeerConnection peerConnection = new MockPeerConnection(connectionCapabilities); - EthPeer peer = - new EthPeer( - peerConnection, - protocolName, - null, - Collections.emptyList(), - 1, - Clock.systemUTC(), - Collections.emptyList(), - Bytes.EMPTY); - for (int i = 0; i < reputationAdjustment; i++) { - peer.getReputation().recordUsefulResponse(); + final long chainHeight, final SubProtocol protocol, final int reputation) { + EthPeer ethPeer = Mockito.mock(EthPeer.class); + PeerConnection peerConnection = Mockito.mock(PeerConnection.class); + Peer peer = Mockito.mock(Peer.class); + ChainState chainState = Mockito.mock(ChainState.class); + PeerReputation peerReputation = Mockito.mock(PeerReputation.class); + + Mockito.when(ethPeer.getConnection()).thenReturn(peerConnection); + Mockito.when(peerConnection.getPeer()).thenReturn(peer); + Mockito.when(ethPeer.getProtocolName()).thenReturn(protocol.getName()); + Mockito.when(ethPeer.chainState()).thenReturn(chainState); + Mockito.when(chainState.getEstimatedHeight()).thenReturn(chainHeight); + Mockito.when(ethPeer.getReputation()).thenReturn(peerReputation); + Mockito.when(peerReputation.getScore()).thenReturn(reputation); + + Mockito.when(ethPeer.compareTo(Mockito.any(EthPeer.class))) + .thenAnswer( + (invocationOnMock) -> { + EthPeer otherPeer = invocationOnMock.getArgument(0, EthPeer.class); + return Integer.compare(reputation, otherPeer.getReputation().getScore()); + }); + return ethPeer; + } + + private static class MockSubProtocol implements SubProtocol { + + @Override + public String getName() { + return "Mock"; + } + + @Override + public int messageSpace(final int protocolVersion) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isValidMessageCode(final int protocolVersion, final int code) { + throw new UnsupportedOperationException(); + } + + @Override + public String messageName(final int protocolVersion, final int code) { + throw new UnsupportedOperationException(); } - return peer; } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java index 0077c90418..a7b5914517 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -15,17 +15,16 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import java.util.function.Predicate; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -38,7 +37,6 @@ import org.mockito.MockitoAnnotations; public class PeerTaskExecutorTest { private @Mock PeerSelector peerSelector; private @Mock PeerTaskRequestSender requestSender; - private @Mock ProtocolSpec protocolSpec; private @Mock PeerTask peerTask; private @Mock SubProtocol subprotocol; private @Mock MessageData requestMessageData; @@ -53,7 +51,7 @@ public class PeerTaskExecutorTest { mockCloser = MockitoAnnotations.openMocks(this); peerTaskExecutor = new PeerTaskExecutor( - peerSelector, requestSender, () -> protocolSpec, new NoOpMetricsSystem()); + peerSelector, requestSender, new NoOpMetricsSystem()); } @AfterEach @@ -140,8 +138,7 @@ public class PeerTaskExecutorTest { Assertions.assertNotNull(result); Assertions.assertTrue(result.result().isEmpty()); - Assertions.assertEquals( - PeerTaskExecutorResponseCode.PEER_DISCONNECTED, result.responseCode()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.PEER_DISCONNECTED, result.responseCode()); } @Test @@ -192,8 +189,7 @@ public class PeerTaskExecutorTest { Assertions.assertNotNull(result); Assertions.assertTrue(result.result().isEmpty()); - Assertions.assertEquals( - PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.responseCode()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.responseCode()); } @Test @@ -207,10 +203,13 @@ public class PeerTaskExecutorTest { NoAvailablePeerException { Object responseObject = new Object(); - Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer); + Mockito.when( + peerSelector.getPeer(Mockito.any(Collection.class), Mockito.eq(10L), Mockito.eq(subprotocol))) + .thenReturn(ethPeer); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRequiredBlockNumber()).thenReturn(10L); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -240,15 +239,16 @@ public class PeerTaskExecutorTest { int requestMessageDataCode = 123; EthPeer peer2 = Mockito.mock(EthPeer.class); - Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))) + Mockito.when( + peerSelector.getPeer(Mockito.any(Collection.class), Mockito.eq(10L), Mockito.eq(subprotocol))) .thenReturn(ethPeer) .thenReturn(peer2); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getPeerTaskBehaviors()) .thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS)); + Mockito.when(peerTask.getRequiredBlockNumber()).thenReturn(10L); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); - Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenThrow(new TimeoutException()); Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);