Fix ConcurrentModificationException on ReattemptPendingPeerRequests (#4206)

* Handle ConcurrentModificationException on ReattemptPendingPeerRequests
* Replace removeIf with Iterator

Signed-off-by: Gabriel Trintinalia <gabriel.trintinalia@consensys.net>

Co-authored-by: mark-terry <36909937+mark-terry@users.noreply.github.com>
Co-authored-by: Sally MacFarlane <sally.macfarlane@consensys.net>
pull/4222/head
Gabriel-Trintinalia 2 years ago committed by GitHub
parent 98dc2ace37
commit 8f89580542
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  2. 72
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeersTest.java

@ -36,6 +36,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -164,9 +165,16 @@ public class EthPeers {
dispatchMessage(peer, ethMessage, protocolName);
}
private void reattemptPendingPeerRequests() {
@VisibleForTesting
void reattemptPendingPeerRequests() {
synchronized (this) {
pendingRequests.removeIf(PendingPeerRequest::attemptExecution);
final Iterator<PendingPeerRequest> iterator = pendingRequests.iterator();
while (iterator.hasNext()) {
final PendingPeerRequest request = iterator.next();
if (request.attemptExecution()) {
pendingRequests.remove(request);
}
}
}
}

@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
@ -40,6 +41,7 @@ import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
public class EthPeersTest {
@ -265,6 +267,76 @@ public class EthPeersTest {
assertRequestFailure(pendingRequest, CancellationException.class);
}
@Test
public void shouldNotFailWhenAttemptExecutionDisconnectSamePeer() throws PeerNotConnected {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final EthPeer ethPeer = spy(peer.getEthPeer());
// Force request to be added to pending request list
when(ethPeer.hasAvailableRequestCapacity()).thenReturn(false);
final PendingPeerRequest pendingPeerRequest =
ethPeers.executePeerRequest(peerRequest, 10, Optional.of(ethPeer));
// Force Request Attempt to cause the peer to disconnect
when(ethPeer.hasAvailableRequestCapacity())
.thenAnswer(
(Answer<Boolean>)
invocation -> {
// Force Disconnect only on the first execution
if (!peer.getPeerConnection().isDisconnected()) {
peer.disconnect(DisconnectReason.UNKNOWN); // Force Peer to disconnect
}
return true;
});
// Sent Pending Requests
ethPeers.reattemptPendingPeerRequests();
// Request should be aborted.
assertRequestFailure(pendingPeerRequest, CancellationException.class);
// Mock works
assertThat(peer.getEthPeer().isDisconnected()).isTrue(); // peer is disconnected
}
@Test
public void shouldNotFailWhenAttemptExecutionDisconnectAnotherPeer() throws PeerNotConnected {
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final EthPeer ethPeer = spy(peer.getEthPeer());
// Force request to be added to pending request list
when(ethPeer.hasAvailableRequestCapacity()).thenReturn(false);
final PendingPeerRequest pendingPeerRequest =
ethPeers.executePeerRequest(peerRequest, 10, Optional.of(ethPeer));
final RespondingEthPeer peerToDisconnect =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
// Force Request Attempt to cause the peer to disconnect
when(ethPeer.hasAvailableRequestCapacity())
.thenAnswer(
(Answer<Boolean>)
invocation -> {
// Force Disconnect only on the first execution
if (!peerToDisconnect.getPeerConnection().isDisconnected()) {
peerToDisconnect.disconnect(
DisconnectReason.UNKNOWN); // Force Peer to disconnect
}
return true;
});
// Sent Pending Requests
ethPeers.reattemptPendingPeerRequests();
// Request Should Execute
assertRequestSuccessful(pendingPeerRequest);
// Mock works
assertThat(peerToDisconnect.getEthPeer().isDisconnected()).isTrue(); // peer is disconnected
}
@Test
public void toString_hasExpectedInfo() {
assertThat(ethPeers.toString()).isEqualTo("0 EthPeers {}");

Loading…
Cancel
Save