diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index a721697717..09a998cf64 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -470,10 +470,10 @@ public class EthPeers implements PeerSelector { // Part of the PeerSelector interface, to be split apart later @Override public Optional getPeer(final Predicate filter) { - return streamBestPeers() + return streamAvailablePeers() .filter(filter) .filter(EthPeer::hasAvailableRequestCapacity) - .findFirst(); + .min(LEAST_TO_MOST_BUSY); } // Part of the PeerSelector interface, to be split apart later diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java index 4436022c9a..1d6464ac26 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTask.java @@ -18,7 +18,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; -import java.util.Collection; import java.util.function.Predicate; /** @@ -51,11 +50,22 @@ public interface PeerTask { T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException; /** - * Gets the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor + * Gets the number of times this task may be attempted against other peers * - * @return the Collection of behaviors this task is expected to exhibit in the PeetTaskExecutor + * @return the number of times this task may be attempted against other peers */ - Collection getPeerTaskRetryBehaviors(); + default int getRetriesWithOtherPeer() { + return 5; + } + + /** + * Gets the number of times this task may be attempted against the same peer + * + * @return the number of times this task may be attempted against the same peer + */ + default int getRetriesWithSamePeer() { + return 5; + } /** * Gets a Predicate that checks if an EthPeer is suitable for this PeerTask diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java index 1734c1e876..7b485ee6c3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutor.java @@ -15,7 +15,7 @@ package org.hyperledger.besu.ethereum.eth.manager.peertask; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; +import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -31,9 +31,6 @@ import java.util.concurrent.TimeoutException; /** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */ public class PeerTaskExecutor { - public static final int RETRIES_WITH_SAME_PEER = 2; - public static final int RETRIES_WITH_OTHER_PEER = 2; - public static final int NO_RETRIES = 0; private final PeerSelector peerSelector; private final PeerTaskRequestSender requestSender; @@ -55,10 +52,7 @@ public class PeerTaskExecutor { public PeerTaskExecutorResult execute(final PeerTask peerTask) { PeerTaskExecutorResult executorResult; - int retriesRemaining = - peerTask.getPeerTaskRetryBehaviors().contains(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS) - ? RETRIES_WITH_OTHER_PEER - : NO_RETRIES; + int retriesRemaining = peerTask.getRetriesWithOtherPeer(); final Collection usedEthPeers = new HashSet<>(); do { Optional peer = @@ -84,10 +78,7 @@ public class PeerTaskExecutor { final PeerTask peerTask, final EthPeer peer) { MessageData requestMessageData = peerTask.getRequestMessage(); PeerTaskExecutorResult executorResult; - int retriesRemaining = - peerTask.getPeerTaskRetryBehaviors().contains(PeerTaskRetryBehavior.RETRY_WITH_SAME_PEER) - ? RETRIES_WITH_SAME_PEER - : NO_RETRIES; + int retriesRemaining = peerTask.getRetriesWithSamePeer(); do { try { T result; @@ -103,7 +94,7 @@ public class PeerTaskExecutor { new PeerTaskExecutorResult<>( Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS); - } catch (PeerConnection.PeerNotConnected e) { + } catch (PeerNotConnected e) { executorResult = new PeerTaskExecutorResult<>( Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRetryBehavior.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRetryBehavior.java deleted file mode 100644 index 53e2def661..0000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskRetryBehavior.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright contributors to Hyperledger Besu. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.manager.peertask; - -public enum PeerTaskRetryBehavior { - RETRY_WITH_SAME_PEER, - RETRY_WITH_OTHER_PEERS -} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java index 6dfd8d0e20..0015c1ffbc 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerTaskExecutorTest.java @@ -20,8 +20,6 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; -import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -69,7 +67,7 @@ public class PeerTaskExecutorTest { Object responseObject = new Object(); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -97,8 +95,7 @@ public class PeerTaskExecutorTest { int requestMessageDataCode = 123; Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()) - .thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_SAME_PEER)); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(2); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); @@ -127,7 +124,7 @@ public class PeerTaskExecutorTest { TimeoutException { Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -149,7 +146,7 @@ public class PeerTaskExecutorTest { int requestMessageDataCode = 123; Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -174,7 +171,7 @@ public class PeerTaskExecutorTest { InvalidPeerTaskResponseException { Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -205,7 +202,8 @@ public class PeerTaskExecutorTest { .thenReturn(Optional.of(ethPeer)); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()).thenReturn(Collections.emptyList()); + Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(0); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(subprotocol.getName()).thenReturn("subprotocol"); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) @@ -239,8 +237,8 @@ public class PeerTaskExecutorTest { .thenReturn(Optional.of(peer2)); Mockito.when(peerTask.getRequestMessage()).thenReturn(requestMessageData); - Mockito.when(peerTask.getPeerTaskRetryBehaviors()) - .thenReturn(List.of(PeerTaskRetryBehavior.RETRY_WITH_OTHER_PEERS)); + Mockito.when(peerTask.getRetriesWithOtherPeer()).thenReturn(2); + Mockito.when(peerTask.getRetriesWithSamePeer()).thenReturn(0); Mockito.when(peerTask.getSubProtocol()).thenReturn(subprotocol); Mockito.when(requestSender.sendRequest(subprotocol, requestMessageData, ethPeer)) .thenThrow(new TimeoutException());