diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java index 1d6464ac26..57976ac2e9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java @@ -73,4 +73,11 @@ public interface PeerTask { * @return a Predicate that checks if an EthPeer is suitable for this PeerTask */ Predicate getPeerRequirementFilter(); + + /** + * Checks if the supplied result is considered a partial success + * + * @return true if the supplied result is considered a partial success + */ + boolean isPartialSuccessTest(T result); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index 7b485ee6c3..5d1883c7d8 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNot import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.metrics.Counter; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import org.hyperledger.besu.plugin.services.metrics.OperationTimer; @@ -35,6 +36,9 @@ public class PeerTaskExecutor { private final PeerTaskRequestSender requestSender; private final LabelledMetric requestTimer; + private final LabelledMetric timeoutCounter; + private final LabelledMetric invalidResponseCounter; + private final LabelledMetric internalExceptionCounter; public PeerTaskExecutor( final PeerSelector peerSelector, @@ -48,6 +52,24 @@ public class PeerTaskExecutor { "PeerTaskExecutor:RequestTime", "Time taken to send a request and receive a response", "className"); + timeoutCounter = + metricsSystem.createLabelledCounter( + BesuMetricCategory.PEERS, + "PeerTaskExecutor:TimeoutCounter", + "Counter of the number of timeouts occurred", + "className"); + invalidResponseCounter = + metricsSystem.createLabelledCounter( + BesuMetricCategory.PEERS, + "PeerTaskExecutor:InvalidResponseCounter", + "Counter of the number of invalid responses received", + "className"); + internalExceptionCounter = + metricsSystem.createLabelledCounter( + BesuMetricCategory.PEERS, + "PeerTaskExecutor:InternalExceptionCounter", + "Counter of the number of internal exceptions occurred", + "className"); } public PeerTaskExecutorResult execute(final PeerTask peerTask) { @@ -80,6 +102,7 @@ public class PeerTaskExecutor { PeerTaskExecutorResult executorResult; int retriesRemaining = peerTask.getRetriesWithSamePeer(); do { + try { T result; try (final OperationTimer.TimingContext ignored = @@ -89,10 +112,17 @@ public class PeerTaskExecutor { result = peerTask.parseResponse(responseMessageData); } - peer.recordUsefulResponse(); - executorResult = - new PeerTaskExecutorResult<>( - Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS); + + if (peerTask.isPartialSuccessTest(result)) { + executorResult = + new PeerTaskExecutorResult<>( + Optional.ofNullable(result), PeerTaskExecutorResponseCode.PARTIAL_SUCCESS); + } else { + peer.recordUsefulResponse(); + executorResult = + new PeerTaskExecutorResult<>( + Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS); + } } catch (PeerNotConnected e) { executorResult = @@ -101,16 +131,19 @@ public class PeerTaskExecutor { } catch (InterruptedException | TimeoutException e) { peer.recordRequestTimeout(requestMessageData.getCode()); + timeoutCounter.labels(peerTask.getClass().getSimpleName()).inc(); executorResult = new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT); } catch (InvalidPeerTaskResponseException e) { peer.recordUselessResponse(e.getMessage()); + invalidResponseCounter.labels(peerTask.getClass().getSimpleName()).inc(); executorResult = new PeerTaskExecutorResult<>( Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE); } catch (ExecutionException e) { + internalExceptionCounter.labels(peerTask.getClass().getSimpleName()).inc(); executorResult = new PeerTaskExecutorResult<>( Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java index 327461de15..123c3267c0 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java @@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; public enum PeerTaskExecutorResponseCode { SUCCESS, + PARTIAL_SUCCESS, NO_PEER_AVAILABLE, PEER_DISCONNECTED, INTERNAL_SERVER_ERROR, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java index 0015c1ffbc..a4e78056e1 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -57,7 +57,7 @@ public class PeerTaskExecutorTest { } @Test - public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndSuccessfulFlow() + public void testExecuteAgainstPeerWithNoRetriesAndSuccessfulFlow() throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException, @@ -73,6 +73,7 @@ public class PeerTaskExecutorTest { Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenReturn(responseMessageData); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.isPartialSuccessTest(responseObject)).thenReturn(false); PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); @@ -85,7 +86,34 @@ public class PeerTaskExecutorTest { } @Test - public void testExecuteAgainstPeerWithRetryBehaviorsAndSuccessfulFlowAfterFirstFailure() + public void testExecuteAgainstPeerWithNoRetriesAndPartialSuccessfulFlow() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + + Object responseObject = new Object(); + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenReturn(responseMessageData); + Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.isPartialSuccessTest(responseObject)).thenReturn(true); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.result().isPresent()); + Assertions.assertSame(responseObject, result.result().get()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.PARTIAL_SUCCESS, result.responseCode()); + } + + @Test + public void testExecuteAgainstPeerWithRetriesAndSuccessfulFlowAfterFirstFailure() throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException, @@ -104,6 +132,7 @@ public class PeerTaskExecutorTest { .thenReturn(responseMessageData); Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.isPartialSuccessTest(responseObject)).thenReturn(false); PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); @@ -117,7 +146,7 @@ public class PeerTaskExecutorTest { } @Test - public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndPeerNotConnected() + public void testExecuteAgainstPeerWithNoRetriesAndPeerNotConnected() throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException, @@ -138,7 +167,7 @@ public class PeerTaskExecutorTest { } @Test - public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndTimeoutException() + public void testExecuteAgainstPeerWithNoRetriesAndTimeoutException() throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException, @@ -163,7 +192,7 @@ public class PeerTaskExecutorTest { } @Test - public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndInvalidResponseMessage() + public void testExecuteAgainstPeerWithNoRetriesAndInvalidResponseMessage() throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException, @@ -190,7 +219,7 @@ public class PeerTaskExecutorTest { @Test @SuppressWarnings("unchecked") - public void testExecuteWithNoPeerTaskBehaviorsAndSuccessFlow() + public void testExecuteWithNoRetriesAndSuccessFlow() throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException, @@ -209,6 +238,7 @@ public class PeerTaskExecutorTest { Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenReturn(responseMessageData); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.isPartialSuccessTest(responseObject)).thenReturn(false); PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); @@ -246,6 +276,7 @@ public class PeerTaskExecutorTest { Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, peer2)) .thenReturn(responseMessageData); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.isPartialSuccessTest(responseObject)).thenReturn(false); PeerTaskExecutorResult result = peerTaskExecutor.execute(peerTask);