7311: Move peer selection logic to PeerSelector

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7628/head
Matilda Clerke 2 months ago
parent 720f94ef50
commit 7d845b327b
  1. 38
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java
  2. 23
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java
  3. 49
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java
  4. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java
  5. 139
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java
  6. 22
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java

@ -15,13 +15,17 @@
package org.hyperledger.besu.ethereum.eth.manager.peertask; package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId; import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -33,8 +37,13 @@ import org.slf4j.LoggerFactory;
public class DefaultPeerSelector implements PeerSelector { public class DefaultPeerSelector implements PeerSelector {
private static final Logger LOG = LoggerFactory.getLogger(DefaultPeerSelector.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultPeerSelector.class);
private final Supplier<ProtocolSpec> protocolSpecSupplier;
private final Map<PeerId, EthPeer> ethPeersByPeerId = new ConcurrentHashMap<>(); private final Map<PeerId, EthPeer> ethPeersByPeerId = new ConcurrentHashMap<>();
public DefaultPeerSelector(final Supplier<ProtocolSpec> protocolSpecSupplier) {
this.protocolSpecSupplier = protocolSpecSupplier;
}
/** /**
* Gets the highest reputation peer matching the supplied filter * Gets the highest reputation peer matching the supplied filter
* *
@ -42,8 +51,7 @@ public class DefaultPeerSelector implements PeerSelector {
* @return the highest reputation peer matching the supplies filter * @return the highest reputation peer matching the supplies filter
* @throws NoAvailablePeerException If there are no suitable peers * @throws NoAvailablePeerException If there are no suitable peers
*/ */
@Override private EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException {
public EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException {
LOG.trace("Finding peer from pool of {} peers", ethPeersByPeerId.size()); LOG.trace("Finding peer from pool of {} peers", ethPeersByPeerId.size());
return ethPeersByPeerId.values().stream() return ethPeersByPeerId.values().stream()
.filter(filter) .filter(filter)
@ -51,6 +59,20 @@ public class DefaultPeerSelector implements PeerSelector {
.orElseThrow(NoAvailablePeerException::new); .orElseThrow(NoAvailablePeerException::new);
} }
@Override
public EthPeer getPeer(
final Collection<EthPeer> usedEthPeers,
final long requiredPeerHeight,
final SubProtocol requiredSubProtocol)
throws NoAvailablePeerException {
return getPeer(
(candidatePeer) ->
isPeerUnused(candidatePeer, usedEthPeers)
&& (protocolSpecSupplier.get().isPoS()
|| isPeerHeightHighEnough(candidatePeer, requiredPeerHeight))
&& isPeerProtocolSuitable(candidatePeer, requiredSubProtocol));
}
@Override @Override
public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) { public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) {
return Optional.ofNullable(ethPeersByPeerId.get(peerId)); return Optional.ofNullable(ethPeersByPeerId.get(peerId));
@ -65,4 +87,16 @@ public class DefaultPeerSelector implements PeerSelector {
public void removePeer(final PeerId peerId) { public void removePeer(final PeerId peerId) {
ethPeersByPeerId.remove(peerId); ethPeersByPeerId.remove(peerId);
} }
private boolean isPeerUnused(final EthPeer ethPeer, final Collection<EthPeer> usedEthPeers) {
return !usedEthPeers.contains(ethPeer);
}
private boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) {
return ethPeer.chainState().getEstimatedHeight() >= requiredHeight;
}
private boolean isPeerProtocolSuitable(final EthPeer ethPeer, final SubProtocol protocol) {
return ethPeer.getProtocolName().equals(protocol.getName());
}
} }

@ -16,21 +16,28 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId; import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.Collection;
import java.util.Optional; import java.util.Optional;
import java.util.function.Predicate;
/** Selects the EthPeers for the PeerTaskExecutor */ /** Selects the EthPeers for the PeerTaskExecutor */
public interface PeerSelector { public interface PeerSelector {
/** /**
* Gets a peer matching the supplied filter * Gets a peer with the requiredPeerHeight (if not PoS), and with the requiredSubProtocol, and
* which is not in the supplied collection of usedEthPeers
* *
* @param filter a filter to match prospective peers with * @param usedEthPeers a collection of EthPeers to be excluded from selection because they have
* @return a peer matching the supplied filter * already been used
* @param requiredPeerHeight the minimum peer height required of the selected peer
* @param requiredSubProtocol the SubProtocol required of the peer
* @return a peer matching the supplied conditions
* @throws NoAvailablePeerException If there are no suitable peers * @throws NoAvailablePeerException If there are no suitable peers
*/ */
EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException; EthPeer getPeer(
Collection<EthPeer> usedEthPeers, long requiredPeerHeight, SubProtocol requiredSubProtocol)
throws NoAvailablePeerException;
/** /**
* Attempts to get the EthPeer identified by peerId * Attempts to get the EthPeer identified by peerId
@ -39,19 +46,19 @@ public interface PeerSelector {
* @return An Optional\<EthPeer\> containing the EthPeer identified by peerId if present in the * @return An Optional\<EthPeer\> containing the EthPeer identified by peerId if present in the
* PeerSelector, or empty otherwise * PeerSelector, or empty otherwise
*/ */
Optional<EthPeer> getPeerByPeerId(final PeerId peerId); Optional<EthPeer> getPeerByPeerId(PeerId peerId);
/** /**
* Add the supplied EthPeer to the PeerSelector * Add the supplied EthPeer to the PeerSelector
* *
* @param ethPeer the EthPeer to be added to the PeerSelector * @param ethPeer the EthPeer to be added to the PeerSelector
*/ */
void addPeer(final EthPeer ethPeer); void addPeer(EthPeer ethPeer);
/** /**
* Remove the EthPeer identified by peerId from the PeerSelector * Remove the EthPeer identified by peerId from the PeerSelector
* *
* @param peerId the PeerId of the EthPeer to be removed from the PeerSelector * @param peerId the PeerId of the EthPeer to be removed from the PeerSelector
*/ */
void removePeer(final PeerId peerId); void removePeer(PeerId peerId);
} }

@ -18,14 +18,13 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer; import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -40,17 +39,15 @@ public class PeerTaskExecutor {
public static final int NO_RETRIES = 1; public static final int NO_RETRIES = 1;
private final PeerSelector peerSelector; private final PeerSelector peerSelector;
private final PeerTaskRequestSender requestSender; private final PeerTaskRequestSender requestSender;
private final Supplier<ProtocolSpec> protocolSpecSupplier;
private final LabelledMetric<OperationTimer> requestTimer; private final LabelledMetric<OperationTimer> requestTimer;
public PeerTaskExecutor( public PeerTaskExecutor(
final PeerSelector peerSelector, final PeerSelector peerSelector,
final PeerTaskRequestSender requestSender, final PeerTaskRequestSender requestSender,
final Supplier<ProtocolSpec> protocolSpecSupplier,
final MetricsSystem metricsSystem) { final MetricsSystem metricsSystem) {
this.peerSelector = peerSelector; this.peerSelector = peerSelector;
this.requestSender = requestSender; this.requestSender = requestSender;
this.protocolSpecSupplier = protocolSpecSupplier;
requestTimer = requestTimer =
metricsSystem.createLabelledTimer( metricsSystem.createLabelledTimer(
BesuMetricCategory.PEERS, BesuMetricCategory.PEERS,
@ -65,23 +62,19 @@ public class PeerTaskExecutor {
peerTask.getPeerTaskBehaviors().contains(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS) peerTask.getPeerTaskBehaviors().contains(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS)
? RETRIES_WITH_OTHER_PEER ? RETRIES_WITH_OTHER_PEER
: NO_RETRIES; : NO_RETRIES;
final Collection<EthPeer> usedEthPeers = new ArrayList<>(); final Collection<EthPeer> usedEthPeers = new HashSet<>();
do { do {
EthPeer peer; EthPeer peer;
try { try {
peer = peer =
peerSelector.getPeer( peerSelector.getPeer(
(candidatePeer) -> usedEthPeers, peerTask.getRequiredBlockNumber(), peerTask.getSubProtocol());
isPeerUnused(candidatePeer, usedEthPeers)
&& (protocolSpecSupplier.get().isPoS()
|| isPeerHeightHighEnough(
candidatePeer, peerTask.getRequiredBlockNumber()))
&& isPeerProtocolSuitable(candidatePeer, peerTask.getSubProtocol()));
usedEthPeers.add(peer); usedEthPeers.add(peer);
executorResult = executeAgainstPeer(peerTask, peer); executorResult = executeAgainstPeer(peerTask, peer);
} catch (NoAvailablePeerException e) { } catch (NoAvailablePeerException e) {
executorResult = executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE); new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE);
} }
} while (--triesRemaining > 0 } while (--triesRemaining > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS); && executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS);
@ -103,7 +96,6 @@ public class PeerTaskExecutor {
: NO_RETRIES; : NO_RETRIES;
do { do {
try { try {
T result; T result;
try (final OperationTimer.TimingContext ignored = try (final OperationTimer.TimingContext ignored =
requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) { requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) {
@ -113,24 +105,30 @@ public class PeerTaskExecutor {
result = peerTask.parseResponse(responseMessageData); result = peerTask.parseResponse(responseMessageData);
} }
peer.recordUsefulResponse(); peer.recordUsefulResponse();
executorResult = new PeerTaskExecutorResult<>(Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS); executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS);
} catch (PeerConnection.PeerNotConnected e) { } catch (PeerConnection.PeerNotConnected e) {
executorResult = executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED); new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED);
} catch (InterruptedException | TimeoutException e) { } catch (InterruptedException | TimeoutException e) {
peer.recordRequestTimeout(requestMessageData.getCode()); peer.recordRequestTimeout(requestMessageData.getCode());
executorResult = new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT); executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT);
} catch (InvalidPeerTaskResponseException e) { } catch (InvalidPeerTaskResponseException e) {
peer.recordUselessResponse(e.getMessage()); peer.recordUselessResponse(e.getMessage());
executorResult = executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE); new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
} catch (ExecutionException e) { } catch (ExecutionException e) {
executorResult = executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR); new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR);
} }
} while (--triesRemaining > 0 } while (--triesRemaining > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS && executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS
@ -154,17 +152,4 @@ public class PeerTaskExecutor {
return false; return false;
} }
} }
private static boolean isPeerUnused(
final EthPeer ethPeer, final Collection<EthPeer> usedEthPeers) {
return !usedEthPeers.contains(ethPeer);
}
private static boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) {
return ethPeer.chainState().getEstimatedHeight() >= requiredHeight;
}
private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final SubProtocol protocol) {
return ethPeer.getProtocolName().equals(protocol.getName());
}
} }

@ -16,8 +16,5 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask;
import java.util.Optional; import java.util.Optional;
public record PeerTaskExecutorResult<T> ( public record PeerTaskExecutorResult<T>(
Optional<T> result, Optional<T> result, PeerTaskExecutorResponseCode responseCode) {}
PeerTaskExecutorResponseCode responseCode
)
{}

@ -14,19 +14,23 @@
*/ */
package org.hyperledger.besu.ethereum.eth.manager.peertask; package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.ChainState;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection; import org.hyperledger.besu.ethereum.eth.manager.PeerReputation;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.time.Clock; import java.util.HashSet;
import java.util.Collections;
import java.util.Set; import java.util.Set;
import org.apache.tuweni.bytes.Bytes;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class DefaultPeerSelectorTest { public class DefaultPeerSelectorTest {
@ -34,67 +38,98 @@ public class DefaultPeerSelectorTest {
@BeforeEach @BeforeEach
public void beforeTest() { public void beforeTest() {
peerSelector = new DefaultPeerSelector(); ProtocolSpec protocolSpec = Mockito.mock(ProtocolSpec.class);
Mockito.when(protocolSpec.isPoS()).thenReturn(false);
peerSelector = new DefaultPeerSelector(() -> protocolSpec);
} }
@Test @Test
public void testGetPeer() throws NoAvailablePeerException { public void testGetPeer() throws NoAvailablePeerException {
EthPeer protocol1With5ReputationPeer = EthPeer expectedPeer = createTestPeer(10, EthProtocol.get(), 5);
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5); peerSelector.addPeer(expectedPeer);
peerSelector.addPeer(protocol1With5ReputationPeer); EthPeer excludedForLowChainHeightPeer = createTestPeer(5, EthProtocol.get(), 50);
EthPeer protocol1With4ReputationPeer = peerSelector.addPeer(excludedForLowChainHeightPeer);
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4); EthPeer excludedForWrongProtocolPeer = createTestPeer(10, SnapProtocol.get(), 50);
peerSelector.addPeer(protocol1With4ReputationPeer); peerSelector.addPeer(excludedForWrongProtocolPeer);
EthPeer protocol2With50ReputationPeer = EthPeer excludedForLowReputationPeer = createTestPeer(10, EthProtocol.get(), 1);
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50); peerSelector.addPeer(excludedForLowReputationPeer);
peerSelector.addPeer(protocol2With50ReputationPeer); EthPeer excludedForBeingAlreadyUsedPeer = createTestPeer(10, EthProtocol.get(), 50);
EthPeer protocol2With4ReputationPeer = peerSelector.addPeer(excludedForBeingAlreadyUsedPeer);
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4);
peerSelector.addPeer(protocol2With4ReputationPeer); Set<EthPeer> usedEthPeers = new HashSet<>();
usedEthPeers.add(excludedForBeingAlreadyUsedPeer);
EthPeer result = peerSelector.getPeer((p) -> p.getProtocolName().equals("protocol1"));
EthPeer result = peerSelector.getPeer(usedEthPeers, 10, EthProtocol.get());
Assertions.assertSame(protocol1With5ReputationPeer, result);
Assertions.assertSame(expectedPeer, result);
} }
@Test @Test
public void testGetPeerButNoPeerMatchesFilter() { public void testGetPeerButNoPeerMatchesFilter() {
EthPeer protocol1With5ReputationPeer = EthPeer expectedPeer = createTestPeer(10, EthProtocol.get(), 5);
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5); peerSelector.addPeer(expectedPeer);
peerSelector.addPeer(protocol1With5ReputationPeer); EthPeer excludedForLowChainHeightPeer = createTestPeer(5, EthProtocol.get(), 50);
EthPeer protocol1With4ReputationPeer = peerSelector.addPeer(excludedForLowChainHeightPeer);
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4); EthPeer excludedForWrongProtocolPeer = createTestPeer(10, SnapProtocol.get(), 50);
peerSelector.addPeer(protocol1With4ReputationPeer); peerSelector.addPeer(excludedForWrongProtocolPeer);
EthPeer protocol2With50ReputationPeer = EthPeer excludedForLowReputationPeer = createTestPeer(10, EthProtocol.get(), 1);
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50); peerSelector.addPeer(excludedForLowReputationPeer);
peerSelector.addPeer(protocol2With50ReputationPeer); EthPeer excludedForBeingAlreadyUsedPeer = createTestPeer(10, EthProtocol.get(), 50);
EthPeer protocol2With4ReputationPeer = peerSelector.addPeer(excludedForBeingAlreadyUsedPeer);
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4);
peerSelector.addPeer(protocol2With4ReputationPeer); Set<EthPeer> usedEthPeers = new HashSet<>();
usedEthPeers.add(excludedForBeingAlreadyUsedPeer);
Assertions.assertThrows( Assertions.assertThrows(
NoAvailablePeerException.class, NoAvailablePeerException.class,
() -> peerSelector.getPeer((p) -> p.getProtocolName().equals("fake protocol"))); () -> peerSelector.getPeer(usedEthPeers, 10, new MockSubProtocol()));
} }
private EthPeer createTestPeer( private EthPeer createTestPeer(
final Set<Capability> connectionCapabilities, final long chainHeight, final SubProtocol protocol, final int reputation) {
final String protocolName, EthPeer ethPeer = Mockito.mock(EthPeer.class);
final int reputationAdjustment) { PeerConnection peerConnection = Mockito.mock(PeerConnection.class);
PeerConnection peerConnection = new MockPeerConnection(connectionCapabilities); Peer peer = Mockito.mock(Peer.class);
EthPeer peer = ChainState chainState = Mockito.mock(ChainState.class);
new EthPeer( PeerReputation peerReputation = Mockito.mock(PeerReputation.class);
peerConnection,
protocolName, Mockito.when(ethPeer.getConnection()).thenReturn(peerConnection);
null, Mockito.when(peerConnection.getPeer()).thenReturn(peer);
Collections.emptyList(), Mockito.when(ethPeer.getProtocolName()).thenReturn(protocol.getName());
1, Mockito.when(ethPeer.chainState()).thenReturn(chainState);
Clock.systemUTC(), Mockito.when(chainState.getEstimatedHeight()).thenReturn(chainHeight);
Collections.emptyList(), Mockito.when(ethPeer.getReputation()).thenReturn(peerReputation);
Bytes.EMPTY); Mockito.when(peerReputation.getScore()).thenReturn(reputation);
for (int i = 0; i < reputationAdjustment; i++) {
peer.getReputation().recordUsefulResponse(); Mockito.when(ethPeer.compareTo(Mockito.any(EthPeer.class)))
.thenAnswer(
(invocationOnMock) -> {
EthPeer otherPeer = invocationOnMock.getArgument(0, EthPeer.class);
return Integer.compare(reputation, otherPeer.getReputation().getScore());
});
return ethPeer;
}
private static class MockSubProtocol implements SubProtocol {
@Override
public String getName() {
return "Mock";
}
@Override
public int messageSpace(final int protocolVersion) {
throw new UnsupportedOperationException();
}
@Override
public boolean isValidMessageCode(final int protocolVersion, final int code) {
throw new UnsupportedOperationException();
}
@Override
public String messageName(final int protocolVersion, final int code) {
throw new UnsupportedOperationException();
} }
return peer;
} }
} }

@ -15,17 +15,16 @@
package org.hyperledger.besu.ethereum.eth.manager.peertask; package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
@ -38,7 +37,6 @@ import org.mockito.MockitoAnnotations;
public class PeerTaskExecutorTest { public class PeerTaskExecutorTest {
private @Mock PeerSelector peerSelector; private @Mock PeerSelector peerSelector;
private @Mock PeerTaskRequestSender requestSender; private @Mock PeerTaskRequestSender requestSender;
private @Mock ProtocolSpec protocolSpec;
private @Mock PeerTask<Object> peerTask; private @Mock PeerTask<Object> peerTask;
private @Mock SubProtocol subprotocol; private @Mock SubProtocol subprotocol;
private @Mock MessageData requestMessageData; private @Mock MessageData requestMessageData;
@ -53,7 +51,7 @@ public class PeerTaskExecutorTest {
mockCloser = MockitoAnnotations.openMocks(this); mockCloser = MockitoAnnotations.openMocks(this);
peerTaskExecutor = peerTaskExecutor =
new PeerTaskExecutor( new PeerTaskExecutor(
peerSelector, requestSender, () -> protocolSpec, new NoOpMetricsSystem()); peerSelector, requestSender, new NoOpMetricsSystem());
} }
@AfterEach @AfterEach
@ -140,8 +138,7 @@ public class PeerTaskExecutorTest {
Assertions.assertNotNull(result); Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isEmpty()); Assertions.assertTrue(result.result().isEmpty());
Assertions.assertEquals( Assertions.assertEquals(PeerTaskExecutorResponseCode.PEER_DISCONNECTED, result.responseCode());
PeerTaskExecutorResponseCode.PEER_DISCONNECTED, result.responseCode());
} }
@Test @Test
@ -192,8 +189,7 @@ public class PeerTaskExecutorTest {
Assertions.assertNotNull(result); Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isEmpty()); Assertions.assertTrue(result.result().isEmpty());
Assertions.assertEquals( Assertions.assertEquals(PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.responseCode());
PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.responseCode());
} }
@Test @Test
@ -207,10 +203,13 @@ public class PeerTaskExecutorTest {
NoAvailablePeerException { NoAvailablePeerException {
Object responseObject = new Object(); Object responseObject = new Object();
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer); Mockito.when(
peerSelector.getPeer(Mockito.any(Collection.class), Mockito.eq(10L), Mockito.eq(subprotocol)))
.thenReturn(ethPeer);
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getRequiredBlockNumber()).thenReturn(10L);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
@ -240,15 +239,16 @@ public class PeerTaskExecutorTest {
int requestMessageDataCode = 123; int requestMessageDataCode = 123;
EthPeer peer2 = Mockito.mock(EthPeer.class); EthPeer peer2 = Mockito.mock(EthPeer.class);
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))) Mockito.when(
peerSelector.getPeer(Mockito.any(Collection.class), Mockito.eq(10L), Mockito.eq(subprotocol)))
.thenReturn(ethPeer) .thenReturn(ethPeer)
.thenReturn(peer2); .thenReturn(peer2);
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()) Mockito.when(peerTask.getPeerTaskBehaviors())
.thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS)); .thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS));
Mockito.when(peerTask.getRequiredBlockNumber()).thenReturn(10L);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException()); .thenThrow(new TimeoutException());
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);

Loading…
Cancel
Save