diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index 14ebf587c5..b45ab54a17 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -36,6 +36,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -164,9 +165,16 @@ public class EthPeers { dispatchMessage(peer, ethMessage, protocolName); } - private void reattemptPendingPeerRequests() { + @VisibleForTesting + void reattemptPendingPeerRequests() { synchronized (this) { - pendingRequests.removeIf(PendingPeerRequest::attemptExecution); + final Iterator iterator = pendingRequests.iterator(); + while (iterator.hasNext()) { + final PendingPeerRequest request = iterator.next(); + if (request.attemptExecution()) { + pendingRequests.remove(request); + } + } } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java index 4ae23a0116..a3476f4690 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -40,6 +41,7 @@ import java.util.function.Consumer; import org.junit.Before; import org.junit.Test; +import org.mockito.stubbing.Answer; public class EthPeersTest { @@ -265,6 +267,76 @@ public class EthPeersTest { assertRequestFailure(pendingRequest, CancellationException.class); } + @Test + public void shouldNotFailWhenAttemptExecutionDisconnectSamePeer() throws PeerNotConnected { + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + final EthPeer ethPeer = spy(peer.getEthPeer()); + + // Force request to be added to pending request list + when(ethPeer.hasAvailableRequestCapacity()).thenReturn(false); + + final PendingPeerRequest pendingPeerRequest = + ethPeers.executePeerRequest(peerRequest, 10, Optional.of(ethPeer)); + + // Force Request Attempt to cause the peer to disconnect + when(ethPeer.hasAvailableRequestCapacity()) + .thenAnswer( + (Answer) + invocation -> { + // Force Disconnect only on the first execution + if (!peer.getPeerConnection().isDisconnected()) { + peer.disconnect(DisconnectReason.UNKNOWN); // Force Peer to disconnect + } + return true; + }); + + // Sent Pending Requests + ethPeers.reattemptPendingPeerRequests(); + + // Request should be aborted. + assertRequestFailure(pendingPeerRequest, CancellationException.class); + + // Mock works + assertThat(peer.getEthPeer().isDisconnected()).isTrue(); // peer is disconnected + } + + @Test + public void shouldNotFailWhenAttemptExecutionDisconnectAnotherPeer() throws PeerNotConnected { + final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + final EthPeer ethPeer = spy(peer.getEthPeer()); + + // Force request to be added to pending request list + when(ethPeer.hasAvailableRequestCapacity()).thenReturn(false); + + final PendingPeerRequest pendingPeerRequest = + ethPeers.executePeerRequest(peerRequest, 10, Optional.of(ethPeer)); + + final RespondingEthPeer peerToDisconnect = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + + // Force Request Attempt to cause the peer to disconnect + when(ethPeer.hasAvailableRequestCapacity()) + .thenAnswer( + (Answer) + invocation -> { + // Force Disconnect only on the first execution + if (!peerToDisconnect.getPeerConnection().isDisconnected()) { + peerToDisconnect.disconnect( + DisconnectReason.UNKNOWN); // Force Peer to disconnect + } + return true; + }); + + // Sent Pending Requests + ethPeers.reattemptPendingPeerRequests(); + + // Request Should Execute + assertRequestSuccessful(pendingPeerRequest); + + // Mock works + assertThat(peerToDisconnect.getEthPeer().isDisconnected()).isTrue(); // peer is disconnected + } + @Test public void toString_hasExpectedInfo() { assertThat(ethPeers.toString()).isEqualTo("0 EthPeers {}");