7311: Return Optional<EthPeer> in PeerSelector.getPeer and utilise existing peer selection behavior in EthPeers

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7628/head
Matilda Clerke 2 months ago
parent c047f428bf
commit 5aa6b0be5f
  1. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  2. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java
  3. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java
  4. 11
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java

@ -17,7 +17,6 @@ package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback; import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
@ -470,8 +469,8 @@ public class EthPeers implements PeerSelector {
// Part of the PeerSelector interface, to be split apart later // Part of the PeerSelector interface, to be split apart later
@Override @Override
public EthPeer getPeer(final Predicate<EthPeer> filter) { public Optional<EthPeer> getPeer(final Predicate<EthPeer> filter) {
return streamBestPeers().filter(filter).findFirst().orElseThrow(NoAvailablePeersException::new); return bestPeerMatchingCriteria(filter);
} }
// Part of the PeerSelector interface, to be split apart later // Part of the PeerSelector interface, to be split apart later

@ -29,7 +29,7 @@ public interface PeerSelector {
* @param filter a Predicate\<EthPeer\> matching desirable peers * @param filter a Predicate\<EthPeer\> matching desirable peers
* @return a peer matching the supplied conditions * @return a peer matching the supplied conditions
*/ */
EthPeer getPeer(final Predicate<EthPeer> filter); Optional<EthPeer> getPeer(final Predicate<EthPeer> filter);
/** /**
* Attempts to get the EthPeer identified by peerId * Attempts to get the EthPeer identified by peerId

@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.manager.peertask; package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.BesuMetricCategory;
@ -62,20 +61,19 @@ public class PeerTaskExecutor {
: NO_RETRIES; : NO_RETRIES;
final Collection<EthPeer> usedEthPeers = new HashSet<>(); final Collection<EthPeer> usedEthPeers = new HashSet<>();
do { do {
EthPeer peer; Optional<EthPeer> peer =
try { peerSelector.getPeer(
peer = (candidatePeer) ->
peerSelector.getPeer( peerTask.getPeerRequirementFilter().test(candidatePeer)
(candidatePeer) -> && !usedEthPeers.contains(candidatePeer));
peerTask.getPeerRequirementFilter().test(candidatePeer) if (peer.isEmpty()) {
&& !usedEthPeers.contains(candidatePeer));
usedEthPeers.add(peer);
executorResult = executeAgainstPeer(peerTask, peer);
} catch (NoAvailablePeersException e) {
executorResult = executorResult =
new PeerTaskExecutorResult<>( new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE); Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE);
continue;
} }
usedEthPeers.add(peer.get());
executorResult = executeAgainstPeer(peerTask, peer.get());
} while (retriesRemaining-- > 0 } while (retriesRemaining-- > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS); && executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS);

@ -22,6 +22,7 @@ import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -49,8 +50,7 @@ public class PeerTaskExecutorTest {
@BeforeEach @BeforeEach
public void beforeTest() { public void beforeTest() {
mockCloser = MockitoAnnotations.openMocks(this); mockCloser = MockitoAnnotations.openMocks(this);
peerTaskExecutor = peerTaskExecutor = new PeerTaskExecutor(peerSelector, requestSender, new NoOpMetricsSystem());
new PeerTaskExecutor(peerSelector, requestSender, new NoOpMetricsSystem());
} }
@AfterEach @AfterEach
@ -201,7 +201,8 @@ public class PeerTaskExecutorTest {
InvalidPeerTaskResponseException { InvalidPeerTaskResponseException {
Object responseObject = new Object(); Object responseObject = new Object();
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer); Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class)))
.thenReturn(Optional.of(ethPeer));
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList());
@ -234,8 +235,8 @@ public class PeerTaskExecutorTest {
EthPeer peer2 = Mockito.mock(EthPeer.class); EthPeer peer2 = Mockito.mock(EthPeer.class);
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))) Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class)))
.thenReturn(ethPeer) .thenReturn(Optional.of(ethPeer))
.thenReturn(peer2); .thenReturn(Optional.of(peer2));
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskRetryBehaviors()) Mockito.when(peerTask.getPeerTaskRetryBehaviors())

Loading…
Cancel
Save