7311: add peertask foundation code (#7628)

* 7311: Add PeerTask system for use in future PRs

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
pull/7782/head
Matilda-Clerke 1 month ago committed by GitHub
parent dfbfb96f28
commit 2169985ee2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  3. 26
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/InvalidPeerTaskResponseException.java
  4. 17
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/NoAvailablePeerException.java
  5. 42
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java
  6. 83
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java
  7. 196
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java
  8. 24
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java
  9. 20
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java
  10. 56
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java
  11. 290
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java
  12. 75
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java
  13. 9
      metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpMetricsSystem.java
  14. 6
      metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpValueCollector.java
  15. 8
      metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryGauge.java
  16. 18
      metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/PrometheusGauge.java
  17. 2
      plugin-api/build.gradle
  18. 11
      plugin-api/src/main/java/org/hyperledger/besu/plugin/services/metrics/LabelledGauge.java

@ -1,6 +1,7 @@
# Changelog # Changelog
## [Unreleased] ## [Unreleased]
- Added isLabelsObserved to LabelledGauge in plugin-api. Default implementation returns false.
### Breaking Changes ### Breaking Changes

@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.manager;
import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.SnapProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback; import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker;
import org.hyperledger.besu.ethereum.eth.sync.SnapServerChecker; import org.hyperledger.besu.ethereum.eth.sync.SnapServerChecker;
@ -26,6 +27,7 @@ import org.hyperledger.besu.ethereum.forkid.ForkId;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent; import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
@ -61,7 +63,7 @@ import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class EthPeers { public class EthPeers implements PeerSelector {
private static final Logger LOG = LoggerFactory.getLogger(EthPeers.class); private static final Logger LOG = LoggerFactory.getLogger(EthPeers.class);
public static final Comparator<EthPeer> TOTAL_DIFFICULTY = public static final Comparator<EthPeer> TOTAL_DIFFICULTY =
Comparator.comparing((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty()); Comparator.comparing((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty());
@ -465,6 +467,22 @@ public class EthPeers {
this.trailingPeerRequirementsSupplier = tprSupplier; this.trailingPeerRequirementsSupplier = tprSupplier;
} }
// Part of the PeerSelector interface, to be split apart later
@Override
public Optional<EthPeer> getPeer(final Predicate<EthPeer> filter) {
return streamAvailablePeers()
.filter(filter)
.filter(EthPeer::hasAvailableRequestCapacity)
.filter(EthPeer::isFullyValidated)
.min(LEAST_TO_MOST_BUSY);
}
// Part of the PeerSelector interface, to be split apart later
@Override
public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) {
return Optional.ofNullable(activeConnections.get(peerId.getId()));
}
@FunctionalInterface @FunctionalInterface
public interface ConnectCallback { public interface ConnectCallback {
void onPeerConnected(EthPeer newPeer); void onPeerConnected(EthPeer newPeer);

@ -0,0 +1,26 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
public class InvalidPeerTaskResponseException extends Exception {
public InvalidPeerTaskResponseException() {
super();
}
public InvalidPeerTaskResponseException(final Throwable cause) {
super(cause);
}
}

@ -0,0 +1,17 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
public class NoAvailablePeerException extends Exception {}

@ -0,0 +1,42 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import java.util.Optional;
import java.util.function.Predicate;
/** Selects the EthPeers for the PeerTaskExecutor */
public interface PeerSelector {
/**
* Gets a peer matching the supplied filter
*
* @param filter a Predicate\<EthPeer\> matching desirable peers
* @return a peer matching the supplied conditions
*/
Optional<EthPeer> getPeer(final Predicate<EthPeer> filter);
/**
* Attempts to get the EthPeer identified by peerId
*
* @param peerId the peerId of the desired EthPeer
* @return An Optional\<EthPeer\> containing the EthPeer identified by peerId if present in the
* PeerSelector, or empty otherwise
*/
Optional<EthPeer> getPeerByPeerId(PeerId peerId);
}

@ -0,0 +1,83 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.function.Predicate;
/**
* Represents a task to be executed on an EthPeer by the PeerTaskExecutor
*
* @param <T> The type of the result of this PeerTask
*/
public interface PeerTask<T> {
/**
* Returns the SubProtocol used for this PeerTask
*
* @return the SubProtocol used for this PeerTask
*/
SubProtocol getSubProtocol();
/**
* Gets the request data to send to the EthPeer
*
* @return the request data to send to the EthPeer
*/
MessageData getRequestMessage();
/**
* Parses the MessageData response from the EthPeer
*
* @param messageData the response MessageData to be parsed
* @return a T built from the response MessageData
* @throws InvalidPeerTaskResponseException if the response messageData is invalid
*/
T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException;
/**
* Gets the number of times this task may be attempted against other peers
*
* @return the number of times this task may be attempted against other peers
*/
default int getRetriesWithOtherPeer() {
return 5;
}
/**
* Gets the number of times this task may be attempted against the same peer
*
* @return the number of times this task may be attempted against the same peer
*/
default int getRetriesWithSamePeer() {
return 5;
}
/**
* Gets a Predicate that checks if an EthPeer is suitable for this PeerTask
*
* @return a Predicate that checks if an EthPeer is suitable for this PeerTask
*/
Predicate<EthPeer> getPeerRequirementFilter();
/**
* Checks if the supplied result is considered a success
*
* @return true if the supplied result is considered a success
*/
boolean isSuccess(T result);
}

@ -0,0 +1,196 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledGauge;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */
public class PeerTaskExecutor {
private final PeerSelector peerSelector;
private final PeerTaskRequestSender requestSender;
private final LabelledMetric<OperationTimer> requestTimer;
private final LabelledMetric<Counter> timeoutCounter;
private final LabelledMetric<Counter> invalidResponseCounter;
private final LabelledMetric<Counter> internalExceptionCounter;
private final LabelledGauge inflightRequestGauge;
private final Map<String, AtomicInteger> inflightRequestCountByClassName;
public PeerTaskExecutor(
final PeerSelector peerSelector,
final PeerTaskRequestSender requestSender,
final MetricsSystem metricsSystem) {
this.peerSelector = peerSelector;
this.requestSender = requestSender;
requestTimer =
metricsSystem.createLabelledTimer(
BesuMetricCategory.PEERS,
"request_time",
"Time taken to send a request and receive a response",
"taskName");
timeoutCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS,
"timeout_total",
"Counter of the number of timeouts occurred",
"taskName");
invalidResponseCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS,
"invalid_response_total",
"Counter of the number of invalid responses received",
"taskName");
internalExceptionCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.PEERS,
"internal_exception_total",
"Counter of the number of internal exceptions occurred",
"taskName");
inflightRequestGauge =
metricsSystem.createLabelledGauge(
BesuMetricCategory.PEERS,
"inflight_request_gauge",
"Gauge of the number of inflight requests",
"taskName");
inflightRequestCountByClassName = new ConcurrentHashMap<>();
}
public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
PeerTaskExecutorResult<T> executorResult;
int retriesRemaining = peerTask.getRetriesWithOtherPeer();
final Collection<EthPeer> usedEthPeers = new HashSet<>();
do {
Optional<EthPeer> peer =
peerSelector.getPeer(
(candidatePeer) ->
peerTask.getPeerRequirementFilter().test(candidatePeer)
&& !usedEthPeers.contains(candidatePeer));
if (peer.isEmpty()) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE);
continue;
}
usedEthPeers.add(peer.get());
executorResult = executeAgainstPeer(peerTask, peer.get());
} while (retriesRemaining-- > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS);
return executorResult;
}
public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
final PeerTask<T> peerTask, final EthPeer peer) {
String taskClassName = peerTask.getClass().getSimpleName();
AtomicInteger inflightRequestCountForThisTaskClass =
inflightRequestCountByClassName.computeIfAbsent(
taskClassName,
(k) -> {
AtomicInteger inflightRequests = new AtomicInteger(0);
inflightRequestGauge.labels(inflightRequests::get, taskClassName);
return inflightRequests;
});
MessageData requestMessageData = peerTask.getRequestMessage();
PeerTaskExecutorResult<T> executorResult;
int retriesRemaining = peerTask.getRetriesWithSamePeer();
do {
try {
T result;
try (final OperationTimer.TimingContext ignored =
requestTimer.labels(taskClassName).startTimer()) {
inflightRequestCountForThisTaskClass.incrementAndGet();
MessageData responseMessageData =
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer);
result = peerTask.parseResponse(responseMessageData);
} finally {
inflightRequestCountForThisTaskClass.decrementAndGet();
}
if (peerTask.isSuccess(result)) {
peer.recordUsefulResponse();
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS);
} else {
// At this point, the result is most likely empty. Technically, this is a valid result, so
// we don't penalise the peer, but it's also a useless result, so we return
// INVALID_RESPONSE code
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
}
} catch (PeerNotConnected e) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED);
} catch (InterruptedException | TimeoutException e) {
peer.recordRequestTimeout(requestMessageData.getCode());
timeoutCounter.labels(taskClassName).inc();
executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT);
} catch (InvalidPeerTaskResponseException e) {
peer.recordUselessResponse(e.getMessage());
invalidResponseCounter.labels(taskClassName).inc();
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
} catch (ExecutionException e) {
internalExceptionCounter.labels(taskClassName).inc();
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR);
}
} while (retriesRemaining-- > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED
&& sleepBetweenRetries());
return executorResult;
}
private boolean sleepBetweenRetries() {
try {
// sleep for 1 second to match implemented wait between retries in AbstractRetryingPeerTask
Thread.sleep(1000);
return true;
} catch (InterruptedException e) {
return false;
}
}
}

@ -0,0 +1,24 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
public enum PeerTaskExecutorResponseCode {
SUCCESS,
NO_PEER_AVAILABLE,
PEER_DISCONNECTED,
INTERNAL_SERVER_ERROR,
TIMEOUT,
INVALID_RESPONSE
}

@ -0,0 +1,20 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import java.util.Optional;
public record PeerTaskExecutorResult<T>(
Optional<T> result, PeerTaskExecutorResponseCode responseCode) {}

@ -0,0 +1,56 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager.ResponseStream;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PeerTaskRequestSender {
private static final long DEFAULT_TIMEOUT_MS = 5_000;
private final long timeoutMs;
public PeerTaskRequestSender() {
this.timeoutMs = DEFAULT_TIMEOUT_MS;
}
public PeerTaskRequestSender(final long timeoutMs) {
this.timeoutMs = timeoutMs;
}
public MessageData sendRequest(
final SubProtocol subProtocol, final MessageData requestMessageData, final EthPeer ethPeer)
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException {
ResponseStream responseStream =
ethPeer.send(requestMessageData, subProtocol.getName(), ethPeer.getConnection());
final CompletableFuture<MessageData> responseMessageDataFuture = new CompletableFuture<>();
responseStream.then(
(boolean streamClosed, MessageData message, EthPeer peer) -> {
responseMessageDataFuture.complete(message);
});
return responseMessageDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
}
}

@ -0,0 +1,290 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
public class PeerTaskExecutorTest {
private @Mock PeerSelector peerSelector;
private @Mock PeerTaskRequestSender requestSender;
private @Mock PeerTask<Object> peerTask;
private @Mock SubProtocol subprotocol;
private @Mock MessageData requestMessageData;
private @Mock MessageData responseMessageData;
private @Mock EthPeer ethPeer;
private AutoCloseable mockCloser;
private PeerTaskExecutor peerTaskExecutor;
@BeforeEach
public void beforeTest() {
mockCloser = MockitoAnnotations.openMocks(this);
peerTaskExecutor = new PeerTaskExecutor(peerSelector, requestSender, new NoOpMetricsSystem());
}
@AfterEach
public void afterTest() throws Exception {
mockCloser.close();
}
@Test
public void testExecuteAgainstPeerWithNoRetriesAndSuccessfulFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertSame(responseObject, result.result().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithNoRetriesAndPartialSuccessfulFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(false);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertEquals(PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithRetriesAndSuccessfulFlowAfterFirstFailure()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
int requestMessageDataCode = 123;
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(2);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException())
.thenReturn(responseMessageData);
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode);
Mockito.verify(ethPeer).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertSame(responseObject, result.result().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithNoRetriesAndPeerNotConnected()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException {
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new PeerConnection.PeerNotConnected(""));
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isEmpty());
Assertions.assertEquals(PeerTaskExecutorResponseCode.PEER_DISCONNECTED, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithNoRetriesAndTimeoutException()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException {
int requestMessageDataCode = 123;
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException());
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isEmpty());
Assertions.assertEquals(PeerTaskExecutorResponseCode.TIMEOUT, result.responseCode());
}
@Test
public void testExecuteAgainstPeerWithNoRetriesAndInvalidResponseMessage()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData))
.thenThrow(new InvalidPeerTaskResponseException());
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordUselessResponse(null);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isEmpty());
Assertions.assertEquals(PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.responseCode());
}
@Test
@SuppressWarnings("unchecked")
public void testExecuteWithNoRetriesAndSuccessFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class)))
.thenReturn(Optional.of(ethPeer));
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(0);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer);
Mockito.verify(ethPeer).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertSame(responseObject, result.result().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode());
}
@Test
@SuppressWarnings("unchecked")
public void testExecuteWithPeerSwitchingAndSuccessFlow()
throws PeerConnection.PeerNotConnected,
ExecutionException,
InterruptedException,
TimeoutException,
InvalidPeerTaskResponseException {
Object responseObject = new Object();
int requestMessageDataCode = 123;
EthPeer peer2 = Mockito.mock(EthPeer.class);
Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class)))
.thenReturn(Optional.of(ethPeer))
.thenReturn(Optional.of(peer2));
Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData);
Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(2);
Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0);
Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer))
.thenThrow(new TimeoutException());
Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode);
Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, peer2))
.thenReturn(responseMessageData);
Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject);
Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true);
PeerTaskExecutorResult<Object> result = peerTaskExecutor.execute(peerTask);
Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode);
Mockito.verify(peer2).recordUsefulResponse();
Assertions.assertNotNull(result);
Assertions.assertTrue(result.result().isPresent());
Assertions.assertSame(responseObject, result.result().get());
Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode());
}
}

@ -0,0 +1,75 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.RequestManager;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
public class PeerTaskRequestSenderTest {
private PeerTaskRequestSender peerTaskRequestSender;
@BeforeEach
public void beforeTest() {
peerTaskRequestSender = new PeerTaskRequestSender();
}
@Test
public void testSendRequest()
throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException {
SubProtocol subprotocol = Mockito.mock(SubProtocol.class);
MessageData requestMessageData = Mockito.mock(MessageData.class);
MessageData responseMessageData = Mockito.mock(MessageData.class);
EthPeer peer = Mockito.mock(EthPeer.class);
PeerConnection peerConnection = Mockito.mock(PeerConnection.class);
RequestManager.ResponseStream responseStream =
Mockito.mock(RequestManager.ResponseStream.class);
Mockito.when(peer.getConnection()).thenReturn(peerConnection);
Mockito.when(subprotocol.getName()).thenReturn("subprotocol");
Mockito.when(peer.send(requestMessageData, "subprotocol", peerConnection))
.thenReturn(responseStream);
CompletableFuture<MessageData> actualResponseMessageDataFuture =
CompletableFuture.supplyAsync(
() -> {
try {
return peerTaskRequestSender.sendRequest(subprotocol, requestMessageData, peer);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Thread.sleep(500);
ArgumentCaptor<RequestManager.ResponseCallback> responseCallbackArgumentCaptor =
ArgumentCaptor.forClass(RequestManager.ResponseCallback.class);
Mockito.verify(responseStream).then(responseCallbackArgumentCaptor.capture());
RequestManager.ResponseCallback responseCallback = responseCallbackArgumentCaptor.getValue();
responseCallback.exec(false, responseMessageData, peer);
Assertions.assertSame(responseMessageData, actualResponseMessageDataFuture.get());
}
}

@ -253,5 +253,14 @@ public class NoOpMetricsSystem implements ObservableMetricsSystem {
"The count of labels used must match the count of labels expected."); "The count of labels used must match the count of labels expected.");
Preconditions.checkNotNull(valueSupplier, "No valueSupplier specified"); Preconditions.checkNotNull(valueSupplier, "No valueSupplier specified");
} }
@Override
public boolean isLabelsObserved(final String... labelValues) {
Preconditions.checkArgument(
labelValues.length == labelCount,
"The count of labels used must match the count of labels expected.");
final String labelValuesString = String.join(",", labelValues);
return labelValuesCache.contains(labelValuesString);
}
} }
} }

@ -36,4 +36,10 @@ public class NoOpValueCollector implements LabelledGauge {
} }
labelValuesCreated.add(labelValuesString); labelValuesCreated.add(labelValuesString);
} }
@Override
public boolean isLabelsObserved(final String... labelValues) {
final String labelValuesString = String.join(",", labelValues);
return labelValuesCreated.contains(labelValuesString);
}
} }

@ -63,6 +63,14 @@ public class OpenTelemetryGauge implements LabelledGauge {
} }
} }
@Override
public boolean isLabelsObserved(final String... labelValues) {
Preconditions.checkArgument(
labelValues.length == labelNames.size(),
"label values and label names need the same number of elements");
return observationsMap.containsKey(getLabels(labelValues));
}
private Attributes getLabels(final String... labelValues) { private Attributes getLabels(final String... labelValues) {
final AttributesBuilder labelsBuilder = Attributes.builder(); final AttributesBuilder labelsBuilder = Attributes.builder();
for (int i = 0; i < labelNames.size(); i++) { for (int i = 0; i < labelNames.size(); i++) {

@ -47,10 +47,7 @@ public class PrometheusGauge extends Collector implements LabelledGauge {
@Override @Override
public synchronized void labels(final DoubleSupplier valueSupplier, final String... labelValues) { public synchronized void labels(final DoubleSupplier valueSupplier, final String... labelValues) {
if (labelValues.length != labelNames.size()) { validateLabelsCardinality(labelValues);
throw new IllegalArgumentException(
"Label values and label names must be the same cardinality");
}
if (observationsMap.putIfAbsent(List.of(labelValues), valueSupplier) != null) { if (observationsMap.putIfAbsent(List.of(labelValues), valueSupplier) != null) {
final String labelValuesString = String.join(",", labelValues); final String labelValuesString = String.join(",", labelValues);
throw new IllegalArgumentException( throw new IllegalArgumentException(
@ -58,6 +55,12 @@ public class PrometheusGauge extends Collector implements LabelledGauge {
} }
} }
@Override
public boolean isLabelsObserved(final String... labelValues) {
validateLabelsCardinality(labelValues);
return observationsMap.containsKey(List.of(labelValues));
}
@Override @Override
public List<MetricFamilySamples> collect() { public List<MetricFamilySamples> collect() {
final List<MetricFamilySamples.Sample> samples = new ArrayList<>(); final List<MetricFamilySamples.Sample> samples = new ArrayList<>();
@ -68,4 +71,11 @@ public class PrometheusGauge extends Collector implements LabelledGauge {
metricName, labelNames, labels, valueSupplier.getAsDouble()))); metricName, labelNames, labels, valueSupplier.getAsDouble())));
return List.of(new MetricFamilySamples(metricName, Type.GAUGE, help, samples)); return List.of(new MetricFamilySamples(metricName, Type.GAUGE, help, samples));
} }
private void validateLabelsCardinality(final String... labelValues) {
if (labelValues.length != labelNames.size()) {
throw new IllegalArgumentException(
"Label values and label names must be the same cardinality");
}
}
} }

@ -71,7 +71,7 @@ Calculated : ${currentHash}
tasks.register('checkAPIChanges', FileStateChecker) { tasks.register('checkAPIChanges', FileStateChecker) {
description = "Checks that the API for the Plugin-API project does not change without deliberate thought" description = "Checks that the API for the Plugin-API project does not change without deliberate thought"
files = sourceSets.main.allJava.files files = sourceSets.main.allJava.files
knownHash = '4jVaj9yW88nHbX0KmTR3dPQRvj9x8Pvh5E9Ry7KRT6w=' knownHash = 'WRdnBaP05fItpWHYSFz/vBBlRWL3sLGqzR3tzd+pOkA='
} }
check.dependsOn('checkAPIChanges') check.dependsOn('checkAPIChanges')

@ -25,4 +25,15 @@ public interface LabelledGauge {
* @param labelValues the label values * @param labelValues the label values
*/ */
void labels(final DoubleSupplier valueSupplier, final String... labelValues); void labels(final DoubleSupplier valueSupplier, final String... labelValues);
/**
* Checks whether the supplied labelValues are already observed by this LabelledGauge
*
* @param labelValues The labelValues to check
* @return true if the supplied labelValues are already observed by this LabelledGauge, false
* otherwise
*/
default boolean isLabelsObserved(final String... labelValues) {
return false;
}
} }

Loading…
Cancel
Save