diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java similarity index 72% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerManager.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java index 9424ee0b48..c334773d16 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java @@ -17,30 +17,34 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.p2p.peers.PeerId; -import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This is a simple PeerManager implementation that can be used the default implementation in most + * This is a simple PeerSelector implementation that can be used the default implementation in most * situations */ -public class DefaultPeerManager implements PeerManager { - private static final Logger LOG = LoggerFactory.getLogger(DefaultPeerManager.class); +public class DefaultPeerSelector implements PeerSelector { + private static final Logger LOG = LoggerFactory.getLogger(DefaultPeerSelector.class); - // use a synchronized map to ensure the map is never modified by multiple threads at once - private final Map ethPeersByPeerId = - Collections.synchronizedMap(new HashMap<>()); + private final Map ethPeersByPeerId = new ConcurrentHashMap<>(); + /** + * Gets the highest reputation peer matching the supplied filter + * + * @param filter a filter to match prospective peers with + * @return the highest reputation peer matching the supplies filter + * @throws NoAvailablePeerException If there are no suitable peers + */ @Override public EthPeer getPeer(final Predicate filter) throws NoAvailablePeerException { - LOG.trace("Getting peer from pool of {} peers", ethPeersByPeerId.size()); + LOG.trace("Finding peer from pool of {} peers", ethPeersByPeerId.size()); return ethPeersByPeerId.values().stream() .filter(filter) .max(Comparator.naturalOrder()) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java similarity index 77% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManager.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java index 14323474a1..3f5589f93b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java @@ -20,14 +20,14 @@ import org.hyperledger.besu.ethereum.p2p.peers.PeerId; import java.util.Optional; import java.util.function.Predicate; -/** "Manages" the EthPeers for the PeerTaskExecutor */ -public interface PeerManager { +/** Selects the EthPeers for the PeerTaskExecutor */ +public interface PeerSelector { /** - * Gets the highest reputation peer matching the supplies filter + * Gets a peer matching the supplied filter * * @param filter a filter to match prospective peers with - * @return the highest reputation peer matching the supplies filter + * @return a peer matching the supplied filter * @throws NoAvailablePeerException If there are no suitable peers */ EthPeer getPeer(final Predicate filter) throws NoAvailablePeerException; @@ -37,21 +37,21 @@ public interface PeerManager { * * @param peerId the peerId of the desired EthPeer * @return An Optional\ containing the EthPeer identified by peerId if present in the - * PeerManager, or empty otherwise + * PeerSelector, or empty otherwise */ Optional getPeerByPeerId(final PeerId peerId); /** - * Add the supplied EthPeer to the PeerManager + * Add the supplied EthPeer to the PeerSelector * - * @param ethPeer the EthPeer to be added to the PeerManager + * @param ethPeer the EthPeer to be added to the PeerSelector */ void addPeer(final EthPeer ethPeer); /** - * Remove the EthPeer identified by peerId from the PeerManager + * Remove the EthPeer identified by peerId from the PeerSelector * - * @param peerId the PeerId of the EthPeer to be removed from the PeerManager + * @param peerId the PeerId of the EthPeer to be removed from the PeerSelector */ void removePeer(final PeerId peerId); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index da9969686c..98e1343b3f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -32,19 +32,18 @@ import java.util.function.Supplier; /** Manages the execution of PeerTasks, respecting their PeerTaskBehavior */ public class PeerTaskExecutor { - private static final long[] WAIT_TIME_BEFORE_RETRY = {0, 20000, 5000}; - private final PeerManager peerManager; + private final PeerSelector peerSelector; private final PeerTaskRequestSender requestSender; private final Supplier protocolSpecSupplier; private final LabelledMetric requestTimer; public PeerTaskExecutor( - final PeerManager peerManager, + final PeerSelector peerSelector, final PeerTaskRequestSender requestSender, final Supplier protocolSpecSupplier, final MetricsSystem metricsSystem) { - this.peerManager = peerManager; + this.peerSelector = peerSelector; this.requestSender = requestSender; this.protocolSpecSupplier = protocolSpecSupplier; requestTimer = @@ -64,7 +63,7 @@ public class PeerTaskExecutor { EthPeer peer; try { peer = - peerManager.getPeer( + peerSelector.getPeer( (candidatePeer) -> isPeerUnused(candidatePeer, usedEthPeers) && (protocolSpecSupplier.get().isPoS() @@ -126,7 +125,7 @@ public class PeerTaskExecutor { } while (--triesRemaining > 0 && executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS && executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED - && sleepBetweenRetries(WAIT_TIME_BEFORE_RETRY[triesRemaining])); + && sleepBetweenRetries()); return executorResult; } @@ -136,9 +135,10 @@ public class PeerTaskExecutor { return CompletableFuture.supplyAsync(() -> executeAgainstPeer(peerTask, peer)); } - private boolean sleepBetweenRetries(final long sleepTime) { + private boolean sleepBetweenRetries() { try { - Thread.sleep(sleepTime); + //sleep for 1 second to match implemented wait between retries in AbstractRetryingPeerTask + Thread.sleep(1000); return true; } catch (InterruptedException e) { return false; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java similarity index 64% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerManagerTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java index 5aa04f8f9a..16d53f474b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java @@ -28,35 +28,53 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -public class DefaultPeerManagerTest { +public class DefaultPeerSelectorTest { - public DefaultPeerManager peerManager; + public DefaultPeerSelector peerSelector; @BeforeEach public void beforeTest() { - peerManager = new DefaultPeerManager(); + peerSelector = new DefaultPeerSelector(); } @Test public void testGetPeer() throws NoAvailablePeerException { EthPeer protocol1With5ReputationPeer = createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5); - peerManager.addPeer(protocol1With5ReputationPeer); + peerSelector.addPeer(protocol1With5ReputationPeer); EthPeer protocol1With4ReputationPeer = createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4); - peerManager.addPeer(protocol1With4ReputationPeer); + peerSelector.addPeer(protocol1With4ReputationPeer); EthPeer protocol2With50ReputationPeer = createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50); - peerManager.addPeer(protocol2With50ReputationPeer); + peerSelector.addPeer(protocol2With50ReputationPeer); EthPeer protocol2With4ReputationPeer = createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4); - peerManager.addPeer(protocol2With4ReputationPeer); + peerSelector.addPeer(protocol2With4ReputationPeer); - EthPeer result = peerManager.getPeer((p) -> p.getProtocolName().equals("protocol1")); + EthPeer result = peerSelector.getPeer((p) -> p.getProtocolName().equals("protocol1")); Assertions.assertSame(protocol1With5ReputationPeer, result); } + @Test + public void testGetPeerButNoPeerMatchesFilter() { + EthPeer protocol1With5ReputationPeer = + createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5); + peerSelector.addPeer(protocol1With5ReputationPeer); + EthPeer protocol1With4ReputationPeer = + createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4); + peerSelector.addPeer(protocol1With4ReputationPeer); + EthPeer protocol2With50ReputationPeer = + createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50); + peerSelector.addPeer(protocol2With50ReputationPeer); + EthPeer protocol2With4ReputationPeer = + createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4); + peerSelector.addPeer(protocol2With4ReputationPeer); + + Assertions.assertThrows(NoAvailablePeerException.class, () -> peerSelector.getPeer((p) -> p.getProtocolName().equals("fake protocol"))); + } + private EthPeer createTestPeer( final Set connectionCapabilities, final String protocolName, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java index 2c015425f1..930f4325b6 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -35,7 +35,7 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; public class PeerTaskExecutorTest { - private @Mock PeerManager peerManager; + private @Mock PeerSelector peerSelector; private @Mock PeerTaskRequestSender requestSender; private @Mock ProtocolSpec protocolSpec; private @Mock PeerTask peerTask; @@ -51,7 +51,7 @@ public class PeerTaskExecutorTest { mockCloser = MockitoAnnotations.openMocks(this); peerTaskExecutor = new PeerTaskExecutor( - peerManager, requestSender, () -> protocolSpec, new NoOpMetricsSystem()); + peerSelector, requestSender, () -> protocolSpec, new NoOpMetricsSystem()); } @AfterEach @@ -205,7 +205,7 @@ public class PeerTaskExecutorTest { String subprotocol = "subprotocol"; Object responseObject = new Object(); - Mockito.when(peerManager.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer); + Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); @@ -238,7 +238,7 @@ public class PeerTaskExecutorTest { int requestMessageDataCode = 123; EthPeer peer2 = Mockito.mock(EthPeer.class); - Mockito.when(peerManager.getPeer(Mockito.any(Predicate.class))) + Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))) .thenReturn(ethPeer) .thenReturn(peer2);