Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311-add-GetReceiptsFromPeerTask

pull/7638/head
Matilda Clerke 2 months ago
commit 0e76000d27
  1. 22
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java
  2. 18
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java
  3. 16
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java
  4. 34
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java
  5. 8
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java

@ -17,30 +17,34 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId; import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* This is a simple PeerManager implementation that can be used the default implementation in most * This is a simple PeerSelector implementation that can be used the default implementation in most
* situations * situations
*/ */
public class DefaultPeerManager implements PeerManager { public class DefaultPeerSelector implements PeerSelector {
private static final Logger LOG = LoggerFactory.getLogger(DefaultPeerManager.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultPeerSelector.class);
// use a synchronized map to ensure the map is never modified by multiple threads at once private final Map<PeerId, EthPeer> ethPeersByPeerId = new ConcurrentHashMap<>();
private final Map<PeerId, EthPeer> ethPeersByPeerId =
Collections.synchronizedMap(new HashMap<>());
/**
* Gets the highest reputation peer matching the supplied filter
*
* @param filter a filter to match prospective peers with
* @return the highest reputation peer matching the supplies filter
* @throws NoAvailablePeerException If there are no suitable peers
*/
@Override @Override
public EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException { public EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException {
LOG.trace("Getting peer from pool of {} peers", ethPeersByPeerId.size()); LOG.trace("Finding peer from pool of {} peers", ethPeersByPeerId.size());
return ethPeersByPeerId.values().stream() return ethPeersByPeerId.values().stream()
.filter(filter) .filter(filter)
.max(Comparator.naturalOrder()) .max(Comparator.naturalOrder())

@ -20,14 +20,14 @@ import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import java.util.Optional; import java.util.Optional;
import java.util.function.Predicate; import java.util.function.Predicate;
/** "Manages" the EthPeers for the PeerTaskExecutor */ /** Selects the EthPeers for the PeerTaskExecutor */
public interface PeerManager { public interface PeerSelector {
/** /**
* Gets the highest reputation peer matching the supplies filter * Gets a peer matching the supplied filter
* *
* @param filter a filter to match prospective peers with * @param filter a filter to match prospective peers with
* @return the highest reputation peer matching the supplies filter * @return a peer matching the supplied filter
* @throws NoAvailablePeerException If there are no suitable peers * @throws NoAvailablePeerException If there are no suitable peers
*/ */
EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException; EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException;
@ -37,21 +37,21 @@ public interface PeerManager {
* *
* @param peerId the peerId of the desired EthPeer * @param peerId the peerId of the desired EthPeer
* @return An Optional\<EthPeer\> containing the EthPeer identified by peerId if present in the * @return An Optional\<EthPeer\> containing the EthPeer identified by peerId if present in the
* PeerManager, or empty otherwise * PeerSelector, or empty otherwise
*/ */
Optional<EthPeer> getPeerByPeerId(final PeerId peerId); Optional<EthPeer> getPeerByPeerId(final PeerId peerId);
/** /**
* Add the supplied EthPeer to the PeerManager * Add the supplied EthPeer to the PeerSelector
* *
* @param ethPeer the EthPeer to be added to the PeerManager * @param ethPeer the EthPeer to be added to the PeerSelector
*/ */
void addPeer(final EthPeer ethPeer); void addPeer(final EthPeer ethPeer);
/** /**
* Remove the EthPeer identified by peerId from the PeerManager * Remove the EthPeer identified by peerId from the PeerSelector
* *
* @param peerId the PeerId of the EthPeer to be removed from the PeerManager * @param peerId the PeerId of the EthPeer to be removed from the PeerSelector
*/ */
void removePeer(final PeerId peerId); void removePeer(final PeerId peerId);
} }

@ -32,19 +32,18 @@ import java.util.function.Supplier;
/** Manages the execution of PeerTasks, respecting their PeerTaskBehavior */ /** Manages the execution of PeerTasks, respecting their PeerTaskBehavior */
public class PeerTaskExecutor { public class PeerTaskExecutor {
private static final long[] WAIT_TIME_BEFORE_RETRY = {0, 20000, 5000};
private final PeerManager peerManager; private final PeerSelector peerSelector;
private final PeerTaskRequestSender requestSender; private final PeerTaskRequestSender requestSender;
private final Supplier<ProtocolSpec> protocolSpecSupplier; private final Supplier<ProtocolSpec> protocolSpecSupplier;
private final LabelledMetric<OperationTimer> requestTimer; private final LabelledMetric<OperationTimer> requestTimer;
public PeerTaskExecutor( public PeerTaskExecutor(
final PeerManager peerManager, final PeerSelector peerSelector,
final PeerTaskRequestSender requestSender, final PeerTaskRequestSender requestSender,
final Supplier<ProtocolSpec> protocolSpecSupplier, final Supplier<ProtocolSpec> protocolSpecSupplier,
final MetricsSystem metricsSystem) { final MetricsSystem metricsSystem) {
this.peerManager = peerManager; this.peerSelector = peerSelector;
this.requestSender = requestSender; this.requestSender = requestSender;
this.protocolSpecSupplier = protocolSpecSupplier; this.protocolSpecSupplier = protocolSpecSupplier;
requestTimer = requestTimer =
@ -64,7 +63,7 @@ public class PeerTaskExecutor {
EthPeer peer; EthPeer peer;
try { try {
peer = peer =
peerManager.getPeer( peerSelector.getPeer(
(candidatePeer) -> (candidatePeer) ->
isPeerUnused(candidatePeer, usedEthPeers) isPeerUnused(candidatePeer, usedEthPeers)
&& (protocolSpecSupplier.get().isPoS() && (protocolSpecSupplier.get().isPoS()
@ -126,7 +125,7 @@ public class PeerTaskExecutor {
} while (--triesRemaining > 0 } while (--triesRemaining > 0
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS && executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED && executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED
&& sleepBetweenRetries(WAIT_TIME_BEFORE_RETRY[triesRemaining])); && sleepBetweenRetries());
return executorResult; return executorResult;
} }
@ -136,9 +135,10 @@ public class PeerTaskExecutor {
return CompletableFuture.supplyAsync(() -> executeAgainstPeer(peerTask, peer)); return CompletableFuture.supplyAsync(() -> executeAgainstPeer(peerTask, peer));
} }
private boolean sleepBetweenRetries(final long sleepTime) { private boolean sleepBetweenRetries() {
try { try {
Thread.sleep(sleepTime); //sleep for 1 second to match implemented wait between retries in AbstractRetryingPeerTask
Thread.sleep(1000);
return true; return true;
} catch (InterruptedException e) { } catch (InterruptedException e) {
return false; return false;

@ -28,35 +28,53 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
public class DefaultPeerManagerTest { public class DefaultPeerSelectorTest {
public DefaultPeerManager peerManager; public DefaultPeerSelector peerSelector;
@BeforeEach @BeforeEach
public void beforeTest() { public void beforeTest() {
peerManager = new DefaultPeerManager(); peerSelector = new DefaultPeerSelector();
} }
@Test @Test
public void testGetPeer() throws NoAvailablePeerException { public void testGetPeer() throws NoAvailablePeerException {
EthPeer protocol1With5ReputationPeer = EthPeer protocol1With5ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5); createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5);
peerManager.addPeer(protocol1With5ReputationPeer); peerSelector.addPeer(protocol1With5ReputationPeer);
EthPeer protocol1With4ReputationPeer = EthPeer protocol1With4ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4); createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4);
peerManager.addPeer(protocol1With4ReputationPeer); peerSelector.addPeer(protocol1With4ReputationPeer);
EthPeer protocol2With50ReputationPeer = EthPeer protocol2With50ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50); createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50);
peerManager.addPeer(protocol2With50ReputationPeer); peerSelector.addPeer(protocol2With50ReputationPeer);
EthPeer protocol2With4ReputationPeer = EthPeer protocol2With4ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4); createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4);
peerManager.addPeer(protocol2With4ReputationPeer); peerSelector.addPeer(protocol2With4ReputationPeer);
EthPeer result = peerManager.getPeer((p) -> p.getProtocolName().equals("protocol1")); EthPeer result = peerSelector.getPeer((p) -> p.getProtocolName().equals("protocol1"));
Assertions.assertSame(protocol1With5ReputationPeer, result); Assertions.assertSame(protocol1With5ReputationPeer, result);
} }
@Test
public void testGetPeerButNoPeerMatchesFilter() {
EthPeer protocol1With5ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5);
peerSelector.addPeer(protocol1With5ReputationPeer);
EthPeer protocol1With4ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4);
peerSelector.addPeer(protocol1With4ReputationPeer);
EthPeer protocol2With50ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50);
peerSelector.addPeer(protocol2With50ReputationPeer);
EthPeer protocol2With4ReputationPeer =
createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4);
peerSelector.addPeer(protocol2With4ReputationPeer);
Assertions.assertThrows(NoAvailablePeerException.class, () -> peerSelector.getPeer((p) -> p.getProtocolName().equals("fake protocol")));
}
private EthPeer createTestPeer( private EthPeer createTestPeer(
final Set<Capability> connectionCapabilities, final Set<Capability> connectionCapabilities,
final String protocolName, final String protocolName,

@ -35,7 +35,7 @@ import org.mockito.Mockito;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
public class PeerTaskExecutorTest { public class PeerTaskExecutorTest {
private @Mock PeerManager peerManager; private @Mock PeerSelector peerSelector;
private @Mock PeerTaskRequestSender requestSender; private @Mock PeerTaskRequestSender requestSender;
private @Mock ProtocolSpec protocolSpec; private @Mock ProtocolSpec protocolSpec;
private @Mock PeerTask<Object> peerTask; private @Mock PeerTask<Object> peerTask;
@ -51,7 +51,7 @@ public class PeerTaskExecutorTest {
mockCloser = MockitoAnnotations.openMocks(this); mockCloser = MockitoAnnotations.openMocks(this);
peerTaskExecutor = peerTaskExecutor =
new PeerTaskExecutor( new PeerTaskExecutor(
peerManager, requestSender, () -> protocolSpec, new NoOpMetricsSystem()); peerSelector, requestSender, () -> protocolSpec, new NoOpMetricsSystem());
} }
@AfterEach @AfterEach
@ -205,7 +205,7 @@ public class PeerTaskExecutorTest {
String subprotocol = "subprotocol"; String subprotocol = "subprotocol";
Object responseObject = new Object(); Object responseObject = new Object();
Mockito.when(peerManager.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer); Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer);
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
@ -238,7 +238,7 @@ public class PeerTaskExecutorTest {
int requestMessageDataCode = 123; int requestMessageDataCode = 123;
EthPeer peer2 = Mockito.mock(EthPeer.class); EthPeer peer2 = Mockito.mock(EthPeer.class);
Mockito.when(peerManager.getPeer(Mockito.any(Predicate.class))) Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class)))
.thenReturn(ethPeer) .thenReturn(ethPeer)
.thenReturn(peer2); .thenReturn(peer2);

Loading…
Cancel
Save