Filter out disconnected peers when fetching available peers (#4269)

* Filter out disconnected peers when fetching available peers

Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
pull/4303/head
Fabio Di Fabio 2 years ago committed by GitHub
parent 16922cac83
commit b3b8e0aaeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 4
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java
  3. 19
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/PendingPeerRequest.java
  4. 2
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java
  5. 28
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java
  6. 4
      ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DetermineCommonAncestorTaskTest.java

@ -5,6 +5,7 @@
### Additions and Improvements
- Upgrade besu-native to 0.6.0 and use Blake2bf native implementation if available by default [#4264](https://github.com/hyperledger/besu/pull/4264)
- Better management of jemalloc presence/absence in startup script [#4237](https://github.com/hyperledger/besu/pull/4237)
- Filter out disconnected peers when fetching available peers [#4269](https://github.com/hyperledger/besu/pull/4269)
### Bug Fixes

@ -203,7 +203,9 @@ public class EthPeers {
}
public Stream<EthPeer> streamAvailablePeers() {
return streamAllPeers().filter(EthPeer::readyForRequests);
return streamAllPeers()
.filter(EthPeer::readyForRequests)
.filter(peer -> !peer.isDisconnected());
}
public Stream<EthPeer> streamBestPeers() {

@ -52,18 +52,18 @@ public class PendingPeerRequest {
if (result.isDone()) {
return true;
}
final Optional<EthPeer> leastBusySuitablePeer = getLeastBusySuitablePeer();
if (!leastBusySuitablePeer.isPresent()) {
final Optional<EthPeer> maybePeer = getPeerToUse();
if (maybePeer.isEmpty()) {
// No peers have the required height.
result.completeExceptionally(new NoAvailablePeersException());
return true;
} else {
// At least one peer has the required height, but we not be able to use it if it's busy
final Optional<EthPeer> selectedPeer =
leastBusySuitablePeer.filter(EthPeer::hasAvailableRequestCapacity);
// At least one peer has the required height, but we are not able to use it if it's busy
final Optional<EthPeer> maybePeerWithCapacity =
maybePeer.filter(EthPeer::hasAvailableRequestCapacity);
selectedPeer.ifPresent(this::sendRequest);
return selectedPeer.isPresent();
maybePeerWithCapacity.ifPresent(this::sendRequest);
return maybePeerWithCapacity.isPresent();
}
}
@ -79,8 +79,9 @@ public class PendingPeerRequest {
}
}
private Optional<EthPeer> getLeastBusySuitablePeer() {
return peer.isPresent()
private Optional<EthPeer> getPeerToUse() {
// return the assigned peer if still valid, otherwise switch to another peer
return peer.filter(p -> !p.isDisconnected()).isPresent()
? peer
: ethPeers
.streamAvailablePeers()

@ -131,7 +131,7 @@ public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetry
return getEthContext()
.getEthPeers()
.streamBestPeers()
.filter(peer -> !peer.isDisconnected() && !triedPeers.contains(peer));
.filter(peer -> !triedPeers.contains(peer));
}
private void refreshPeers() {

@ -85,21 +85,29 @@ public class CompleteBlocksTaskTest extends RetryingMessageTaskTest<List<Block>>
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
peerCountToTimeout.set(3);
final List<Block> requestedData = generateDataToBeRequested(10);
final CompleteBlocksTask task = createTask(requestedData);
final CompletableFuture<List<Block>> future = task.run();
final List<MessageData> messageCollector = new ArrayList<>();
final List<MessageData> messageCollector = new ArrayList<>(4);
respondingPeer.respond(
peerCountToTimeout.set(4);
// after 3 timeouts a peer is disconnected, so we need another peer to reach 4 retries
respondingPeer.respondTimes(
RespondingEthPeer.wrapResponderWithCollector(
RespondingEthPeer.emptyResponder(), messageCollector),
3);
final RespondingEthPeer respondingPeer2 =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
respondingPeer2.respond(
RespondingEthPeer.wrapResponderWithCollector(
RespondingEthPeer.emptyResponder(), messageCollector));
assertThat(batchSize(messageCollector.get(0))).isEqualTo(10);
assertThat(batchSize(messageCollector.get(1))).isEqualTo(5);
assertThat(batchSize(messageCollector.get(2))).isEqualTo(4);
assertThat(batchSize(messageCollector.get(3))).isEqualTo(3);
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}
@ -110,21 +118,29 @@ public class CompleteBlocksTaskTest extends RetryingMessageTaskTest<List<Block>>
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
peerCountToTimeout.set(3);
final List<Block> requestedData = generateDataToBeRequested(1);
final EthTask<List<Block>> task = createTask(requestedData);
final CompletableFuture<List<Block>> future = task.run();
final List<MessageData> messageCollector = new ArrayList<>();
final List<MessageData> messageCollector = new ArrayList<>(4);
respondingPeer.respond(
peerCountToTimeout.set(4);
// after 3 timeouts a peer is disconnected, so we need another peer to reach 4 retries
respondingPeer.respondTimes(
RespondingEthPeer.wrapResponderWithCollector(
RespondingEthPeer.emptyResponder(), messageCollector),
3);
final RespondingEthPeer respondingPeer2 =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
respondingPeer2.respond(
RespondingEthPeer.wrapResponderWithCollector(
RespondingEthPeer.emptyResponder(), messageCollector));
assertThat(batchSize(messageCollector.get(0))).isEqualTo(1);
assertThat(batchSize(messageCollector.get(1))).isEqualTo(1);
assertThat(batchSize(messageCollector.get(2))).isEqualTo(1);
assertThat(batchSize(messageCollector.get(3))).isEqualTo(1);
assertThat(future.isCompletedExceptionally()).isTrue();
assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class);
}

@ -42,6 +42,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.EthTaskException.FailureReason;
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
@ -117,8 +118,7 @@ public class DetermineCommonAncestorTaskTest {
assertThat(failure.get()).isNotNull();
final Throwable error = ExceptionUtils.rootCause(failure.get());
assertThat(error).isInstanceOf(EthTaskException.class);
assertThat(((EthTaskException) error).reason())
.isEqualTo(EthTaskException.FailureReason.PEER_DISCONNECTED);
assertThat(((EthTaskException) error).reason()).isEqualTo(FailureReason.NO_AVAILABLE_PEERS);
}
@Test

Loading…
Cancel
Save