diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java index 244908c921..7ae78754b2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import java.util.Collection; @@ -29,7 +30,7 @@ public interface PeerTask { * * @return the SubProtocol used for this PeerTask */ - String getSubProtocol(); + SubProtocol getSubProtocol(); /** * Gets the minimum required block number for a peer to have to successfully execute this task diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index 76d5eb3839..bab1a1f0bd 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; @@ -95,11 +96,11 @@ public class PeerTaskExecutor { do { try { - T result ; + T result; try (final OperationTimer.TimingContext timingContext = requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) { MessageData responseMessageData = - requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer); + requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer); result = peerTask.parseResponse(responseMessageData); } @@ -155,7 +156,7 @@ public class PeerTaskExecutor { return ethPeer.chainState().getEstimatedHeight() >= requiredHeight; } - private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final String protocol) { - return ethPeer.getProtocolName().equals(protocol); + private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final SubProtocol protocol) { + return ethPeer.getProtocolName().equals(protocol.getName()); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java index 77ff5e7251..7a597eca8e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java @@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.RequestManager.ResponseStream; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -38,13 +39,13 @@ public class PeerTaskRequestSender { } public MessageData sendRequest( - final String subProtocol, final MessageData requestMessageData, final EthPeer ethPeer) + final SubProtocol subProtocol, final MessageData requestMessageData, final EthPeer ethPeer) throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException, TimeoutException { ResponseStream responseStream = - ethPeer.send(requestMessageData, subProtocol, ethPeer.getConnection()); + ethPeer.send(requestMessageData, subProtocol.getName(), ethPeer.getConnection()); final CompletableFuture responseMessageDataFuture = new CompletableFuture<>(); responseStream.then( (boolean streamClosed, MessageData message, EthPeer peer) -> { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java index 930f4325b6..413b68f77c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import java.util.Collections; @@ -39,6 +40,7 @@ public class PeerTaskExecutorTest { private @Mock PeerTaskRequestSender requestSender; private @Mock ProtocolSpec protocolSpec; private @Mock PeerTask peerTask; + private @Mock SubProtocol subprotocol; private @Mock MessageData requestMessageData; private @Mock MessageData responseMessageData; private @Mock EthPeer ethPeer; @@ -66,12 +68,13 @@ public class PeerTaskExecutorTest { InterruptedException, TimeoutException, InvalidPeerTaskResponseException { - String subprotocol = "subprotocol"; + Object responseObject = new Object(); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenReturn(responseMessageData); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); @@ -93,7 +96,6 @@ public class PeerTaskExecutorTest { InterruptedException, TimeoutException, InvalidPeerTaskResponseException { - String subprotocol = "subprotocol"; Object responseObject = new Object(); int requestMessageDataCode = 123; @@ -102,6 +104,7 @@ public class PeerTaskExecutorTest { .thenReturn(List.of(PeerTaskBehavior.RETRY_WITH_SAME_PEER)); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenThrow(new TimeoutException()) .thenReturn(responseMessageData); @@ -125,11 +128,11 @@ public class PeerTaskExecutorTest { ExecutionException, InterruptedException, TimeoutException { - String subprotocol = "subprotocol"; Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenThrow(new PeerConnection.PeerNotConnected("")); @@ -147,12 +150,12 @@ public class PeerTaskExecutorTest { ExecutionException, InterruptedException, TimeoutException { - String subprotocol = "subprotocol"; int requestMessageDataCode = 123; Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenThrow(new TimeoutException()); Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); @@ -173,11 +176,11 @@ public class PeerTaskExecutorTest { InterruptedException, TimeoutException, InvalidPeerTaskResponseException { - String subprotocol = "subprotocol"; Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenReturn(responseMessageData); Mockito.when(peerTask.parseResponse(responseMessageData)) @@ -202,7 +205,6 @@ public class PeerTaskExecutorTest { TimeoutException, InvalidPeerTaskResponseException, NoAvailablePeerException { - String subprotocol = "subprotocol"; Object responseObject = new Object(); Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer); @@ -210,6 +212,7 @@ public class PeerTaskExecutorTest { Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenReturn(responseMessageData); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); @@ -233,7 +236,6 @@ public class PeerTaskExecutorTest { TimeoutException, InvalidPeerTaskResponseException, NoAvailablePeerException { - String subprotocol = "subprotocol"; Object responseObject = new Object(); int requestMessageDataCode = 123; EthPeer peer2 = Mockito.mock(EthPeer.class); @@ -246,6 +248,7 @@ public class PeerTaskExecutorTest { Mockito.when(peerTask.getPeerTaskBehaviors()) .thenReturn(List.of(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS)); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenThrow(new TimeoutException()); Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java index 8bc52604db..4041fb6303 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java @@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.RequestManager; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -40,7 +41,7 @@ public class PeerTaskRequestSenderTest { @Test public void testSendRequest() throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException { - String subprotocol = "subprotocol"; + SubProtocol subprotocol = Mockito.mock(SubProtocol.class); MessageData requestMessageData = Mockito.mock(MessageData.class); MessageData responseMessageData = Mockito.mock(MessageData.class); EthPeer peer = Mockito.mock(EthPeer.class); @@ -49,7 +50,8 @@ public class PeerTaskRequestSenderTest { Mockito.mock(RequestManager.ResponseStream.class); Mockito.when(peer.getConnection()).thenReturn(peerConnection); - Mockito.when(peer.send(requestMessageData, subprotocol, peerConnection)) + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); + Mockito.when(peer.send(requestMessageData, "subprotocol", peerConnection)) .thenReturn(responseStream); CompletableFuture actualResponseMessageDataFuture =