diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/InvalidPeerTaskResponseException.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/InvalidPeerTaskResponseException.java new file mode 100644 index 0000000000..824c0860d7 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/InvalidPeerTaskResponseException.java @@ -0,0 +1,26 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +public class InvalidPeerTaskResponseException extends Exception { + + public InvalidPeerTaskResponseException() { + super(); + } + + public InvalidPeerTaskResponseException(final Throwable cause) { + super(cause); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/NoAvailablePeerException.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/NoAvailablePeerException.java new file mode 100644 index 0000000000..40c600d6fb --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/NoAvailablePeerException.java @@ -0,0 +1,17 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +public class NoAvailablePeerException extends Exception {} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManager.java new file mode 100644 index 0000000000..fc5bc691b7 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManager.java @@ -0,0 +1,64 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.p2p.peers.PeerId; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** "Manages" the EthPeers for the PeerTaskExecutor */ +public class PeerManager { + private static final Logger LOG = LoggerFactory.getLogger(PeerManager.class); + + // use a synchronized map to ensure the map is never modified by multiple threads at once + private final Map ethPeersByPeerId = + Collections.synchronizedMap(new HashMap<>()); + + /** + * Gets the highest reputation peer matching the supplies filter + * + * @param filter a filter to match prospective peers with + * @return the highest reputation peer matching the supplies filter + * @throws NoAvailablePeerException If there are no suitable peers + */ + public EthPeer getPeer(final Predicate filter) throws NoAvailablePeerException { + LOG.trace("Getting peer from pool of {} peers", ethPeersByPeerId.size()); + return ethPeersByPeerId.values().stream() + .filter(filter) + .max(Comparator.naturalOrder()) + .orElseThrow(NoAvailablePeerException::new); + } + + public Optional getPeerByPeerId(final PeerId peerId) { + return Optional.ofNullable(ethPeersByPeerId.get(peerId)); + } + + public void addPeer(final EthPeer ethPeer) { + ethPeersByPeerId.put(ethPeer.getConnection().getPeer(), ethPeer); + } + + public void removePeer(final PeerId peerId) { + ethPeersByPeerId.remove(peerId); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java new file mode 100644 index 0000000000..244908c921 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java @@ -0,0 +1,63 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; + +import java.util.Collection; + +/** + * Represents a task to be executed on an EthPeer by the PeerTaskExecutor + * + * @param The type of the result of this PeerTask + */ +public interface PeerTask { + /** + * Returns the SubProtocol used for this PeerTask + * + * @return the SubProtocol used for this PeerTask + */ + String getSubProtocol(); + + /** + * Gets the minimum required block number for a peer to have to successfully execute this task + * + * @return the minimum required block number for a peer to have to successfully execute this task + */ + long getRequiredBlockNumber(); + + /** + * Gets the request data to send to the EthPeer + * + * @return the request data to send to the EthPeer + */ + MessageData getRequestMessage(); + + /** + * Parses the MessageData response from the EthPeer + * + * @param messageData the response MessageData to be parsed + * @return a T built from the response MessageData + * @throws InvalidPeerTaskResponseException if the response messageData is invalid + */ + T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException; + + /** + * Gets the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor + * + * @return the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor + */ + Collection getPeerTaskBehaviors(); +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskBehavior.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskBehavior.java new file mode 100644 index 0000000000..fba9000a74 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskBehavior.java @@ -0,0 +1,20 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +public enum PeerTaskBehavior { + RETRY_WITH_SAME_PEER, + RETRY_WITH_OTHER_PEERS +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java new file mode 100644 index 0000000000..fabf88d05d --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -0,0 +1,157 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.metrics.BesuMetricCategory; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; +import org.hyperledger.besu.plugin.services.metrics.OperationTimer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +/** Manages the execution of PeerTasks, respecting their PeerTaskBehavior */ +public class PeerTaskExecutor { + private static final long[] WAIT_TIME_BEFORE_RETRY = {0, 20000, 5000}; + + private final PeerManager peerManager; + private final PeerTaskRequestSender requestSender; + private final Supplier protocolSpecSupplier; + private final LabelledMetric requestTimer; + + public PeerTaskExecutor( + final PeerManager peerManager, + final PeerTaskRequestSender requestSender, + final Supplier protocolSpecSupplier, + final MetricsSystem metricsSystem) { + this.peerManager = peerManager; + this.requestSender = requestSender; + this.protocolSpecSupplier = protocolSpecSupplier; + requestTimer = + metricsSystem.createLabelledTimer( + BesuMetricCategory.PEERS, "Peer Task Executor Request Time", "", "Task Class Name"); + } + + public PeerTaskExecutorResult execute(final PeerTask peerTask) { + PeerTaskExecutorResult executorResult; + int triesRemaining = + peerTask.getPeerTaskBehaviors().contains(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS) ? 3 : 1; + final Collection usedEthPeers = new ArrayList<>(); + do { + EthPeer peer; + try { + peer = + peerManager.getPeer( + (candidatePeer) -> + isPeerUnused(candidatePeer, usedEthPeers) + && (protocolSpecSupplier.get().isPoS() + || isPeerHeightHighEnough( + candidatePeer, peerTask.getRequiredBlockNumber())) + && isPeerProtocolSuitable(candidatePeer, peerTask.getSubProtocol())); + usedEthPeers.add(peer); + executorResult = executeAgainstPeer(peerTask, peer); + } catch (NoAvailablePeerException e) { + executorResult = + new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE); + } + } while (--triesRemaining > 0 + && executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS); + + return executorResult; + } + + public CompletableFuture> executeAsync(final PeerTask peerTask) { + return CompletableFuture.supplyAsync(() -> execute(peerTask)); + } + + public PeerTaskExecutorResult executeAgainstPeer( + final PeerTask peerTask, final EthPeer peer) { + MessageData requestMessageData = peerTask.getRequestMessage(); + PeerTaskExecutorResult executorResult; + int triesRemaining = + peerTask.getPeerTaskBehaviors().contains(PeerTaskBehavior.RETRY_WITH_SAME_PEER) ? 3 : 1; + do { + try { + + MessageData responseMessageData; + try (final OperationTimer.TimingContext timingContext = + requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) { + responseMessageData = + requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer); + } + T result = peerTask.parseResponse(responseMessageData); + peer.recordUsefulResponse(); + executorResult = new PeerTaskExecutorResult<>(result, PeerTaskExecutorResponseCode.SUCCESS); + + } catch (PeerConnection.PeerNotConnected e) { + executorResult = + new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.PEER_DISCONNECTED); + + } catch (InterruptedException | TimeoutException e) { + peer.recordRequestTimeout(requestMessageData.getCode()); + executorResult = new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.TIMEOUT); + + } catch (InvalidPeerTaskResponseException e) { + peer.recordUselessResponse(e.getMessage()); + executorResult = + new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.INVALID_RESPONSE); + + } catch (ExecutionException e) { + executorResult = + new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR); + } + } while (--triesRemaining > 0 + && executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS + && executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED + && sleepBetweenRetries(WAIT_TIME_BEFORE_RETRY[triesRemaining])); + + return executorResult; + } + + public CompletableFuture> executeAgainstPeerAsync( + final PeerTask peerTask, final EthPeer peer) { + return CompletableFuture.supplyAsync(() -> executeAgainstPeer(peerTask, peer)); + } + + private boolean sleepBetweenRetries(final long sleepTime) { + try { + Thread.sleep(sleepTime); + return true; + } catch (InterruptedException e) { + return false; + } + } + + private static boolean isPeerUnused( + final EthPeer ethPeer, final Collection usedEthPeers) { + return !usedEthPeers.contains(ethPeer); + } + + private static boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) { + return ethPeer.chainState().getEstimatedHeight() >= requiredHeight; + } + + private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final String protocol) { + return ethPeer.getProtocolName().equals(protocol); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java new file mode 100644 index 0000000000..327461de15 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResponseCode.java @@ -0,0 +1,24 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +public enum PeerTaskExecutorResponseCode { + SUCCESS, + NO_PEER_AVAILABLE, + PEER_DISCONNECTED, + INTERNAL_SERVER_ERROR, + TIMEOUT, + INVALID_RESPONSE +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java new file mode 100644 index 0000000000..f89bc67f61 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorResult.java @@ -0,0 +1,35 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import java.util.Optional; + +public class PeerTaskExecutorResult { + private final Optional result; + private final PeerTaskExecutorResponseCode responseCode; + + public PeerTaskExecutorResult(final T result, final PeerTaskExecutorResponseCode responseCode) { + this.result = Optional.ofNullable(result); + this.responseCode = responseCode; + } + + public Optional getResult() { + return result; + } + + public PeerTaskExecutorResponseCode getResponseCode() { + return responseCode; + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java new file mode 100644 index 0000000000..9d9ceed321 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSender.java @@ -0,0 +1,55 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.RequestManager.ResponseStream; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class PeerTaskRequestSender { + private static final long DEFAULT_TIMEOUT_MS = 20_000; + + private final long timeoutMs; + + public PeerTaskRequestSender() { + this.timeoutMs = DEFAULT_TIMEOUT_MS; + } + + public PeerTaskRequestSender(final long timeoutMs) { + this.timeoutMs = timeoutMs; + } + + public MessageData sendRequest( + final String subProtocol, final MessageData requestMessageData, final EthPeer ethPeer) + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException { + ResponseStream responseStream = + ethPeer.send(requestMessageData, subProtocol, ethPeer.getConnection()); + final CompletableFuture responseMessageDataFuture = new CompletableFuture<>(); + responseStream.then( + (boolean streamClosed, MessageData message, EthPeer peer) -> { + responseMessageDataFuture.complete(message); + }); + return responseMessageDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManagerTest.java new file mode 100644 index 0000000000..54e0b88b5f --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerManagerTest.java @@ -0,0 +1,80 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; + +import java.time.Clock; +import java.util.Collections; +import java.util.Set; + +import org.apache.tuweni.bytes.Bytes; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class PeerManagerTest { + + public PeerManager peerManager; + + @BeforeEach + public void beforeTest() { + peerManager = new PeerManager(); + } + + @Test + public void testGetPeer() throws NoAvailablePeerException { + EthPeer protocol1With5ReputationPeer = + createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 5); + peerManager.addPeer(protocol1With5ReputationPeer); + EthPeer protocol1With4ReputationPeer = + createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol1", 4); + peerManager.addPeer(protocol1With4ReputationPeer); + EthPeer protocol2With50ReputationPeer = + createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 50); + peerManager.addPeer(protocol2With50ReputationPeer); + EthPeer protocol2With4ReputationPeer = + createTestPeer(Set.of(Capability.create("capability1", 1)), "protocol2", 4); + peerManager.addPeer(protocol2With4ReputationPeer); + + EthPeer result = peerManager.getPeer((p) -> p.getProtocolName().equals("protocol1")); + + Assertions.assertSame(protocol1With5ReputationPeer, result); + } + + private EthPeer createTestPeer( + final Set connectionCapabilities, + final String protocolName, + final int reputationAdjustment) { + PeerConnection peerConnection = new MockPeerConnection(connectionCapabilities); + EthPeer peer = + new EthPeer( + peerConnection, + protocolName, + null, + Collections.emptyList(), + 1, + Clock.systemUTC(), + Collections.emptyList(), + Bytes.EMPTY); + for (int i = 0; i < reputationAdjustment; i++) { + peer.getReputation().recordUsefulResponse(); + } + return peer; + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java new file mode 100644 index 0000000000..029cb19e10 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -0,0 +1,268 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +public class PeerTaskExecutorTest { + private @Mock PeerManager peerManager; + private @Mock PeerTaskRequestSender requestSender; + private @Mock ProtocolSpec protocolSpec; + private @Mock PeerTask peerTask; + private @Mock MessageData requestMessageData; + private @Mock MessageData responseMessageData; + private @Mock EthPeer ethPeer; + private AutoCloseable mockCloser; + + private PeerTaskExecutor peerTaskExecutor; + + @BeforeEach + public void beforeTest() { + mockCloser = MockitoAnnotations.openMocks(this); + peerTaskExecutor = + new PeerTaskExecutor( + peerManager, requestSender, () -> protocolSpec, new NoOpMetricsSystem()); + } + + @AfterEach + public void afterTest() throws Exception { + mockCloser.close(); + } + + @Test + public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndSuccessfulFlow() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + String subprotocol = "subprotocol"; + Object responseObject = new Object(); + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenReturn(responseMessageData); + Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Mockito.verify(ethPeer).recordUsefulResponse(); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.getResult().isPresent()); + Assertions.assertSame(responseObject, result.getResult().get()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.getResponseCode()); + } + + @Test + public void testExecuteAgainstPeerWithRetryBehaviorsAndSuccessfulFlowAfterFirstFailure() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + String subprotocol = "subprotocol"; + Object responseObject = new Object(); + int requestMessageDataCode = 123; + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getPeerTaskBehaviors()) + .thenReturn(List.of(PeerTaskBehavior.RETRY_WITH_SAME_PEER)); + + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenThrow(new TimeoutException()) + .thenReturn(responseMessageData); + Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); + Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode); + Mockito.verify(ethPeer).recordUsefulResponse(); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.getResult().isPresent()); + Assertions.assertSame(responseObject, result.getResult().get()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.getResponseCode()); + } + + @Test + public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndPeerNotConnected() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + String subprotocol = "subprotocol"; + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenThrow(new PeerConnection.PeerNotConnected("")); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.getResult().isEmpty()); + Assertions.assertEquals( + PeerTaskExecutorResponseCode.PEER_DISCONNECTED, result.getResponseCode()); + } + + @Test + public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndTimeoutException() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + String subprotocol = "subprotocol"; + int requestMessageDataCode = 123; + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenThrow(new TimeoutException()); + Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.getResult().isEmpty()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.TIMEOUT, result.getResponseCode()); + } + + @Test + public void testExecuteAgainstPeerWithNoPeerTaskBehaviorsAndInvalidResponseMessage() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException { + String subprotocol = "subprotocol"; + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenReturn(responseMessageData); + Mockito.when(peerTask.parseResponse(responseMessageData)) + .thenThrow(new InvalidPeerTaskResponseException()); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Mockito.verify(ethPeer).recordUselessResponse(null); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.getResult().isEmpty()); + Assertions.assertEquals( + PeerTaskExecutorResponseCode.INVALID_RESPONSE, result.getResponseCode()); + } + + @Test + @SuppressWarnings("unchecked") + public void testExecuteWithNoPeerTaskBehaviorsAndSuccessFlow() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException, + NoAvailablePeerException { + String subprotocol = "subprotocol"; + Object responseObject = new Object(); + + Mockito.when(peerManager.getPeer(Mockito.any(Predicate.class))).thenReturn(ethPeer); + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getPeerTaskBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenReturn(responseMessageData); + Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + + PeerTaskExecutorResult result = peerTaskExecutor.executeAgainstPeer(peerTask, ethPeer); + + Mockito.verify(ethPeer).recordUsefulResponse(); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.getResult().isPresent()); + Assertions.assertSame(responseObject, result.getResult().get()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.getResponseCode()); + } + + @Test + @SuppressWarnings("unchecked") + public void testExecuteWithPeerSwitchingAndSuccessFlow() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException, + InvalidPeerTaskResponseException, + NoAvailablePeerException { + String subprotocol = "subprotocol"; + Object responseObject = new Object(); + int requestMessageDataCode = 123; + EthPeer peer2 = Mockito.mock(EthPeer.class); + + Mockito.when(peerManager.getPeer(Mockito.any(Predicate.class))) + .thenReturn(ethPeer) + .thenReturn(peer2); + + Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); + Mockito.when(peerTask.getPeerTaskBehaviors()) + .thenReturn(List.of(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS)); + Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) + .thenThrow(new TimeoutException()); + Mockito.when(requestMessageData.getCode()).thenReturn(requestMessageDataCode); + Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, peer2)) + .thenReturn(responseMessageData); + Mockito.when(peerTask.parseResponse(responseMessageData)).thenReturn(responseObject); + + PeerTaskExecutorResult result = peerTaskExecutor.execute(peerTask); + + Mockito.verify(ethPeer).recordRequestTimeout(requestMessageDataCode); + Mockito.verify(peer2).recordUsefulResponse(); + + Assertions.assertNotNull(result); + Assertions.assertTrue(result.getResult().isPresent()); + Assertions.assertSame(responseObject, result.getResult().get()); + Assertions.assertEquals(PeerTaskExecutorResponseCode.SUCCESS, result.getResponseCode()); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java new file mode 100644 index 0000000000..9e161031f5 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRequestSenderTest.java @@ -0,0 +1,77 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.RequestManager; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +public class PeerTaskRequestSenderTest { + + private PeerTaskRequestSender peerTaskRequestSender; + + @BeforeEach + public void beforeTest() { + peerTaskRequestSender = new PeerTaskRequestSender(); + } + + @Test + public void testSendRequest() + throws PeerConnection.PeerNotConnected, + ExecutionException, + InterruptedException, + TimeoutException { + String subprotocol = "subprotocol"; + MessageData requestMessageData = Mockito.mock(MessageData.class); + MessageData responseMessageData = Mockito.mock(MessageData.class); + EthPeer peer = Mockito.mock(EthPeer.class); + PeerConnection peerConnection = Mockito.mock(PeerConnection.class); + RequestManager.ResponseStream responseStream = + Mockito.mock(RequestManager.ResponseStream.class); + + Mockito.when(peer.getConnection()).thenReturn(peerConnection); + Mockito.when(peer.send(requestMessageData, subprotocol, peerConnection)) + .thenReturn(responseStream); + + CompletableFuture actualResponseMessageDataFuture = + CompletableFuture.supplyAsync( + () -> { + try { + return peerTaskRequestSender.sendRequest(subprotocol, requestMessageData, peer); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + Thread.sleep(500); + ArgumentCaptor responseCallbackArgumentCaptor = + ArgumentCaptor.forClass(RequestManager.ResponseCallback.class); + Mockito.verify(responseStream).then(responseCallbackArgumentCaptor.capture()); + RequestManager.ResponseCallback responseCallback = responseCallbackArgumentCaptor.getValue(); + responseCallback.exec(false, responseMessageData, peer); + + Assertions.assertSame(responseMessageData, actualResponseMessageDataFuture.get()); + } +}