diff --git a/CHANGELOG.md b/CHANGELOG.md index b5dcccd7d3..c581fbd034 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## [Unreleased] +- Added isLabelsObserved to LabelledGauge in plugin-api. Default implementation returns false. ### Breaking Changes diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index d1a54d4d3d..d19c7dfca2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -17,6 +17,7 @@ package org.hyperledger.besu.ethereum.eth.manager; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerSelector; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.sync.ChainHeadTracker; import org.hyperledger.besu.ethereum.eth.sync.SnapServerChecker; @@ -26,6 +27,7 @@ import org.hyperledger.besu.ethereum.forkid.ForkId; import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.p2p.peers.Peer; +import org.hyperledger.besu.ethereum.p2p.peers.PeerId; import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; @@ -61,7 +63,7 @@ import org.apache.tuweni.bytes.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class EthPeers { +public class EthPeers implements PeerSelector { private static final Logger LOG = LoggerFactory.getLogger(EthPeers.class); public static final Comparator TOTAL_DIFFICULTY = Comparator.comparing((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty()); @@ -465,6 +467,22 @@ public class EthPeers { this.trailingPeerRequirementsSupplier = tprSupplier; } + // Part of the PeerSelector interface, to be split apart later + @Override + public Optional getPeer(final Predicate filter) { + return streamAvailablePeers() + .filter(filter) + .filter(EthPeer::hasAvailableRequestCapacity) + .filter(EthPeer::isFullyValidated) + .min(LEAST_TO_MOST_BUSY); + } + + // Part of the PeerSelector interface, to be split apart later + @Override + public Optional getPeerByPeerId(final PeerId peerId) { + return Optional.ofNullable(activeConnections.get(peerId.getId())); + } + @FunctionalInterface public interface ConnectCallback { void onPeerConnected(EthPeer newPeer); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/InvalidPeerTaskResponseException.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/InvalidPeerTaskResponseException.java new file mode 100644 index 0000000000..824c0860d7 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/InvalidPeerTaskResponseException.java @@ -0,0 +1,26 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +public class InvalidPeerTaskResponseException extends Exception { + + public InvalidPeerTaskResponseException() { + super(); + } + + public InvalidPeerTaskResponseException(final Throwable cause) { + super(cause); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/NoAvailablePeerException.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/NoAvailablePeerException.java new file mode 100644 index 0000000000..40c600d6fb --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/NoAvailablePeerException.java @@ -0,0 +1,17 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +public class NoAvailablePeerException extends Exception {} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java new file mode 100644 index 0000000000..8f7ab33e42 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java @@ -0,0 +1,42 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.p2p.peers.PeerId; + +import java.util.Optional; +import java.util.function.Predicate; + +/** Selects the EthPeers for the PeerTaskExecutor */ +public interface PeerSelector { + + /** + * Gets a peer matching the supplied filter + * + * @param filter a Predicate\ matching desirable peers + * @return a peer matching the supplied conditions + */ + Optional getPeer(final Predicate filter); + + /** + * Attempts to get the EthPeer identified by peerId + * + * @param peerId the peerId of the desired EthPeer + * @return An Optional\ containing the EthPeer identified by peerId if present in the + * PeerSelector, or empty otherwise + */ + Optional getPeerByPeerId(PeerId peerId); +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java new file mode 100644 index 0000000000..1243846ac3 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java @@ -0,0 +1,83 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; + +import java.util.function.Predicate; + +/** + * Represents a task to be executed on an EthPeer by the PeerTaskExecutor + * + * @param The type of the result of this PeerTask + */ +public interface PeerTask { + /** + * Returns the SubProtocol used for this PeerTask + * + * @return the SubProtocol used for this PeerTask + */ + SubProtocol getSubProtocol(); + + /** + * Gets the request data to send to the EthPeer + * + * @return the request data to send to the EthPeer + */ + MessageData getRequestMessage(); + + /** + * Parses the MessageData response from the EthPeer + * + * @param messageData the response MessageData to be parsed + * @return a T built from the response MessageData + * @throws InvalidPeerTaskResponseException if the response messageData is invalid + */ + T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException; + + /** + * Gets the number of times this task may be attempted against other peers + * + * @return the number of times this task may be attempted against other peers + */ + default int getRetriesWithOtherPeer() { + return 5; + } + + /** + * Gets the number of times this task may be attempted against the same peer + * + * @return the number of times this task may be attempted against the same peer + */ + default int getRetriesWithSamePeer() { + return 5; + } + + /** + * Gets a Predicate that checks if an EthPeer is suitable for this PeerTask + * + * @return a Predicate that checks if an EthPeer is suitable for this PeerTask + */ + Predicate getPeerRequirementFilter(); + + /** + * Checks if the supplied result is considered a success + * + * @return true if the supplied result is considered a success + */ + boolean isSuccess(T result); +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java new file mode 100644 index 0000000000..984cedccec --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -0,0 +1,196 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.metrics.BesuMetricCategory; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.metrics.Counter; +import org.hyperledger.besu.plugin.services.metrics.LabelledGauge; +import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; +import org.hyperledger.besu.plugin.services.metrics.OperationTimer; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +/** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */ +public class PeerTaskExecutor { + + private final PeerSelector peerSelector; + private final PeerTaskRequestSender requestSender; + + private final LabelledMetric requestTimer; + private final LabelledMetric timeoutCounter; + private final LabelledMetric invalidResponseCounter; + private final LabelledMetric internalExceptionCounter; + private final LabelledGauge inflightRequestGauge; + private final Map inflightRequestCountByClassName; + + public PeerTaskExecutor( + final PeerSelector peerSelector, + final PeerTaskRequestSender requestSender, + final MetricsSystem metricsSystem) { + this.peerSelector = peerSelector; + this.requestSender = requestSender; + requestTimer = + metricsSystem.createLabelledTimer( + BesuMetricCategory.PEERS, + "request_time", + "Time taken to send a request and receive a response", + "taskName"); + timeoutCounter = + metricsSystem.createLabelledCounter( + BesuMetricCategory.PEERS, + "timeout_total", + "Counter of the number of timeouts occurred", + "taskName"); + invalidResponseCounter = + metricsSystem.createLabelledCounter( + BesuMetricCategory.PEERS, + "invalid_response_total", + "Counter of the number of invalid responses received", + "taskName"); + internalExceptionCounter = + metricsSystem.createLabelledCounter( + BesuMetricCategory.PEERS, + "internal_exception_total", + "Counter of the number of internal exceptions occurred", + "taskName"); + inflightRequestGauge = + metricsSystem.createLabelledGauge( + BesuMetricCategory.PEERS, + "inflight_request_gauge", + "Gauge of the number of inflight requests", + "taskName"); + inflightRequestCountByClassName = new ConcurrentHashMap<>(); + } + + public PeerTaskExecutorResult execute(final PeerTask peerTask) { + PeerTaskExecutorResult executorResult; + int retriesRemaining = peerTask.getRetriesWithOtherPeer(); + final Collection usedEthPeers = new HashSet<>(); + do { + Optional peer = + peerSelector.getPeer( + (candidatePeer) -> + peerTask.getPeerRequirementFilter().test(candidatePeer) + && !usedEthPeers.contains(candidatePeer)); + if (peer.isEmpty()) { + executorResult = + new PeerTaskExecutorResult<>( + Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE); + continue; + } + usedEthPeers.add(peer.get()); + executorResult = executeAgainstPeer(peerTask, peer.get()); + } while (retriesRemaining-- > 0 + && executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS); + + return executorResult; + } + + public PeerTaskExecutorResult executeAgainstPeer( + final PeerTask peerTask, final EthPeer peer) { + String taskClassName = peerTask.getClass().getSimpleName(); + AtomicInteger inflightRequestCountForThisTaskClass = + inflightRequestCountByClassName.computeIfAbsent( + taskClassName, + (k) -> { + AtomicInteger inflightRequests = new AtomicInteger(0); + inflightRequestGauge.labels(inflightRequests::get, taskClassName); + return inflightRequests; + }); + MessageData requestMessageData = peerTask.getRequestMessage(); + PeerTaskExecutorResult executorResult; + int retriesRemaining = peerTask.getRetriesWithSamePeer(); + do { + try { + T result; + try (final OperationTimer.TimingContext ignored = + requestTimer.labels(taskClassName).startTimer()) { + inflightRequestCountForThisTaskClass.incrementAndGet(); + + MessageData responseMessageData = + requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer); + + result = peerTask.parseResponse(responseMessageData); + } finally { + inflightRequestCountForThisTaskClass.decrementAndGet(); + } + + if (peerTask.isSuccess(result)) { + peer.recordUsefulResponse(); + executorResult = + new PeerTaskExecutorResult<>( + Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS); + } else { + // At this point, the result is most likely empty. Technically, this is a valid result, so + // we don't penalise the peer, but it's also a useless result, so we return + // INVALID_RESPONSE code + executorResult = + new PeerTaskExecutorResult<>( + Optional.ofNullable(result), PeerTaskExecutorResponseCode.INVALID_RESPONSE); + } + + } catch (PeerNotConnected e) { + executorResult = + new PeerTaskExecutorResult<>( + Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED); + + } catch (InterruptedException | TimeoutException e) { + peer.recordRequestTimeout(requestMessageData.getCode()); + timeoutCounter.labels(taskClassName).inc(); + executorResult = + new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT); + + } catch (InvalidPeerTaskResponseException e) { + peer.recordUselessResponse(e.getMessage()); + invalidResponseCounter.labels(taskClassName).inc(); + executorResult = + new PeerTaskExecutorResult<>( + Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE); + + } catch (ExecutionException e) { + internalExceptionCounter.labels(taskClassName).inc(); + executorResult = + new PeerTaskExecutorResult<>( + Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR); + } + } while (retriesRemaining-- > 0 + && executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS + && executorResult.responseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED + && sleepBetweenRetries()); + + return executorResult; + } + + private boolean sleepBetweenRetries() { + try { + // sleep for 1 second to match implemented wait between retries in AbstractRetryingPeerTask + Thread.sleep(1000); + return true; + } catch (InterruptedException e) { + return false; + } + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java new file mode 100644 index 0000000000..327461de15 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java @@ -0,0 +1,24 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +public enum PeerTaskExecutorResponseCode { + SUCCESS, + NO_PEER_AVAILABLE, + PEER_DISCONNECTED, + INTERNAL_SERVER_ERROR, + TIMEOUT, + INVALID_RESPONSE +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java new file mode 100644 index 0000000000..86dec85c29 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java @@ -0,0 +1,20 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import java.util.Optional; + +public record PeerTaskExecutorResult( + Optional result, PeerTaskExecutorResponseCode responseCode) {} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java new file mode 100644 index 0000000000..7a597eca8e --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java @@ -0,0 +1,56 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.RequestManager.ResponseStream; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class PeerTaskRequestSender { + private static final long DEFAULT_TIMEOUT_MS = 5_000; + + private final long timeoutMs; + + public PeerTaskRequestSender() { + this.timeoutMs = DEFAULT_TIMEOUT_MS; + } + + public PeerTaskRequestSender(final long timeoutMs) { + this.timeoutMs = timeoutMs; + } + + public MessageData sendRequest( + final SubProtocol subProtocol, final MessageData requestMessageData, final EthPeer ethPeer) + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException { + ResponseStream responseStream = + ethPeer.send(requestMessageData, subProtocol.getName(), ethPeer.getConnection()); + final CompletableFuture responseMessageDataFuture = new CompletableFuture<>(); + responseStream.then( + (boolean streamClosed, MessageData message, EthPeer peer) -> { + responseMessageDataFuture.complete(message); + }); + return responseMessageDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java new file mode 100644 index 0000000000..0262e276da --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -0,0 +1,290 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; + +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +public class PeerTaskExecutorTest { + private @Mock PeerSelector peerSelector; + private @Mock PeerTaskRequestSender requestSender; + private @Mock PeerTask peerTask; + private @Mock SubProtocol subprotocol; + private @Mock MessageData requestMessageData; + private @Mock MessageData responseMessageData; + private @Mock EthPeer ethPeer; + private AutoCloseable mockCloser; + + private PeerTaskExecutor peerTaskExecutor; + + @BeforeEach + public void beforeTest() { + mockCloser = MockitoAnnotations.openMocks(this); + peerTaskExecutor = new PeerTaskExecutor(peerSelector, requestSender, new NoOpMetricsSystem()); + } + + @AfterEach + public void afterTest() throws Exception { + mockCloser.close(); + } + + @Test + public void testExecuteAgainstPeerWithNoRetriesAndSuccessfulFlow() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + + Object responseObject = new Object(); + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenReturn(responseMessageData); + Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Mockito.verify(ethPeer).recordUsefulResponse(); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.result().isPresent()); + Assertions.assertSame(responseObject, result.result().get()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode()); + } + + @Test + public void testExecuteAgainstPeerWithNoRetriesAndPartialSuccessfulFlow() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + + Object responseObject = new Object(); + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenReturn(responseMessageData); + Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(false); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.result().isPresent()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.responseCode()); + } + + @Test + public void testExecuteAgainstPeerWithRetriesAndSuccessfulFlowAfterFirstFailure() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + Object responseObject = new Object(); + int requestMessageDataCode = 123; + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(2); + + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenThrow(new TimeoutException()) + .thenReturn(responseMessageData); + Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); + Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode); + Mockito.verify(ethPeer).recordUsefulResponse(); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.result().isPresent()); + Assertions.assertSame(responseObject, result.result().get()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode()); + } + + @Test + public void testExecuteAgainstPeerWithNoRetriesAndPeerNotConnected() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException { + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenThrow(new PeerConnection.PeerNotConnected("")); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.result().isEmpty()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.PEER_DISCONNECTED, result.responseCode()); + } + + @Test + public void testExecuteAgainstPeerWithNoRetriesAndTimeoutException() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException { + int requestMessageDataCode = 123; + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenThrow(new TimeoutException()); + Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.result().isEmpty()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.TIMEOUT, result.responseCode()); + } + + @Test + public void testExecuteAgainstPeerWithNoRetriesAndInvalidResponseMessage() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenReturn(responseMessageData); + Mockito.when(peerTask.parseResponse(responseMessageData)) + .thenThrow(new InvalidPeerTaskResponseException()); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Mockito.verify(ethPeer).recordUselessResponse(null); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.result().isEmpty()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.responseCode()); + } + + @Test + @SuppressWarnings("unchecked") + public void testExecuteWithNoRetriesAndSuccessFlow() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + Object responseObject = new Object(); + + Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))) + .thenReturn(Optional.of(ethPeer)); + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(0); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenReturn(responseMessageData); + Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Mockito.verify(ethPeer).recordUsefulResponse(); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.result().isPresent()); + Assertions.assertSame(responseObject, result.result().get()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode()); + } + + @Test + @SuppressWarnings("unchecked") + public void testExecuteWithPeerSwitchingAndSuccessFlow() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + Object responseObject = new Object(); + int requestMessageDataCode = 123; + EthPeer peer2 = Mockito.mock(EthPeer.class); + + Mockito.when(peerSelector.getPeer(Mockito.any(Predicate.class))) + .thenReturn(Optional.of(ethPeer)) + .thenReturn(Optional.of(peer2)); + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(2); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenThrow(new TimeoutException()); + Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, peer2)) + .thenReturn(responseMessageData); + Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + Mockito.when(peerTask.isSuccess(responseObject)).thenReturn(true); + + PeerTaskExecutorResult result = peerTaskExecutor.execute(peerTask); + + Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode); + Mockito.verify(peer2).recordUsefulResponse(); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.result().isPresent()); + Assertions.assertSame(responseObject, result.result().get()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.responseCode()); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java new file mode 100644 index 0000000000..4041fb6303 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java @@ -0,0 +1,75 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.RequestManager; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +public class PeerTaskRequestSenderTest { + + private PeerTaskRequestSender peerTaskRequestSender; + + @BeforeEach + public void beforeTest() { + peerTaskRequestSender = new PeerTaskRequestSender(); + } + + @Test + public void testSendRequest() + throws PeerConnection.PeerNotConnected, ExecutionException, InterruptedException { + SubProtocol subprotocol = Mockito.mock(SubProtocol.class); + MessageData requestMessageData = Mockito.mock(MessageData.class); + MessageData responseMessageData = Mockito.mock(MessageData.class); + EthPeer peer = Mockito.mock(EthPeer.class); + PeerConnection peerConnection = Mockito.mock(PeerConnection.class); + RequestManager.ResponseStream responseStream = + Mockito.mock(RequestManager.ResponseStream.class); + + Mockito.when(peer.getConnection()).thenReturn(peerConnection); + Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); + Mockito.when(peer.send(requestMessageData, "subprotocol", peerConnection)) + .thenReturn(responseStream); + + CompletableFuture actualResponseMessageDataFuture = + CompletableFuture.supplyAsync( + () -> { + try { + return peerTaskRequestSender.sendRequest(subprotocol, requestMessageData, peer); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + Thread.sleep(500); + ArgumentCaptor responseCallbackArgumentCaptor = + ArgumentCaptor.forClass(RequestManager.ResponseCallback.class); + Mockito.verify(responseStream).then(responseCallbackArgumentCaptor.capture()); + RequestManager.ResponseCallback responseCallback = responseCallbackArgumentCaptor.getValue(); + responseCallback.exec(false, responseMessageData, peer); + + Assertions.assertSame(responseMessageData, actualResponseMessageDataFuture.get()); + } +} diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpMetricsSystem.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpMetricsSystem.java index 2d1ee26cfd..5f876fa4d8 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpMetricsSystem.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpMetricsSystem.java @@ -253,5 +253,14 @@ public class NoOpMetricsSystem implements ObservableMetricsSystem { "The count of labels used must match the count of labels expected."); Preconditions.checkNotNull(valueSupplier, "No valueSupplier specified"); } + + @Override + public boolean isLabelsObserved(final String... labelValues) { + Preconditions.checkArgument( + labelValues.length == labelCount, + "The count of labels used must match the count of labels expected."); + final String labelValuesString = String.join(",", labelValues); + return labelValuesCache.contains(labelValuesString); + } } } diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpValueCollector.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpValueCollector.java index 144e7f3187..6f36f10d2c 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpValueCollector.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/noop/NoOpValueCollector.java @@ -36,4 +36,10 @@ public class NoOpValueCollector implements LabelledGauge { } labelValuesCreated.add(labelValuesString); } + + @Override + public boolean isLabelsObserved(final String... labelValues) { + final String labelValuesString = String.join(",", labelValues); + return labelValuesCreated.contains(labelValuesString); + } } diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryGauge.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryGauge.java index 6aea56586a..e1d785c68e 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryGauge.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryGauge.java @@ -63,6 +63,14 @@ public class OpenTelemetryGauge implements LabelledGauge { } } + @Override + public boolean isLabelsObserved(final String... labelValues) { + Preconditions.checkArgument( + labelValues.length == labelNames.size(), + "label values and label names need the same number of elements"); + return observationsMap.containsKey(getLabels(labelValues)); + } + private Attributes getLabels(final String... labelValues) { final AttributesBuilder labelsBuilder = Attributes.builder(); for (int i = 0; i < labelNames.size(); i++) { diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/PrometheusGauge.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/PrometheusGauge.java index 83000dfac3..b69e3f9062 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/PrometheusGauge.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/prometheus/PrometheusGauge.java @@ -47,10 +47,7 @@ public class PrometheusGauge extends Collector implements LabelledGauge { @Override public synchronized void labels(final DoubleSupplier valueSupplier, final String... labelValues) { - if (labelValues.length != labelNames.size()) { - throw new IllegalArgumentException( - "Label values and label names must be the same cardinality"); - } + validateLabelsCardinality(labelValues); if (observationsMap.putIfAbsent(List.of(labelValues), valueSupplier) != null) { final String labelValuesString = String.join(",", labelValues); throw new IllegalArgumentException( @@ -58,6 +55,12 @@ public class PrometheusGauge extends Collector implements LabelledGauge { } } + @Override + public boolean isLabelsObserved(final String... labelValues) { + validateLabelsCardinality(labelValues); + return observationsMap.containsKey(List.of(labelValues)); + } + @Override public List collect() { final List samples = new ArrayList<>(); @@ -68,4 +71,11 @@ public class PrometheusGauge extends Collector implements LabelledGauge { metricName, labelNames, labels, valueSupplier.getAsDouble()))); return List.of(new MetricFamilySamples(metricName, Type.GAUGE, help, samples)); } + + private void validateLabelsCardinality(final String... labelValues) { + if (labelValues.length != labelNames.size()) { + throw new IllegalArgumentException( + "Label values and label names must be the same cardinality"); + } + } } diff --git a/plugin-api/build.gradle b/plugin-api/build.gradle index dd2da0a15e..91fb45239d 100644 --- a/plugin-api/build.gradle +++ b/plugin-api/build.gradle @@ -71,7 +71,7 @@ Calculated : ${currentHash} tasks.register('checkAPIChanges', FileStateChecker) { description = "Checks that the API for the Plugin-API project does not change without deliberate thought" files = sourceSets.main.allJava.files - knownHash = '4jVaj9yW88nHbX0KmTR3dPQRvj9x8Pvh5E9Ry7KRT6w=' + knownHash = 'WRdnBaP05fItpWHYSFz/vBBlRWL3sLGqzR3tzd+pOkA=' } check.dependsOn('checkAPIChanges') diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/metrics/LabelledGauge.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/metrics/LabelledGauge.java index 724e31c58d..5357c6505a 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/metrics/LabelledGauge.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/metrics/LabelledGauge.java @@ -25,4 +25,15 @@ public interface LabelledGauge { * @param labelValues the label values */ void labels(final DoubleSupplier valueSupplier, final String... labelValues); + + /** + * Checks whether the supplied labelValues are already observed by this LabelledGauge + * + * @param labelValues The labelValues to check + * @return true if the supplied labelValues are already observed by this LabelledGauge, false + * otherwise + */ + default boolean isLabelsObserved(final String... labelValues) { + return false; + } }