Limit the number of times we retry peer discovery interactions. (#908)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Adrian Sutton 6 years ago committed by GitHub
parent 7bcaff60cc
commit 5eb2ee01db
  1. 22
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
  2. 3
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockTimerUtil.java
  3. 39
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java

@ -408,7 +408,7 @@ public class PeerDiscoveryController {
// The filter condition will be updated as soon as the action is performed. // The filter condition will be updated as soon as the action is performed.
final PeerInteractionState ping = final PeerInteractionState ping =
new PeerInteractionState(action, PacketType.PONG, (packet) -> false, true); new PeerInteractionState(action, peer.getId(), PacketType.PONG, (packet) -> false, true);
dispatchInteraction(peer, ping); dispatchInteraction(peer, ping);
} }
@ -441,7 +441,7 @@ public class PeerDiscoveryController {
sendPacket(peer, PacketType.FIND_NEIGHBORS, data); sendPacket(peer, PacketType.FIND_NEIGHBORS, data);
}; };
final PeerInteractionState interaction = final PeerInteractionState interaction =
new PeerInteractionState(action, PacketType.NEIGHBORS, packet -> true, true); new PeerInteractionState(action, peer.getId(), PacketType.NEIGHBORS, packet -> true, true);
dispatchInteraction(peer, interaction); dispatchInteraction(peer, interaction);
} }
@ -459,7 +459,7 @@ public class PeerDiscoveryController {
if (previous != null) { if (previous != null) {
previous.cancelTimers(); previous.cancelTimers();
} }
state.execute(0); state.execute(0, 0);
} }
private void respondToPing( private void respondToPing(
@ -499,11 +499,15 @@ public class PeerDiscoveryController {
/** Holds the state machine data for a peer interaction. */ /** Holds the state machine data for a peer interaction. */
private class PeerInteractionState implements Predicate<Packet> { private class PeerInteractionState implements Predicate<Packet> {
private static final int MAX_RETRIES = 5;
/** /**
* The action that led to the peer being in this state (e.g. sending a PING or NEIGHBORS * The action that led to the peer being in this state (e.g. sending a PING or NEIGHBORS
* message), in case it needs to be retried. * message), in case it needs to be retried.
*/ */
private final Consumer<PeerInteractionState> action; private final Consumer<PeerInteractionState> action;
private final BytesValue peerId;
/** The expected type of the message that will transition the peer out of this state. */ /** The expected type of the message that will transition the peer out of this state. */
private final PacketType expectedType; private final PacketType expectedType;
/** A custom filter to accept transitions out of this state. */ /** A custom filter to accept transitions out of this state. */
@ -515,10 +519,12 @@ public class PeerDiscoveryController {
PeerInteractionState( PeerInteractionState(
final Consumer<PeerInteractionState> action, final Consumer<PeerInteractionState> action,
final BytesValue peerId,
final PacketType expectedType, final PacketType expectedType,
final Predicate<Packet> filter, final Predicate<Packet> filter,
final boolean retryable) { final boolean retryable) {
this.action = action; this.action = action;
this.peerId = peerId;
this.expectedType = expectedType; this.expectedType = expectedType;
this.filter = filter; this.filter = filter;
this.retryable = retryable; this.retryable = retryable;
@ -540,11 +546,15 @@ public class PeerDiscoveryController {
* @param lastTimeout the previous timeout, or 0 if this is the first time the action is being * @param lastTimeout the previous timeout, or 0 if this is the first time the action is being
* executed. * executed.
*/ */
void execute(final long lastTimeout) { void execute(final long lastTimeout, final int retryCount) {
action.accept(this); action.accept(this);
if (retryable) { if (retryable && retryCount < MAX_RETRIES) {
final long newTimeout = retryDelayFunction.apply(lastTimeout); final long newTimeout = retryDelayFunction.apply(lastTimeout);
timerId = OptionalLong.of(timerUtil.setTimer(newTimeout, () -> execute(newTimeout))); timerId =
OptionalLong.of(
timerUtil.setTimer(newTimeout, () -> execute(newTimeout, retryCount + 1)));
} else {
inflightInteractions.remove(peerId);
} }
} }

@ -50,8 +50,7 @@ public class MockTimerUtil implements TimerUtil {
public void runTimerHandlers() { public void runTimerHandlers() {
// Create a copy of the handlers to avoid concurrent modification as handlers run // Create a copy of the handlers to avoid concurrent modification as handlers run
List<TimerHandler> handlers = new ArrayList<>(); final List<TimerHandler> handlers = new ArrayList<>(timerHandlers.values());
timerHandlers.forEach((id, handler) -> handlers.add(handler));
timerHandlers.clear(); timerHandlers.clear();
handlers.forEach(TimerHandler::handle); handlers.forEach(TimerHandler::handle);

@ -192,19 +192,54 @@ public class PeerDiscoveryControllerTest {
controller.onMessage(packet, peers.get(0)); controller.onMessage(packet, peers.get(0));
// Invoke timers again // Invoke timers again
for (int i = 0; i < 4; i++) { for (int i = 0; i < 2; i++) {
timer.runTimerHandlers(); timer.runTimerHandlers();
} }
// Ensure we receive no more PING packets for peer[0]. // Ensure we receive no more PING packets for peer[0].
// Assert PING packet was sent for peer[0] 4 times. // Assert PING packet was sent for peer[0] 4 times.
for (final DiscoveryPeer peer : peers) { for (final DiscoveryPeer peer : peers) {
final int expectedCount = peer.equals(peers.get(0)) ? 4 : 8; final int expectedCount = peer.equals(peers.get(0)) ? 4 : 6;
verify(outboundMessageHandler, times(expectedCount)) verify(outboundMessageHandler, times(expectedCount))
.send(eq(peer), matchPacketOfType(PacketType.PING)); .send(eq(peer), matchPacketOfType(PacketType.PING));
} }
} }
@Test
public void shouldStopRetryingInteractionWhenLimitIsReached() {
// Create peers.
final List<SECP256K1.KeyPair> keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(3);
final List<DiscoveryPeer> peers = helper.createDiscoveryPeers(keyPairs);
final MockTimerUtil timer = new MockTimerUtil();
final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class);
controller =
getControllerBuilder()
.peers(peers)
.timerUtil(timer)
.outboundMessageHandler(outboundMessageHandler)
.build();
// Mock the creation of the PING packet, so that we can control the hash,
// which gets validated when receiving the PONG.
final PingPacketData mockPing =
PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint());
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0));
doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any());
controller.start();
// Invoke timers several times so that ping to peers should be resent
for (int i = 0; i < 10; i++) {
timer.runTimerHandlers();
}
// Assert PING packet was sent only 6 times (initial attempt plus 5 retries)
for (final DiscoveryPeer peer : peers) {
verify(outboundMessageHandler, times(6)).send(eq(peer), matchPacketOfType(PacketType.PING));
}
}
@Test @Test
public void bootstrapPeersPongReceived_HashMatched() { public void bootstrapPeersPongReceived_HashMatched() {
// Create peers. // Create peers.

Loading…
Cancel
Save