7311: Make changes as discussed in walkthrough meeting

Remove DefaultPeerSelector, make EthPeers implement PeerSelector interface, and add PeerTask.getPeerRequirementFilter

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7628/head
Matilda Clerke 2 months ago
parent 8718102277
commit e63f4730c6
  1. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  2. 102
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelector.java
  3. 30
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java
  4. 16
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java
  5. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java
  6. 135
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/DefaultPeerSelectorTest.java
  7. 21
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java

@ -17,6 +17,8 @@ package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
import org.hyperledger.besu.ethereum.eth.sync.SnapServerChecker;
@ -26,6 +28,7 @@ import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
@ -61,7 +64,7 @@ import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EthPeers {
public class EthPeers implements PeerSelector {
private static final Logger LOG = LoggerFactory.getLogger(EthPeers.class);
public static final Comparator<EthPeer> TOTAL_DIFFICULTY =
Comparator.comparing((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty());
@ -465,6 +468,18 @@ public class EthPeers {
this.trailingPeerRequirementsSupplier = tprSupplier;
}
// Part of the PeerSelector interface, to be split apart later
@Override
public EthPeer getPeer(final Predicate<EthPeer> filter) {
return streamBestPeers().filter(filter).findFirst().orElseThrow(NoAvailablePeersException::new);
}
// Part of the PeerSelector interface, to be split apart later
@Override
public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) {
return Optional.ofNullable(activeConnections.get(peerId.getId()));
}
@FunctionalInterface
public interface ConnectCallback {
void onPeerConnected(EthPeer newPeer);

@ -1,102 +0,0 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a simple PeerSelector implementation that can be used the default implementation in most
* situations
*/
public class DefaultPeerSelector implements PeerSelector {
private static final Logger LOG = LoggerFactory.getLogger(DefaultPeerSelector.class);
private final Supplier<ProtocolSpec> protocolSpecSupplier;
private final Map<PeerId, EthPeer> ethPeersByPeerId = new ConcurrentHashMap<>();
public DefaultPeerSelector(final Supplier<ProtocolSpec> protocolSpecSupplier) {
this.protocolSpecSupplier = protocolSpecSupplier;
}
/**
* Gets the highest reputation peer matching the supplied filter
*
* @param filter a filter to match prospective peers with
* @return the highest reputation peer matching the supplies filter
* @throws NoAvailablePeerException If there are no suitable peers
*/
private EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException {
LOG.trace("Finding peer from pool of {} peers", ethPeersByPeerId.size());
return ethPeersByPeerId.values().stream()
.filter(filter)
.max(Comparator.naturalOrder())
.orElseThrow(NoAvailablePeerException::new);
}
@Override
public EthPeer getPeer(
final Collection<EthPeer> usedEthPeers,
final long requiredPeerHeight,
final SubProtocol requiredSubProtocol)
throws NoAvailablePeerException {
return getPeer(
(candidatePeer) ->
isPeerUnused(candidatePeer, usedEthPeers)
&& (protocolSpecSupplier.get().isPoS()
|| isPeerHeightHighEnough(candidatePeer, requiredPeerHeight))
&& isPeerProtocolSuitable(candidatePeer, requiredSubProtocol));
}
@Override
public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) {
return Optional.ofNullable(ethPeersByPeerId.get(peerId));
}
@Override
public void addPeer(final EthPeer ethPeer) {
ethPeersByPeerId.put(ethPeer.getConnection().getPeer(), ethPeer);
}
@Override
public void removePeer(final PeerId peerId) {
ethPeersByPeerId.remove(peerId);
}
private boolean isPeerUnused(final EthPeer ethPeer, final Collection<EthPeer> usedEthPeers) {
return !usedEthPeers.contains(ethPeer);
}
private boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) {
return ethPeer.chainState().getEstimatedHeight() >= requiredHeight;
}
private boolean isPeerProtocolSuitable(final EthPeer ethPeer, final SubProtocol protocol) {
return ethPeer.getProtocolName().equals(protocol.getName());
}
}

@ -16,28 +16,20 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;
/** Selects the EthPeers for the PeerTaskExecutor */
public interface PeerSelector {
/**
* Gets a peer with the requiredPeerHeight (if not PoS), and with the requiredSubProtocol, and
* which is not in the supplied collection of usedEthPeers
* Gets a peer matching the supplied filter
*
* @param usedEthPeers a collection of EthPeers to be excluded from selection because they have
* already been used
* @param requiredPeerHeight the minimum peer height required of the selected peer
* @param requiredSubProtocol the SubProtocol required of the peer
* @param filter a Predicate\<EthPeer\> matching desirable peers
* @return a peer matching the supplied conditions
* @throws NoAvailablePeerException If there are no suitable peers
*/
EthPeer getPeer(
Collection<EthPeer> usedEthPeers, long requiredPeerHeight, SubProtocol requiredSubProtocol)
throws NoAvailablePeerException;
EthPeer getPeer(final Predicate<EthPeer> filter);
/**
* Attempts to get the EthPeer identified by peerId
@ -47,18 +39,4 @@ public interface PeerSelector {
* PeerSelector, or empty otherwise
*/
Optional<EthPeer> getPeerByPeerId(PeerId peerId);
/**
* Add the supplied EthPeer to the PeerSelector
*
* @param ethPeer the EthPeer to be added to the PeerSelector
*/
void addPeer(EthPeer ethPeer);
/**
* Remove the EthPeer identified by peerId from the PeerSelector
*
* @param peerId the PeerId of the EthPeer to be removed from the PeerSelector
*/
void removePeer(PeerId peerId);
}

@ -14,10 +14,12 @@
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.Collection;
import java.util.function.Predicate;
/**
* Represents a task to be executed on an EthPeer by the PeerTaskExecutor
@ -32,13 +34,6 @@ public interface PeerTask<T> {
*/
SubProtocol getSubProtocol();
/**
* Gets the minimum required block number for a peer to have to successfully execute this task
*
* @return the minimum required block number for a peer to have to successfully execute this task
*/
long getRequiredBlockNumber();
/**
* Gets the request data to send to the EthPeer
*
@ -61,4 +56,11 @@ public interface PeerTask<T> {
* @return the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor
*/
Collection<PeerTaskRetryBehavior> getPeerTaskBehaviors();
/**
* Gets a Predicate that checks if an EthPeer is suitable for this PeerTask
*
* @return a Predicate that checks if an EthPeer is suitable for this PeerTask
*/
Predicate<EthPeer> getPeerRequirementFilter();
}

@ -15,6 +15,8 @@
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.BesuMetricCategory;
@ -37,15 +39,18 @@ public class PeerTaskExecutor {
public static final int NO_RETRIES = 1;
private final PeerSelector peerSelector;
private final PeerTaskRequestSender requestSender;
private final EthScheduler ethScheduler;
private final LabelledMetric<OperationTimer> requestTimer;
public PeerTaskExecutor(
final PeerSelector peerSelector,
final PeerTaskRequestSender requestSender,
final EthScheduler ethScheduler,
final MetricsSystem metricsSystem) {
this.peerSelector = peerSelector;
this.requestSender = requestSender;
this.ethScheduler = ethScheduler;
requestTimer =
metricsSystem.createLabelledTimer(
BesuMetricCategory.PEERS,
@ -66,10 +71,12 @@ public class PeerTaskExecutor {
try {
peer =
peerSelector.getPeer(
usedEthPeers, peerTask.getRequiredBlockNumber(), peerTask.getSubProtocol());
(candidatePeer) ->
peerTask.getPeerRequirementFilter().test(candidatePeer)
&& !usedEthPeers.contains(candidatePeer));
usedEthPeers.add(peer);
executorResult = executeAgainstPeer(peerTask, peer);
} catch (NoAvailablePeerException e) {
} catch (NoAvailablePeersException e) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE);
@ -81,7 +88,8 @@ public class PeerTaskExecutor {
}
public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAsync(final PeerTask<T> peerTask) {
return CompletableFuture.supplyAsync(() -> execute(peerTask));
return ethScheduler.scheduleSyncWorkerTask(
() -> CompletableFuture.completedFuture(execute(peerTask)));
}
public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
@ -138,7 +146,8 @@ public class PeerTaskExecutor {
public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAgainstPeerAsync(
final PeerTask<T> peerTask, final EthPeer peer) {
return CompletableFuture.supplyAsync(() -> executeAgainstPeer(peerTask, peer));
return ethScheduler.scheduleSyncWorkerTask(
() -> CompletableFuture.completedFuture(executeAgainstPeer(peerTask, peer)));
}
private boolean sleepBetweenRetries() {

@ -1,135 +0,0 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.ChainState;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.PeerReputation;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class DefaultPeerSelectorTest {
public DefaultPeerSelector peerSelector;
@BeforeEach
public void beforeTest() {
ProtocolSpec protocolSpec = Mockito.mock(ProtocolSpec.class);
Mockito.when(protocolSpec.isPoS()).thenReturn(false);
peerSelector = new DefaultPeerSelector(() -> protocolSpec);
}
@Test
public void testGetPeer() throws NoAvailablePeerException {
EthPeer expectedPeer = createTestPeer(10, EthProtocol.get(), 5);
peerSelector.addPeer(expectedPeer);
EthPeer excludedForLowChainHeightPeer = createTestPeer(5, EthProtocol.get(), 50);
peerSelector.addPeer(excludedForLowChainHeightPeer);
EthPeer excludedForWrongProtocolPeer = createTestPeer(10, SnapProtocol.get(), 50);
peerSelector.addPeer(excludedForWrongProtocolPeer);
EthPeer excludedForLowReputationPeer = createTestPeer(10, EthProtocol.get(), 1);
peerSelector.addPeer(excludedForLowReputationPeer);
EthPeer excludedForBeingAlreadyUsedPeer = createTestPeer(10, EthProtocol.get(), 50);
peerSelector.addPeer(excludedForBeingAlreadyUsedPeer);
Set<EthPeer> usedEthPeers = new HashSet<>();
usedEthPeers.add(excludedForBeingAlreadyUsedPeer);
EthPeer result = peerSelector.getPeer(usedEthPeers, 10, EthProtocol.get());
Assertions.assertSame(expectedPeer, result);
}
@Test
public void testGetPeerButNoPeerMatchesFilter() {
EthPeer expectedPeer = createTestPeer(10, EthProtocol.get(), 5);
peerSelector.addPeer(expectedPeer);
EthPeer excludedForLowChainHeightPeer = createTestPeer(5, EthProtocol.get(), 50);
peerSelector.addPeer(excludedForLowChainHeightPeer);
EthPeer excludedForWrongProtocolPeer = createTestPeer(10, SnapProtocol.get(), 50);
peerSelector.addPeer(excludedForWrongProtocolPeer);
EthPeer excludedForLowReputationPeer = createTestPeer(10, EthProtocol.get(), 1);
peerSelector.addPeer(excludedForLowReputationPeer);
EthPeer excludedForBeingAlreadyUsedPeer = createTestPeer(10, EthProtocol.get(), 50);
peerSelector.addPeer(excludedForBeingAlreadyUsedPeer);
Set<EthPeer> usedEthPeers = new HashSet<>();
usedEthPeers.add(excludedForBeingAlreadyUsedPeer);
Assertions.assertThrows(
NoAvailablePeerException.class,
() -> peerSelector.getPeer(usedEthPeers, 10, new MockSubProtocol()));
}
private EthPeer createTestPeer(
final long chainHeight, final SubProtocol protocol, final int reputation) {
EthPeer ethPeer = Mockito.mock(EthPeer.class);
PeerConnection peerConnection = Mockito.mock(PeerConnection.class);
Peer peer = Mockito.mock(Peer.class);
ChainState chainState = Mockito.mock(ChainState.class);
PeerReputation peerReputation = Mockito.mock(PeerReputation.class);
Mockito.when(ethPeer.getConnection()).thenReturn(peerConnection);
Mockito.when(peerConnection.getPeer()).thenReturn(peer);
Mockito.when(ethPeer.getProtocolName()).thenReturn(protocol.getName());
Mockito.when(ethPeer.chainState()).thenReturn(chainState);
Mockito.when(chainState.getEstimatedHeight()).thenReturn(chainHeight);
Mockito.when(ethPeer.getReputation()).thenReturn(peerReputation);
Mockito.when(peerReputation.getScore()).thenReturn(reputation);
Mockito.when(ethPeer.compareTo(Mockito.any(EthPeer.class)))
.thenAnswer(
(invocationOnMock) -> {
EthPeer otherPeer = invocationOnMock.getArgument(0, EthPeer.class);
return Integer.compare(reputation, otherPeer.getReputation().getScore());
});
return ethPeer;
}
private static class MockSubProtocol implements SubProtocol {
@Override
public String getName() {
return "Mock";
}
@Override
public int messageSpace(final int protocolVersion) {
throw new UnsupportedOperationException();
}
@Override
public boolean isValidMessageCode(final int protocolVersion, final int code) {
throw new UnsupportedOperationException();
}
@Override
public String messageName(final int protocolVersion, final int code) {
throw new UnsupportedOperationException();
}
}
}

@ -15,16 +15,17 @@
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@ -37,6 +38,7 @@ import org.mockito.MockitoAnnotations;
public class PeerTaskExecutorTest {
private @Mock PeerSelector peerSelector;
private @Mock PeerTaskRequestSender requestSender;
private @Mock EthScheduler ethScheduler;
private @Mock PeerTask<Object> peerTask;
private @Mock SubProtocol subprotocol;
private @Mock MessageData requestMessageData;
@ -49,7 +51,8 @@ public class PeerTaskExecutorTest {
@BeforeEach
public void beforeTest() {
mockCloser = MockitoAnnotations.openMocks(this);
peerTaskExecutor = new PeerTaskExecutor(peerSelector, requestSender, new NoOpMetricsSystem());
peerTaskExecutor =
new PeerTaskExecutor(peerSelector, requestSender, ethScheduler, new NoOpMetricsSystem());
}
@AfterEach
@ -201,14 +204,10 @@ public class PeerTaskExecutorTest {
NoAvailablePeerException {
Object responseObject = new Object();
Mockito.when(
peerSelector.getPeer(
Mockito.any(Collection.class), Mockito.eq(10L), Mockito.eq(subprotocol)))
.thenReturn(ethPeer);
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer);
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getRequiredBlockNumber()).thenReturn(10L);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
@ -232,22 +231,18 @@ public class PeerTaskExecutorTest {
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException,
NoAvailablePeerException {
InvalidPeerTaskResponseException {
Object responseObject = new Object();
int requestMessageDataCode = 123;
EthPeer peer2 = Mockito.mock(EthPeer.class);
Mockito.when(
peerSelector.getPeer(
Mockito.any(Collection.class), Mockito.eq(10L), Mockito.eq(subprotocol)))
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class)))
.thenReturn(ethPeer)
.thenReturn(peer2);
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors())
.thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS));
Mockito.when(peerTask.getRequiredBlockNumber()).thenReturn(10L);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException());

Loading…
Cancel
Save