Fix p2p PeerInfo handling (#1428)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 6 years ago committed by GitHub
parent 5d04b32530
commit ff56a65af2
  1. 121
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java
  2. 25
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java

@ -135,56 +135,44 @@ public class DefaultP2PNetwork implements P2PNetwork {
private static final Logger LOG = LogManager.getLogger(); private static final Logger LOG = LogManager.getLogger();
private static final int TIMEOUT_SECONDS = 30; private static final int TIMEOUT_SECONDS = 30;
private ChannelFuture server;
private final EventLoopGroup boss = new NioEventLoopGroup(1);
private final EventLoopGroup workers = new NioEventLoopGroup(1);
private final ScheduledExecutorService peerConnectionScheduler = private final ScheduledExecutorService peerConnectionScheduler =
Executors.newSingleThreadScheduledExecutor(); Executors.newSingleThreadScheduledExecutor();
final Map<Capability, Subscribers<Consumer<Message>>> protocolCallbacks =
new ConcurrentHashMap<>();
private final Subscribers<Consumer<PeerConnection>> connectCallbacks = new Subscribers<>();
private final Subscribers<DisconnectCallback> disconnectCallbacks = new Subscribers<>();
private final Callbacks callbacks = new Callbacks(protocolCallbacks, disconnectCallbacks);
private final PeerDiscoveryAgent peerDiscoveryAgent; private final PeerDiscoveryAgent peerDiscoveryAgent;
private final PeerBlacklist peerBlacklist;
private final NetworkingConfiguration config; private final NetworkingConfiguration config;
private final List<Capability> supportedCapabilities; private final List<Capability> supportedCapabilities;
private OptionalLong peerBondedObserverId = OptionalLong.empty();
private OptionalLong peerDroppedObserverId = OptionalLong.empty();
@VisibleForTesting public final Collection<Peer> peerMaintainConnectionList;
@VisibleForTesting public final PeerConnectionRegistry connections;
@VisibleForTesting
public final Map<Peer, CompletableFuture<PeerConnection>> pendingConnections =
new ConcurrentHashMap<>();
private final EventLoopGroup boss = new NioEventLoopGroup(1);
private final EventLoopGroup workers = new NioEventLoopGroup(1);
private volatile PeerInfo ourPeerInfo;
private final SECP256K1.KeyPair keyPair;
private ChannelFuture server;
private final int maxPeers; private final int maxPeers;
private final List<SubProtocol> subProtocols; private final List<SubProtocol> subProtocols;
private final LabelledMetric<Counter> outboundMessagesCounter; private final SECP256K1.KeyPair keyPair;
private final BytesValue nodeId;
private final String advertisedHost; private volatile OptionalInt listeningPort = OptionalInt.empty();
private volatile Optional<EnodeURL> localEnode = Optional.empty(); private volatile Optional<EnodeURL> localEnode = Optional.empty();
private volatile Optional<PeerInfo> ourPeerInfo = Optional.empty();
private final PeerBlacklist peerBlacklist;
private final Optional<NodePermissioningController> nodePermissioningController; private final Optional<NodePermissioningController> nodePermissioningController;
private final Optional<Blockchain> blockchain; private final Optional<Blockchain> blockchain;
@VisibleForTesting final Collection<Peer> peerMaintainConnectionList;
@VisibleForTesting final PeerConnectionRegistry connections;
@VisibleForTesting
final Map<Peer, CompletableFuture<PeerConnection>> pendingConnections = new ConcurrentHashMap<>();
final Map<Capability, Subscribers<Consumer<Message>>> protocolCallbacks =
new ConcurrentHashMap<>();
private final Subscribers<Consumer<PeerConnection>> connectCallbacks = new Subscribers<>();
private final Subscribers<DisconnectCallback> disconnectCallbacks = new Subscribers<>();
private final Callbacks callbacks = new Callbacks(protocolCallbacks, disconnectCallbacks);
private final LabelledMetric<Counter> outboundMessagesCounter;
private OptionalLong blockAddedObserverId = OptionalLong.empty(); private OptionalLong blockAddedObserverId = OptionalLong.empty();
private OptionalLong peerBondedObserverId = OptionalLong.empty();
private OptionalLong peerDroppedObserverId = OptionalLong.empty();
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false);
@ -225,8 +213,8 @@ public class DefaultP2PNetwork implements P2PNetwork {
this.peerMaintainConnectionList = new HashSet<>(); this.peerMaintainConnectionList = new HashSet<>();
this.connections = new PeerConnectionRegistry(metricsSystem); this.connections = new PeerConnectionRegistry(metricsSystem);
this.nodeId = this.keyPair.getPublicKey().getEncodedBytes();
this.subProtocols = config.getSupportedProtocols(); this.subProtocols = config.getSupportedProtocols();
this.advertisedHost = config.getDiscovery().getAdvertisedHost();
this.maxPeers = config.getRlpx().getMaxPeers(); this.maxPeers = config.getRlpx().getMaxPeers();
peerDiscoveryAgent.addPeerRequirement(() -> connections.size() >= maxPeers); peerDiscoveryAgent.addPeerRequirement(() -> connections.size() >= maxPeers);
@ -271,8 +259,12 @@ public class DefaultP2PNetwork implements P2PNetwork {
.sum(); .sum();
} }
/** Start listening for incoming connections */ /**
private void startListening() { * Start listening for incoming connections.
*
* @return The port on which we're listening for incoming connections.
*/
private int startListening() {
server = server =
new ServerBootstrap() new ServerBootstrap()
.group(boss, workers) .group(boss, workers)
@ -293,13 +285,15 @@ public class DefaultP2PNetwork implements P2PNetwork {
LOG.error(message, future.cause()); LOG.error(message, future.cause());
} }
checkState(socketAddress != null, message); checkState(socketAddress != null, message);
listeningPort = OptionalInt.of(socketAddress.getPort());
ourPeerInfo = ourPeerInfo =
Optional.of(
new PeerInfo( new PeerInfo(
5, 5,
config.getClientId(), config.getClientId(),
supportedCapabilities, supportedCapabilities,
socketAddress.getPort(), listeningPort.getAsInt(),
this.keyPair.getPublicKey().getEncodedBytes()); nodeId));
LOG.info("P2PNetwork started and listening on {}", socketAddress); LOG.info("P2PNetwork started and listening on {}", socketAddress);
latch.countDown(); latch.countDown();
}); });
@ -309,6 +303,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
if (!latch.await(1, TimeUnit.MINUTES)) { if (!latch.await(1, TimeUnit.MINUTES)) {
throw new RuntimeException("Timed out while waiting for network startup"); throw new RuntimeException("Timed out while waiting for network startup");
} }
return listeningPort.getAsInt();
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
throw new RuntimeException("Interrupted before startup completed", e); throw new RuntimeException("Interrupted before startup completed", e);
} }
@ -332,7 +327,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
new HandshakeHandlerInbound( new HandshakeHandlerInbound(
keyPair, keyPair,
subProtocols, subProtocols,
ourPeerInfo, ourPeerInfo.get(),
connectionFuture, connectionFuture,
callbacks, callbacks,
connections, connections,
@ -437,8 +432,14 @@ public class DefaultP2PNetwork implements P2PNetwork {
@Override @Override
public CompletableFuture<PeerConnection> connect(final Peer peer) { public CompletableFuture<PeerConnection> connect(final Peer peer) {
LOG.trace("Initiating connection to peer: {}", peer.getId());
final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>(); final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>();
if (!localEnode.isPresent()) {
connectionFuture.completeExceptionally(
new IllegalStateException("Attempt to connect to peer before network is ready"));
return connectionFuture;
}
LOG.trace("Initiating connection to peer: {}", peer.getId());
final EnodeURL enode = peer.getEnodeURL(); final EnodeURL enode = peer.getEnodeURL();
final CompletableFuture<PeerConnection> existingPendingConnection = final CompletableFuture<PeerConnection> existingPendingConnection =
pendingConnections.putIfAbsent(peer, connectionFuture); pendingConnections.putIfAbsent(peer, connectionFuture);
@ -471,7 +472,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
keyPair, keyPair,
peer, peer,
subProtocols, subProtocols,
ourPeerInfo, ourPeerInfo.get(),
connectionFuture, connectionFuture,
callbacks, callbacks,
connections, connections,
@ -528,8 +529,8 @@ public class DefaultP2PNetwork implements P2PNetwork {
LOG.warn("Attempted to start an already started " + getClass().getSimpleName()); LOG.warn("Attempted to start an already started " + getClass().getSimpleName());
} }
startListening(); final int listeningPort = startListening();
peerDiscoveryAgent.start(ourPeerInfo.getPort()).join(); peerDiscoveryAgent.start(listeningPort).join();
peerBondedObserverId = peerBondedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent())); OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent()));
peerDroppedObserverId = peerDroppedObserverId =
@ -609,22 +610,22 @@ public class DefaultP2PNetwork implements P2PNetwork {
} }
private boolean isPeerAllowed(final EnodeURL enode) { private boolean isPeerAllowed(final EnodeURL enode) {
final Optional<EnodeURL> maybeEnode = getLocalEnode();
if (!maybeEnode.isPresent()) {
// If the network isn't ready yet, deny connections
return false;
}
final EnodeURL localEnode = maybeEnode.get();
if (peerBlacklist.contains(enode.getNodeId())) { if (peerBlacklist.contains(enode.getNodeId())) {
return false; return false;
} }
if (enode.getNodeId().equals(ourPeerInfo.getNodeId())) { if (enode.getNodeId().equals(nodeId)) {
// Peer matches our node id // Peer matches our node id
return false; return false;
} }
Optional<EnodeURL> maybeEnode = getLocalEnode(); return nodePermissioningController.map(c -> c.isPermitted(localEnode, enode)).orElse(true);
if (!maybeEnode.isPresent()) {
// If local enode isn't yet available we can't evaluate permissions
return false;
}
return nodePermissioningController
.map(c -> c.isPermitted(maybeEnode.get(), enode))
.orElse(true);
} }
@VisibleForTesting @VisibleForTesting
@ -701,12 +702,10 @@ public class DefaultP2PNetwork implements P2PNetwork {
} }
private void createLocalEnode() { private void createLocalEnode() {
if (localEnode.isPresent()) { if (localEnode.isPresent() || !listeningPort.isPresent()) {
return; return;
} }
final BytesValue nodeId = ourPeerInfo.getNodeId();
final int listeningPort = ourPeerInfo.getPort();
final OptionalInt discoveryPort = final OptionalInt discoveryPort =
peerDiscoveryAgent peerDiscoveryAgent
.getAdvertisedPeer() .getAdvertisedPeer()
@ -718,8 +717,8 @@ public class DefaultP2PNetwork implements P2PNetwork {
final EnodeURL localEnode = final EnodeURL localEnode =
EnodeURL.builder() EnodeURL.builder()
.nodeId(nodeId) .nodeId(nodeId)
.ipAddress(advertisedHost) .ipAddress(config.getDiscovery().getAdvertisedHost())
.listeningPort(listeningPort) .listeningPort(listeningPort.getAsInt())
.discoveryPort(discoveryPort) .discoveryPort(discoveryPort)
.build(); .build();

@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.p2p.network; package tech.pegasys.pantheon.ethereum.p2p.network;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Java6Assertions.assertThatThrownBy;
import static org.assertj.core.api.Java6Assertions.catchThrowable; import static org.assertj.core.api.Java6Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.argThat;
@ -112,6 +113,17 @@ public final class DefaultP2PNetworkTest {
verify(network, times(1)).connect(peer); verify(network, times(1)).connect(peer);
} }
@Test
public void addMaintainConnectionPeer_beforeStartingNetwork() {
final DefaultP2PNetwork network = mockNetwork();
final Peer peer = mockPeer();
assertThat(network.addMaintainConnectionPeer(peer)).isTrue();
assertThat(network.peerMaintainConnectionList).contains(peer);
verify(network, never()).connect(peer);
}
@Test @Test
public void addingRepeatMaintainedPeersReturnsFalse() { public void addingRepeatMaintainedPeersReturnsFalse() {
final P2PNetwork network = network(); final P2PNetwork network = network();
@ -194,6 +206,7 @@ public final class DefaultP2PNetworkTest {
@Test @Test
public void shouldntAttemptNewConnectionToPendingPeer() { public void shouldntAttemptNewConnectionToPendingPeer() {
final P2PNetwork network = network(); final P2PNetwork network = network();
network.start();
final Peer peer = mockPeer(); final Peer peer = mockPeer();
final CompletableFuture<PeerConnection> connectingFuture = network.connect(peer); final CompletableFuture<PeerConnection> connectingFuture = network.connect(peer);
@ -477,6 +490,18 @@ public final class DefaultP2PNetworkTest {
verify(network, times(0)).connect(any()); verify(network, times(0)).connect(any());
} }
@Test
public void connect_beforeStartingNetwork() {
final DefaultP2PNetwork network = network();
final Peer peer = mockPeer();
final CompletableFuture<PeerConnection> connectionResult = network.connect(peer);
assertThat(connectionResult).isCompletedExceptionally();
assertThatThrownBy(connectionResult::get)
.hasCauseInstanceOf(IllegalStateException.class)
.hasMessageContaining("Attempt to connect to peer before network is ready");
}
private DiscoveryPeer createDiscoveryPeer() { private DiscoveryPeer createDiscoveryPeer() {
return createDiscoveryPeer(Peer.randomId(), 999); return createDiscoveryPeer(Peer.randomId(), 999);
} }

Loading…
Cancel
Save