Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311-add-GetReceiptsFromPeerTask

pull/7638/head
Matilda Clerke 2 months ago
commit 09ee1c8f74
  1. 7
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java
  2. 33
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java
  3. 1
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java
  4. 43
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java

@ -73,4 +73,11 @@ public interface PeerTask<T> {
* @return a Predicate that checks if an EthPeer is suitable for this PeerTask * @return a Predicate that checks if an EthPeer is suitable for this PeerTask
*/ */
Predicate<EthPeer> getPeerRequirementFilter(); Predicate<EthPeer> getPeerRequirementFilter();
/**
* Checks if the supplied result is considered a partial success
*
* @return true if the supplied result is considered a partial success
*/
boolean isPartialSuccessTest(T result);
} }

@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNot
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer; import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
@ -35,6 +36,9 @@ public class PeerTaskExecutor {
private final PeerTaskRequestSender requestSender; private final PeerTaskRequestSender requestSender;
private final LabelledMetric<OperationTimer> requestTimer; private final LabelledMetric<OperationTimer> requestTimer;
private final LabelledMetric<Counter> timeoutCounter;
private final LabelledMetric<Counter> invalidResponseCounter;
private final LabelledMetric<Counter> internalExceptionCounter;
public PeerTaskExecutor( public PeerTaskExecutor(
final PeerSelector peerSelector, final PeerSelector peerSelector,
@ -48,6 +52,24 @@ public class PeerTaskExecutor {
"PeerTaskExecutor:RequestTime", "PeerTaskExecutor:RequestTime",
"Time taken to send a request and receive a response", "Time taken to send a request and receive a response",
"className"); "className");
timeoutCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS,
"PeerTaskExecutor:TimeoutCounter",
"Counter of the number of timeouts occurred",
"className");
invalidResponseCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS,
"PeerTaskExecutor:InvalidResponseCounter",
"Counter of the number of invalid responses received",
"className");
internalExceptionCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS,
"PeerTaskExecutor:InternalExceptionCounter",
"Counter of the number of internal exceptions occurred",
"className");
} }
public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) { public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
@ -80,6 +102,7 @@ public class PeerTaskExecutor {
PeerTaskExecutorResult<T> executorResult; PeerTaskExecutorResult<T> executorResult;
int retriesRemaining = peerTask.getRetriesWithSamePeer(); int retriesRemaining = peerTask.getRetriesWithSamePeer();
do { do {
try { try {
T result; T result;
try (final OperationTimer.TimingContext ignored = try (final OperationTimer.TimingContext ignored =
@ -89,10 +112,17 @@ public class PeerTaskExecutor {
result = peerTask.parseResponse(responseMessageData); result = peerTask.parseResponse(responseMessageData);
} }
if (peerTask.isPartialSuccessTest(result)) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.PARTIAL_SUCCESS);
} else {
peer.recordUsefulResponse(); peer.recordUsefulResponse();
executorResult = executorResult =
new PeerTaskExecutorResult<>( new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS); Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS);
}
} catch (PeerNotConnected e) { } catch (PeerNotConnected e) {
executorResult = executorResult =
@ -101,16 +131,19 @@ public class PeerTaskExecutor {
} catch (InterruptedException | TimeoutException e) { } catch (InterruptedException | TimeoutException e) {
peer.recordRequestTimeout(requestMessageData.getCode()); peer.recordRequestTimeout(requestMessageData.getCode());
timeoutCounter.labels(peerTask.getClass().getSimpleName()).inc();
executorResult = executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT); new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT);
} catch (InvalidPeerTaskResponseException e) { } catch (InvalidPeerTaskResponseException e) {
peer.recordUselessResponse(e.getMessage()); peer.recordUselessResponse(e.getMessage());
invalidResponseCounter.labels(peerTask.getClass().getSimpleName()).inc();
executorResult = executorResult =
new PeerTaskExecutorResult<>( new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE); Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
} catch (ExecutionException e) { } catch (ExecutionException e) {
internalExceptionCounter.labels(peerTask.getClass().getSimpleName()).inc();
executorResult = executorResult =
new PeerTaskExecutorResult<>( new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR); Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR);

@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask;
public enum PeerTaskExecutorResponseCode { public enum PeerTaskExecutorResponseCode {
SUCCESS, SUCCESS,
PARTIAL_SUCCESS,
NO_PEER_AVAILABLE, NO_PEER_AVAILABLE,
PEER_DISCONNECTED, PEER_DISCONNECTED,
INTERNAL_SERVER_ERROR, INTERNAL_SERVER_ERROR,

@ -57,7 +57,7 @@ public class PeerTaskExecutorTest {
} }
@Test @Test
public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndSuccessfulFlow() public void testExecuteAgainstPeerWithNoRetriesAndSuccessfulFlow()
throws PeerConnection.PeerNotConnected, throws PeerConnection.PeerNotConnected,
ExecutionException, ExecutionException,
InterruptedException, InterruptedException,
@ -73,6 +73,7 @@ public class PeerTaskExecutorTest {
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData); .thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isPartialSuccessTest(responseObject)).thenReturn(false);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
@ -85,7 +86,34 @@ public class PeerTaskExecutorTest {
} }
@Test @Test
public void testExecuteAgainstPeerWithRetryBehaviorsAndSuccessfulFlowAfterFirstFailure() public void testExecuteAgainstPeerWithNoRetriesAndPartialSuccessfulFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isPartialSuccessTest(responseObject)).thenReturn(true);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertSame(responseObject, result.result().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.PARTIAL_SUCCESS, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithRetriesAndSuccessfulFlowAfterFirstFailure()
throws PeerConnection.PeerNotConnected, throws PeerConnection.PeerNotConnected,
ExecutionException, ExecutionException,
InterruptedException, InterruptedException,
@ -104,6 +132,7 @@ public class PeerTaskExecutorTest {
.thenReturn(responseMessageData); .thenReturn(responseMessageData);
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isPartialSuccessTest(responseObject)).thenReturn(false);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
@ -117,7 +146,7 @@ public class PeerTaskExecutorTest {
} }
@Test @Test
public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndPeerNotConnected() public void testExecuteAgainstPeerWithNoRetriesAndPeerNotConnected()
throws PeerConnection.PeerNotConnected, throws PeerConnection.PeerNotConnected,
ExecutionException, ExecutionException,
InterruptedException, InterruptedException,
@ -138,7 +167,7 @@ public class PeerTaskExecutorTest {
} }
@Test @Test
public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndTimeoutException() public void testExecuteAgainstPeerWithNoRetriesAndTimeoutException()
throws PeerConnection.PeerNotConnected, throws PeerConnection.PeerNotConnected,
ExecutionException, ExecutionException,
InterruptedException, InterruptedException,
@ -163,7 +192,7 @@ public class PeerTaskExecutorTest {
} }
@Test @Test
public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndInvalidResponseMessage() public void testExecuteAgainstPeerWithNoRetriesAndInvalidResponseMessage()
throws PeerConnection.PeerNotConnected, throws PeerConnection.PeerNotConnected,
ExecutionException, ExecutionException,
InterruptedException, InterruptedException,
@ -190,7 +219,7 @@ public class PeerTaskExecutorTest {
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testExecuteWithNoPeerTaskBehaviorsAndSuccessFlow() public void testExecuteWithNoRetriesAndSuccessFlow()
throws PeerConnection.PeerNotConnected, throws PeerConnection.PeerNotConnected,
ExecutionException, ExecutionException,
InterruptedException, InterruptedException,
@ -209,6 +238,7 @@ public class PeerTaskExecutorTest {
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData); .thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isPartialSuccessTest(responseObject)).thenReturn(false);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
@ -246,6 +276,7 @@ public class PeerTaskExecutorTest {
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, peer2)) Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, peer2))
.thenReturn(responseMessageData); .thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isPartialSuccessTest(responseObject)).thenReturn(false);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.execute(peerTask); PeerTaskExecutorResult<Object> result = peerTaskExecutor.execute(peerTask);

Loading…
Cancel
Save