From 8186a77d70b92323442b32b0e73a17d8318e4d0c Mon Sep 17 00:00:00 2001 From: Matilda Clerke Date: Wed, 9 Oct 2024 10:29:03 +1100 Subject: [PATCH 1/3] 7311: Rework getPeer again to use LEAST_TO_MOST_BUSY comparator Signed-off-by: Matilda Clerke --- .../org/hyperledger/besu/ethereum/eth/manager/EthPeers.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index d070c35ce1..09a998cf64 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -470,7 +470,10 @@ public class EthPeers implements PeerSelector { // Part of the PeerSelector interface, to be split apart later @Override public Optional getPeer(final Predicate filter) { - return streamBestPeers().filter(filter).filter(EthPeer::hasAvailableRequestCapacity).findFirst(); + return streamAvailablePeers() + .filter(filter) + .filter(EthPeer::hasAvailableRequestCapacity) + .min(LEAST_TO_MOST_BUSY); } // Part of the PeerSelector interface, to be split apart later From 37b0ec26597b2192bbfa2d0adadbb7c7c7d75fbe Mon Sep 17 00:00:00 2001 From: Matilda Clerke Date: Wed, 9 Oct 2024 10:30:42 +1100 Subject: [PATCH 2/3] 7311: Import PeerNotConnected class instead of using fully qualified class name Signed-off-by: Matilda Clerke --- .../besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index 1734c1e876..f3ddc5abbe 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -15,7 +15,7 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -103,7 +103,7 @@ public class PeerTaskExecutor { new PeerTaskExecutorResult<>( Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS); - } catch (PeerConnection.PeerNotConnected e) { + } catch (PeerNotConnected e) { executorResult = new PeerTaskExecutorResult<>( Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED); From 545fd5c29dd00a7f0f3117dd864b4650a9b643ac Mon Sep 17 00:00:00 2001 From: Matilda Clerke Date: Thu, 10 Oct 2024 08:41:18 +1100 Subject: [PATCH 3/3] 7311: Change to specifying retry counts in PeerTask instead of behavior enums Signed-off-by: Matilda Clerke --- .../eth/manager/peertask/PeerTask.java | 18 +++++++++++++---- .../manager/peertask/PeerTaskExecutor.java | 13 ++---------- .../peertask/PeerTaskRetryBehavior.java | 20 ------------------- .../peertask/PeerTaskExecutorTest.java | 20 +++++++++---------- 4 files changed, 25 insertions(+), 46 deletions(-) delete mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRetryBehavior.java diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java index 4436022c9a..1d6464ac26 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java @@ -18,7 +18,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; -import java.util.Collection; import java.util.function.Predicate; /** @@ -51,11 +50,22 @@ public interface PeerTask { T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException; /** - * Gets the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor + * Gets the number of times this task may be attempted against other peers * - * @return the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor + * @return the number of times this task may be attempted against other peers */ - Collection getPeerTaskRetryBehaviors(); + default int getRetriesWithOtherPeer() { + return 5; + } + + /** + * Gets the number of times this task may be attempted against the same peer + * + * @return the number of times this task may be attempted against the same peer + */ + default int getRetriesWithSamePeer() { + return 5; + } /** * Gets a Predicate that checks if an EthPeer is suitable for this PeerTask diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index f3ddc5abbe..7b485ee6c3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -31,9 +31,6 @@ import java.util.concurrent.TimeoutException; /** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */ public class PeerTaskExecutor { - public static final int RETRIES_WITH_SAME_PEER = 2; - public static final int RETRIES_WITH_OTHER_PEER = 2; - public static final int NO_RETRIES = 0; private final PeerSelector peerSelector; private final PeerTaskRequestSender requestSender; @@ -55,10 +52,7 @@ public class PeerTaskExecutor { public PeerTaskExecutorResult execute(final PeerTask peerTask) { PeerTaskExecutorResult executorResult; - int retriesRemaining = - peerTask.getPeerTaskRetryBehaviors().contains(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS) - ? RETRIES_WITH_OTHER_PEER - : NO_RETRIES; + int retriesRemaining = peerTask.getRetriesWithOtherPeer(); final Collection usedEthPeers = new HashSet<>(); do { Optional peer = @@ -84,10 +78,7 @@ public class PeerTaskExecutor { final PeerTask peerTask, final EthPeer peer) { MessageData requestMessageData = peerTask.getRequestMessage(); PeerTaskExecutorResult executorResult; - int retriesRemaining = - peerTask.getPeerTaskRetryBehaviors().contains(PeerTaskRetryBehavior.RETRY_WITH_SAME_PEER) - ? RETRIES_WITH_SAME_PEER - : NO_RETRIES; + int retriesRemaining = peerTask.getRetriesWithSamePeer(); do { try { T result; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRetryBehavior.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRetryBehavior.java deleted file mode 100644 index 53e2def661..0000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRetryBehavior.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright contributors to Hyperledger Besu. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.manager.peertask; - -public enum PeerTaskRetryBehavior { - RETRY_WITH_SAME_PEER, - RETRY_WITH_OTHER_PEERS -} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java index 6dfd8d0e20..0015c1ffbc 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -20,8 +20,6 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; -import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -69,7 +67,7 @@ public class PeerTaskExecutorTest { Object responseObject = new Object(); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -97,8 +95,7 @@ public class PeerTaskExecutorTest { int requestMessageDataCode = 123; Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()) - .thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_SAME_PEER)); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(2); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); @@ -127,7 +124,7 @@ public class PeerTaskExecutorTest { TimeoutException { Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -149,7 +146,7 @@ public class PeerTaskExecutorTest { int requestMessageDataCode = 123; Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -174,7 +171,7 @@ public class PeerTaskExecutorTest { InvalidPeerTaskResponseException { Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -205,7 +202,8 @@ public class PeerTaskExecutorTest { .thenReturn(Optional.of(ethPeer)); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(0); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -239,8 +237,8 @@ public class PeerTaskExecutorTest { .thenReturn(Optional.of(peer2)); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()) - .thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS)); + Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(2); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenThrow(new TimeoutException());