[PAN-3155] Handle discovery peers with updated endpoints (#12)

Update discovery logic to consider a peer with an unknown discovery endpoint to be unknown regardless of whether we've encountered a peer with the same node id before. This makes the discovery logic more forgiving in the face of node restarts.

If nodeA bonds with nodeB, then nodeB leaves the network and later comes back with a different ip address or listening port, nodeA would previously continue trying to communicate with nodeB at its original address. With these changes, nodeA will now treat the restarted nodeB as a new peer and communicate with it on its updated endpoint. Additionally, nodeB's information will be updated in the peer table so that neighbors requests return updated information on this node.

Signed-off-by: Danno Ferrin <danno.ferrin@gmail.com>
pull/20/head
mbaxter 5 years ago committed by Danno Ferrin
parent e47edb584a
commit 1d74e60cf2
  1. 2
      acceptance-tests/dsl/build.gradle
  2. 5
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/DiscoveryPeer.java
  3. 2
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/Bucket.java
  4. 3
      ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
  5. 100
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java
  6. 34
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java
  7. 28
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java
  8. 67
      ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/PeerTableTest.java

@ -42,6 +42,4 @@ dependencies {
implementation 'tech.pegasys.ethsigner.internal:core'
implementation 'tech.pegasys.ethsigner.internal:file-based'
implementation 'tech.pegasys.ethsigner.internal:signing-api'
}

@ -109,6 +109,11 @@ public class DiscoveryPeer extends DefaultPeer {
return endpoint;
}
public boolean discoveryEndpointMatches(final DiscoveryPeer peer) {
return peer.getEndpoint().getHost().equals(endpoint.getHost())
&& peer.getEndpoint().getUdpPort() == endpoint.getUdpPort();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DiscoveryPeer{");

@ -91,7 +91,7 @@ public class Bucket {
// Avoid duplicating the peer if it already exists in the bucket.
for (int i = 0; i <= tailIndex; i++) {
if (peer.equals(kBucket[i])) {
if (peer.getId().equals(kBucket[i].getId())) {
throw new IllegalArgumentException(
String.format("Tried to add duplicate peer to k-bucket: %s", peer.getId()));
}

@ -293,7 +293,8 @@ public class PeerDiscoveryController {
}
// Load the peer from the table, or use the instance that comes in.
final Optional<DiscoveryPeer> maybeKnownPeer = peerTable.get(sender);
final Optional<DiscoveryPeer> maybeKnownPeer =
peerTable.get(sender).filter(known -> known.discoveryEndpointMatches(sender));
final DiscoveryPeer peer = maybeKnownPeer.orElse(sender);
final boolean peerKnown = maybeKnownPeer.isPresent();

@ -20,6 +20,8 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.crypto.SECP256K1;
import org.hyperledger.besu.crypto.SECP256K1.KeyPair;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryTestHelper.AgentBuilder;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.FindNeighborsPacketData;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.MockPeerDiscoveryAgent;
@ -308,6 +310,104 @@ public class PeerDiscoveryAgentTest {
assertThat(remoteIncomingPackets).isEmpty();
}
/**
* These tests simulates the case where a node crashes then comes back up with a new ip address or
* listening port.
*/
@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedPort() {
simulatePeerRestartingOnDifferentEndpoint(false, true);
}
@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedHost() {
simulatePeerRestartingOnDifferentEndpoint(true, false);
}
@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedHostAndPort() {
simulatePeerRestartingOnDifferentEndpoint(true, true);
}
public void simulatePeerRestartingOnDifferentEndpoint(
final boolean updateHost, final boolean updatePort) {
// Setup peer
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent();
final DiscoveryPeer agentPeer = agent.getAdvertisedPeer().get();
final KeyPair remoteKeyPair = SECP256K1.KeyPair.generate();
final String remoteIp = "1.2.3.4";
final MockPeerDiscoveryAgent remoteAgent =
helper.createDiscoveryAgent(
helper
.agentBuilder()
.keyPair(remoteKeyPair)
.advertisedHost(remoteIp)
.bootstrapPeers(agentPeer));
agent.start(999);
remoteAgent.start(888);
final DiscoveryPeer remotePeer = remoteAgent.getAdvertisedPeer().get();
// Remote agent should have bonded with agent
assertThat(agent.streamDiscoveredPeers()).hasSize(1);
assertThat(agent.streamDiscoveredPeers()).contains(remoteAgent.getAdvertisedPeer().get());
// Create a new remote agent with same id, and new endpoint
remoteAgent.stop();
final int newPort = updatePort ? 0 : remotePeer.getEndpoint().getUdpPort();
final String newIp = updateHost ? "1.2.3.5" : remoteIp;
final MockPeerDiscoveryAgent updatedRemoteAgent =
helper.createDiscoveryAgent(
helper
.agentBuilder()
.keyPair(remoteKeyPair)
.advertisedHost(newIp)
.bindPort(newPort)
.bootstrapPeers(agentPeer));
updatedRemoteAgent.start(889);
final DiscoveryPeer updatedRemotePeer = updatedRemoteAgent.getAdvertisedPeer().get();
// Sanity check
assertThat(
updatedRemotePeer.getEndpoint().getUdpPort() == remotePeer.getEndpoint().getUdpPort())
.isEqualTo(!updatePort);
assertThat(updatedRemotePeer.getEndpoint().getHost().equals(remotePeer.getEndpoint().getHost()))
.isEqualTo(!updateHost);
assertThat(updatedRemotePeer.getId()).isEqualTo(remotePeer.getId());
// Check that our restarted agent receives a PONG response
final List<IncomingPacket> incomingPackets = updatedRemoteAgent.getIncomingPackets();
assertThat(incomingPackets).hasSizeGreaterThan(0);
final long pongCount =
incomingPackets.stream()
.filter(packet -> packet.fromAgent.equals(agent))
.filter(packet -> packet.packet.getType().equals(PacketType.PONG))
.count();
assertThat(pongCount).isGreaterThan(0);
// Check that agent has an endpoint matching the restarted node
final List<DiscoveryPeer> matchingPeers =
agent
.streamDiscoveredPeers()
.filter(peer -> peer.getId().equals(updatedRemotePeer.getId()))
.collect(toList());
// We should have only one peer matching this id
assertThat(matchingPeers.size()).isEqualTo(1);
final DiscoveryPeer discoveredPeer = matchingPeers.get(0);
assertThat(discoveredPeer.getEndpoint().getUdpPort())
.isEqualTo(updatedRemotePeer.getEndpoint().getUdpPort());
assertThat(discoveredPeer.getEndpoint().getHost())
.isEqualTo(updatedRemotePeer.getEndpoint().getHost());
// Check endpoint is consistent with enodeURL
assertThat(discoveredPeer.getEnodeURL().getDiscoveryPortOrZero())
.isEqualTo(updatedRemotePeer.getEndpoint().getUdpPort());
assertThat(discoveredPeer.getEnodeURL().getListeningPortOrZero())
.isEqualTo(updatedRemotePeer.getEndpoint().getFunctionalTcpPort());
assertThat(discoveredPeer.getEnodeURL().getIpAsString())
.isEqualTo(updatedRemotePeer.getEndpoint().getHost());
}
@Test
public void neighbors_allowOutgoingRequest() {
// Setup peer

@ -12,6 +12,7 @@
*/
package org.hyperledger.besu.ethereum.p2p.discovery;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Arrays.asList;
import org.hyperledger.besu.crypto.SECP256K1;
@ -31,6 +32,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -179,6 +181,9 @@ public class PeerDiscoveryTestHelper {
private List<EnodeURL> bootnodes = Collections.emptyList();
private boolean active = true;
private PeerPermissions peerPermissions = PeerPermissions.noop();
private String advertisedHost = "127.0.0.1";
private OptionalInt bindPort = OptionalInt.empty();
private KeyPair keyPair = SECP256K1.KeyPair.generate();
private AgentBuilder(
final Map<BytesValue, MockPeerDiscoveryAgent> agents,
@ -215,14 +220,37 @@ public class PeerDiscoveryTestHelper {
return this;
}
public AgentBuilder advertisedHost(final String host) {
checkNotNull(host);
this.advertisedHost = host;
return this;
}
public AgentBuilder bindPort(final int bindPort) {
if (bindPort == 0) {
// Zero means pick the next available port
this.bindPort = OptionalInt.empty();
return this;
}
this.bindPort = OptionalInt.of(bindPort);
return this;
}
public AgentBuilder keyPair(final KeyPair keyPair) {
checkNotNull(keyPair);
this.keyPair = keyPair;
return this;
}
public MockPeerDiscoveryAgent build() {
final int port = bindPort.orElseGet(nextAvailablePort::incrementAndGet);
final DiscoveryConfiguration config = new DiscoveryConfiguration();
config.setBootnodes(bootnodes);
config.setBindPort(nextAvailablePort.incrementAndGet());
config.setAdvertisedHost(advertisedHost);
config.setBindPort(port);
config.setActive(active);
return new MockPeerDiscoveryAgent(
SECP256K1.KeyPair.generate(), config, peerPermissions, agents);
return new MockPeerDiscoveryAgent(keyPair, config, peerPermissions, agents);
}
}
}

@ -34,6 +34,7 @@ public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent {
// The set of known agents operating on the network
private final Map<BytesValue, MockPeerDiscoveryAgent> agentNetwork;
private final Deque<IncomingPacket> incomingPackets = new ArrayDeque<>();
private boolean isRunning = false;
public MockPeerDiscoveryAgent(
final KeyPair keyPair,
@ -65,9 +66,9 @@ public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent {
@Override
protected CompletableFuture<InetSocketAddress> listenForConnections() {
isRunning = true;
// Skip network setup for tests
InetSocketAddress address =
new InetSocketAddress(config.getAdvertisedHost(), config.getBindPort());
InetSocketAddress address = new InetSocketAddress(config.getBindHost(), config.getBindPort());
return CompletableFuture.completedFuture(address);
}
@ -75,15 +76,35 @@ public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent {
protected CompletableFuture<Void> sendOutgoingPacket(
final DiscoveryPeer toPeer, final Packet packet) {
CompletableFuture<Void> result = new CompletableFuture<>();
if (!this.isRunning) {
result.completeExceptionally(new Exception("Attempt to send message from an inactive agent"));
}
MockPeerDiscoveryAgent toAgent = agentNetwork.get(toPeer.getId());
if (toAgent == null) {
result.completeExceptionally(
new Exception(
"Attempt to send to unknown peer. Agents must be constructed through PeerDiscoveryTestHelper."));
return result;
}
final DiscoveryPeer agentPeer = toAgent.getAdvertisedPeer().get();
if (!toPeer.getEndpoint().getHost().equals(agentPeer.getEndpoint().getHost())) {
LOG.warn(
"Attempt to send packet to discovery peer using the wrong host address. Sending to {}, but discovery peer is listening on {}",
toPeer.getEndpoint().getHost(),
agentPeer.getEndpoint().getHost());
} else if (toPeer.getEndpoint().getUdpPort() != agentPeer.getEndpoint().getUdpPort()) {
LOG.warn(
"Attempt to send packet to discovery peer using the wrong udp port. Sending to {}, but discovery peer is listening on {}",
toPeer.getEndpoint().getUdpPort(),
agentPeer.getEndpoint().getUdpPort());
} else if (!toAgent.isRunning) {
LOG.warn("Attempt to send packet to an inactive peer.");
} else {
toAgent.processIncomingPacket(this, packet);
result.complete(null);
}
result.complete(null);
return result;
}
@ -99,6 +120,7 @@ public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent {
@Override
public CompletableFuture<?> stop() {
isRunning = false;
return CompletableFuture.completedFuture(null);
}

@ -14,15 +14,19 @@ package org.hyperledger.besu.ethereum.p2p.discovery.internal;
import static org.assertj.core.api.Assertions.assertThat;
import org.hyperledger.besu.crypto.SECP256K1.KeyPair;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.Endpoint;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryTestHelper;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.AddResult.AddOutcome;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.EvictResult;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.EvictResult.EvictOutcome;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.util.bytes.BytesValue;
import java.util.List;
import java.util.OptionalInt;
import org.junit.Test;
@ -73,6 +77,69 @@ public class PeerTableTest {
});
}
@Test
public void peerExists_withDifferentIp() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final BytesValue peerId = KeyPair.generate().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30303, OptionalInt.empty()));
assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED);
final DiscoveryPeer duplicatePeer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.2", 30303, OptionalInt.empty()));
assertThat(table.tryAdd(duplicatePeer))
.satisfies(
result -> {
assertThat(result.getOutcome()).isEqualTo(AddOutcome.ALREADY_EXISTED);
assertThat(result.getEvictionCandidate()).isNull();
});
}
@Test
public void peerExists_withDifferentUdpPort() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final BytesValue peerId = KeyPair.generate().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30303, OptionalInt.empty()));
assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED);
final DiscoveryPeer duplicatePeer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30301, OptionalInt.empty()));
assertThat(table.tryAdd(duplicatePeer))
.satisfies(
result -> {
assertThat(result.getOutcome()).isEqualTo(AddOutcome.ALREADY_EXISTED);
assertThat(result.getEvictionCandidate()).isNull();
});
}
@Test
public void peerExists_withDifferentIdAndUdpPort() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final BytesValue peerId = KeyPair.generate().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30303, OptionalInt.empty()));
assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED);
final DiscoveryPeer duplicatePeer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.2", 30301, OptionalInt.empty()));
assertThat(table.tryAdd(duplicatePeer))
.satisfies(
result -> {
assertThat(result.getOutcome()).isEqualTo(AddOutcome.ALREADY_EXISTED);
assertThat(result.getEvictionCandidate()).isNull();
});
}
@Test
public void evictExistingPeerShouldEvict() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);

Loading…
Cancel
Save