7311: Use SubProtocol instead of subprotocol name string in PeerTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7628/head
Matilda Clerke 2 months ago
parent b0f2ed024f
commit 598b519c08
  1. 3
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java
  2. 9
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java
  3. 5
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java
  4. 17
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java
  5. 6
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java

@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.manager.peertask; package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.Collection; import java.util.Collection;
@ -29,7 +30,7 @@ public interface PeerTask<T> {
* *
* @return the SubProtocol used for this PeerTask * @return the SubProtocol used for this PeerTask
*/ */
String getSubProtocol(); SubProtocol getSubProtocol();
/** /**
* Gets the minimum required block number for a peer to have to successfully execute this task * Gets the minimum required block number for a peer to have to successfully execute this task

@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
@ -95,11 +96,11 @@ public class PeerTaskExecutor {
do { do {
try { try {
T result ; T result;
try (final OperationTimer.TimingContext timingContext = try (final OperationTimer.TimingContext timingContext =
requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) { requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) {
MessageData responseMessageData = MessageData responseMessageData =
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer); requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer);
result = peerTask.parseResponse(responseMessageData); result = peerTask.parseResponse(responseMessageData);
} }
@ -155,7 +156,7 @@ public class PeerTaskExecutor {
return ethPeer.chainState().getEstimatedHeight() >= requiredHeight; return ethPeer.chainState().getEstimatedHeight() >= requiredHeight;
} }
private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final String protocol) { private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final SubProtocol protocol) {
return ethPeer.getProtocolName().equals(protocol); return ethPeer.getProtocolName().equals(protocol.getName());
} }
} }

@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager.ResponseStream; import org.hyperledger.besu.ethereum.eth.manager.RequestManager.ResponseStream;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -38,13 +39,13 @@ public class PeerTaskRequestSender {
} }
public MessageData sendRequest( public MessageData sendRequest(
final String subProtocol, final MessageData requestMessageData, final EthPeer ethPeer) final SubProtocol subProtocol, final MessageData requestMessageData, final EthPeer ethPeer)
throws PeerConnection.PeerNotConnected, throws PeerConnection.PeerNotConnected,
ExecutionException, ExecutionException,
InterruptedException, InterruptedException,
TimeoutException { TimeoutException {
ResponseStream responseStream = ResponseStream responseStream =
ethPeer.send(requestMessageData, subProtocol, ethPeer.getConnection()); ethPeer.send(requestMessageData, subProtocol.getName(), ethPeer.getConnection());
final CompletableFuture<MessageData> responseMessageDataFuture = new CompletableFuture<>(); final CompletableFuture<MessageData> responseMessageDataFuture = new CompletableFuture<>();
responseStream.then( responseStream.then(
(boolean streamClosed, MessageData message, EthPeer peer) -> { (boolean streamClosed, MessageData message, EthPeer peer) -> {

@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Collections; import java.util.Collections;
@ -39,6 +40,7 @@ public class PeerTaskExecutorTest {
private @Mock PeerTaskRequestSender requestSender; private @Mock PeerTaskRequestSender requestSender;
private @Mock ProtocolSpec protocolSpec; private @Mock ProtocolSpec protocolSpec;
private @Mock PeerTask<Object> peerTask; private @Mock PeerTask<Object> peerTask;
private @Mock SubProtocol subprotocol;
private @Mock MessageData requestMessageData; private @Mock MessageData requestMessageData;
private @Mock MessageData responseMessageData; private @Mock MessageData responseMessageData;
private @Mock EthPeer ethPeer; private @Mock EthPeer ethPeer;
@ -66,12 +68,13 @@ public class PeerTaskExecutorTest {
InterruptedException, InterruptedException,
TimeoutException, TimeoutException,
InvalidPeerTaskResponseException { InvalidPeerTaskResponseException {
String subprotocol = "subprotocol";
Object responseObject = new Object(); Object responseObject = new Object();
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData); .thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
@ -93,7 +96,6 @@ public class PeerTaskExecutorTest {
InterruptedException, InterruptedException,
TimeoutException, TimeoutException,
InvalidPeerTaskResponseException { InvalidPeerTaskResponseException {
String subprotocol = "subprotocol";
Object responseObject = new Object(); Object responseObject = new Object();
int requestMessageDataCode = 123; int requestMessageDataCode = 123;
@ -102,6 +104,7 @@ public class PeerTaskExecutorTest {
.thenReturn(List.of(PeerTaskBehavior.RETRY_WITH_SAME_PEER)); .thenReturn(List.of(PeerTaskBehavior.RETRY_WITH_SAME_PEER));
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException()) .thenThrow(new TimeoutException())
.thenReturn(responseMessageData); .thenReturn(responseMessageData);
@ -125,11 +128,11 @@ public class PeerTaskExecutorTest {
ExecutionException, ExecutionException,
InterruptedException, InterruptedException,
TimeoutException { TimeoutException {
String subprotocol = "subprotocol";
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new PeerConnection.PeerNotConnected("")); .thenThrow(new PeerConnection.PeerNotConnected(""));
@ -147,12 +150,12 @@ public class PeerTaskExecutorTest {
ExecutionException, ExecutionException,
InterruptedException, InterruptedException,
TimeoutException { TimeoutException {
String subprotocol = "subprotocol";
int requestMessageDataCode = 123; int requestMessageDataCode = 123;
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException()); .thenThrow(new TimeoutException());
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
@ -173,11 +176,11 @@ public class PeerTaskExecutorTest {
InterruptedException, InterruptedException,
TimeoutException, TimeoutException,
InvalidPeerTaskResponseException { InvalidPeerTaskResponseException {
String subprotocol = "subprotocol";
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData); .thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)) Mockito.when(peerTask.parseResponse(responseMessageData))
@ -202,7 +205,6 @@ public class PeerTaskExecutorTest {
TimeoutException, TimeoutException,
InvalidPeerTaskResponseException, InvalidPeerTaskResponseException,
NoAvailablePeerException { NoAvailablePeerException {
String subprotocol = "subprotocol";
Object responseObject = new Object(); Object responseObject = new Object();
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer); Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer);
@ -210,6 +212,7 @@ public class PeerTaskExecutorTest {
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList());
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData); .thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
@ -233,7 +236,6 @@ public class PeerTaskExecutorTest {
TimeoutException, TimeoutException,
InvalidPeerTaskResponseException, InvalidPeerTaskResponseException,
NoAvailablePeerException { NoAvailablePeerException {
String subprotocol = "subprotocol";
Object responseObject = new Object(); Object responseObject = new Object();
int requestMessageDataCode = 123; int requestMessageDataCode = 123;
EthPeer peer2 = Mockito.mock(EthPeer.class); EthPeer peer2 = Mockito.mock(EthPeer.class);
@ -246,6 +248,7 @@ public class PeerTaskExecutorTest {
Mockito.when(peerTask.getPeerTaskBehaviors()) Mockito.when(peerTask.getPeerTaskBehaviors())
.thenReturn(List.of(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS)); .thenReturn(List.of(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS));
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException()); .thenThrow(new TimeoutException());
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);

@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager; import org.hyperledger.besu.ethereum.eth.manager.RequestManager;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -40,7 +41,7 @@ public class PeerTaskRequestSenderTest {
@Test @Test
public void testSendRequest() public void testSendRequest()
throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException { throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException {
String subprotocol = "subprotocol"; SubProtocol subprotocol = Mockito.mock(SubProtocol.class);
MessageData requestMessageData = Mockito.mock(MessageData.class); MessageData requestMessageData = Mockito.mock(MessageData.class);
MessageData responseMessageData = Mockito.mock(MessageData.class); MessageData responseMessageData = Mockito.mock(MessageData.class);
EthPeer peer = Mockito.mock(EthPeer.class); EthPeer peer = Mockito.mock(EthPeer.class);
@ -49,7 +50,8 @@ public class PeerTaskRequestSenderTest {
Mockito.mock(RequestManager.ResponseStream.class); Mockito.mock(RequestManager.ResponseStream.class);
Mockito.when(peer.getConnection()).thenReturn(peerConnection); Mockito.when(peer.getConnection()).thenReturn(peerConnection);
Mockito.when(peer.send(requestMessageData, subprotocol, peerConnection)) Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(peer.send(requestMessageData, "subprotocol", peerConnection))
.thenReturn(responseStream); .thenReturn(responseStream);
CompletableFuture<MessageData> actualResponseMessageDataFuture = CompletableFuture<MessageData> actualResponseMessageDataFuture =

Loading…
Cancel
Save