diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java index 6f129ca2c6..a42256019c 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java @@ -12,7 +12,6 @@ */ package tech.pegasys.pantheon.ethereum.p2p.network; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -35,6 +34,7 @@ import tech.pegasys.pantheon.ethereum.p2p.network.netty.HandshakeHandlerOutbound import tech.pegasys.pantheon.ethereum.p2p.network.netty.PeerConnectionRegistry; import tech.pegasys.pantheon.ethereum.p2p.network.netty.TimeoutHandler; import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer; +import tech.pegasys.pantheon.ethereum.p2p.peers.MaintainedPeers; import tech.pegasys.pantheon.ethereum.p2p.peers.Peer; import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissions; import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissionsBlacklist; @@ -56,7 +56,6 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -158,7 +157,7 @@ public class DefaultP2PNetwork implements P2PNetwork { private final PeerPermissions peerPermissions; private volatile Optional rlpxPermissions = Optional.empty(); - @VisibleForTesting final Collection peerMaintainConnectionList; + private final MaintainedPeers maintainedPeers; @VisibleForTesting final PeerConnectionRegistry connections; @VisibleForTesting @@ -196,13 +195,14 @@ public class DefaultP2PNetwork implements P2PNetwork { final NetworkingConfiguration config, final List supportedCapabilities, final PeerPermissions peerPermissions, + final MaintainedPeers maintainedPeers, final MetricsSystem metricsSystem) { this.peerDiscoveryAgent = peerDiscoveryAgent; this.keyPair = keyPair; this.config = config; this.supportedCapabilities = supportedCapabilities; - this.peerMaintainConnectionList = new HashSet<>(); + this.maintainedPeers = maintainedPeers; this.connections = new PeerConnectionRegistry(metricsSystem); this.nodeId = this.keyPair.getPublicKey().getEncodedBytes(); @@ -358,46 +358,19 @@ public class DefaultP2PNetwork implements P2PNetwork { @Override public boolean addMaintainConnectionPeer(final Peer peer) { - checkArgument( - peer.getEnodeURL().isListening(), - "Invalid enode url. Enode url must contain a non-zero listening port."); - final boolean added = peerMaintainConnectionList.add(peer); - final boolean allowConnection = - rlpxPermissions.isPresent() && rlpxPermissions.get().allowNewOutboundConnectionTo(peer); - if (allowConnection && !isConnectingOrConnected(peer)) { - // Connect immediately if appropriate - connect(peer); - } - - return added; + return maintainedPeers.add(peer); } @Override public boolean removeMaintainedConnectionPeer(final Peer peer) { - final boolean removed = peerMaintainConnectionList.remove(peer); - - final CompletableFuture connectionFuture = pendingConnections.get(peer); - if (connectionFuture != null) { - connectionFuture.thenAccept(connection -> connection.disconnect(DisconnectReason.REQUESTED)); - } - - final Optional peerConnection = connections.getConnectionForPeer(peer.getId()); - peerConnection.ifPresent(pc -> pc.disconnect(DisconnectReason.REQUESTED)); - - peerDiscoveryAgent.dropPeer(peer); - - return removed; + return maintainedPeers.remove(peer); } void checkMaintainedConnectionPeers() { if (!rlpxPermissions.isPresent()) { return; } - final PeerRlpxPermissions permissions = rlpxPermissions.get(); - peerMaintainConnectionList.stream() - .filter(p -> !isConnectingOrConnected(p)) - .filter(permissions::allowNewOutboundConnectionTo) - .forEach(this::connect); + maintainedPeers.streamPeers().forEach(this::connect); } @VisibleForTesting @@ -459,20 +432,35 @@ public class DefaultP2PNetwork implements P2PNetwork { return connectionFuture; } - LOG.trace("Initiating connection to peer: {}", peer.getId()); + // Check for existing connection + final Optional existingConnection = + connections.getConnectionForPeer(peer.getId()); + if (existingConnection.isPresent()) { + connectionFuture.complete(existingConnection.get()); + return connectionFuture; + } + // Check for existing pending connection final CompletableFuture existingPendingConnection = pendingConnections.putIfAbsent(peer, connectionFuture); if (existingPendingConnection != null) { - LOG.debug("Attempted to connect to peer with pending connection: {}", peer.getId()); return existingPendingConnection; } + + initiateOutboundConnection(peer, connectionFuture); + return connectionFuture; + } + + @VisibleForTesting + void initiateOutboundConnection( + final Peer peer, final CompletableFuture connectionFuture) { + LOG.trace("Initiating connection to peer: {}", peer.getId()); final EnodeURL enode = peer.getEnodeURL(); if (!enode.isListening()) { final String errorMsg = "Attempt to connect to peer with no listening port: " + enode.toString(); LOG.warn(errorMsg); - connectionFuture.completeExceptionally(new IllegalStateException(errorMsg)); - return connectionFuture; + connectionFuture.completeExceptionally(new IllegalArgumentException(errorMsg)); + return; } if (peer instanceof DiscoveryPeer) { @@ -528,7 +516,6 @@ public class DefaultP2PNetwork implements P2PNetwork { } logConnections(); }); - return connectionFuture; } private void logConnections() { @@ -567,12 +554,32 @@ public class DefaultP2PNetwork implements P2PNetwork { final Peer ourNode = createLocalNode(); this.rlpxPermissions = Optional.of(new PeerRlpxPermissions(ourNode, peerPermissions)); + this.maintainedPeers.subscribeAdd(this::handleMaintainedPeerAdded); + this.maintainedPeers.subscribeRemove(this::handleMaintainedPeerRemoved); + peerConnectionScheduler.scheduleWithFixedDelay( this::checkMaintainedConnectionPeers, 2, 60, TimeUnit.SECONDS); peerConnectionScheduler.scheduleWithFixedDelay( this::attemptPeerConnections, 30, 30, TimeUnit.SECONDS); } + private void handleMaintainedPeerRemoved(final Peer peer, final boolean wasRemoved) { + // Drop peer from peer table + peerDiscoveryAgent.dropPeer(peer); + + // Disconnect if connected or connecting + final CompletableFuture connectionFuture = pendingConnections.get(peer); + if (connectionFuture != null) { + connectionFuture.thenAccept(connection -> connection.disconnect(DisconnectReason.REQUESTED)); + } + final Optional peerConnection = connections.getConnectionForPeer(peer.getId()); + peerConnection.ifPresent(pc -> pc.disconnect(DisconnectReason.REQUESTED)); + } + + private void handleMaintainedPeerAdded(final Peer peer, final boolean wasAdded) { + this.connect(peer); + } + @VisibleForTesting Consumer handlePeerBondedEvent() { return event -> { @@ -626,10 +633,6 @@ public class DefaultP2PNetwork implements P2PNetwork { return connections.isAlreadyConnected(peer.getId()); } - private boolean isConnectingOrConnected(final Peer peer) { - return isConnected(peer) || isConnecting(peer); - } - @Override public void stop() { if (!this.started.get() || !stopped.compareAndSet(false, true)) { @@ -728,6 +731,7 @@ public class DefaultP2PNetwork implements P2PNetwork { private Optional nodePermissioningController = Optional.empty(); private Blockchain blockchain = null; private Vertx vertx; + private MaintainedPeers maintainedPeers = new MaintainedPeers(); public P2PNetwork build() { validate(); @@ -750,6 +754,7 @@ public class DefaultP2PNetwork implements P2PNetwork { config, supportedCapabilities, peerPermissions, + maintainedPeers, metricsSystem); } @@ -829,5 +834,11 @@ public class DefaultP2PNetwork implements P2PNetwork { this.blockchain = blockchain; return this; } + + public Builder maintainedPeers(final MaintainedPeers maintainedPeers) { + checkNotNull(maintainedPeers); + this.maintainedPeers = maintainedPeers; + return this; + } } } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/peers/MaintainedPeers.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/peers/MaintainedPeers.java new file mode 100644 index 0000000000..50f0cb1f8e --- /dev/null +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/peers/MaintainedPeers.java @@ -0,0 +1,74 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.p2p.peers; + +import static com.google.common.base.Preconditions.checkArgument; + +import tech.pegasys.pantheon.util.Subscribers; + +import java.util.Set; +import java.util.stream.Stream; + +import io.vertx.core.impl.ConcurrentHashSet; + +/** Represents a set of peers for which connections should be actively maintained. */ +public class MaintainedPeers { + private final Set maintainedPeers = new ConcurrentHashSet<>(); + private final Subscribers addedSubscribers = new Subscribers<>(); + private final Subscribers removedCallbackSubscribers = new Subscribers<>(); + + public boolean add(final Peer peer) { + checkArgument( + peer.getEnodeURL().isListening(), + "Invalid enode url. Enode url must contain a non-zero listening port."); + boolean wasAdded = maintainedPeers.add(peer); + addedSubscribers.forEach(s -> s.onPeerAdded(peer, wasAdded)); + return wasAdded; + } + + public boolean remove(final Peer peer) { + boolean wasRemoved = maintainedPeers.remove(peer); + removedCallbackSubscribers.forEach(s -> s.onPeerRemoved(peer, wasRemoved)); + return wasRemoved; + } + + public boolean contains(final Peer peer) { + return maintainedPeers.contains(peer); + } + + public int size() { + return maintainedPeers.size(); + } + + public void subscribeAdd(final PeerAddedCallback callback) { + addedSubscribers.subscribe(callback); + } + + public void subscribeRemove(final PeerRemovedCallback callback) { + removedCallbackSubscribers.subscribe(callback); + } + + public Stream streamPeers() { + return maintainedPeers.stream(); + } + + @FunctionalInterface + public interface PeerAddedCallback { + void onPeerAdded(Peer peer, boolean wasAdded); + } + + @FunctionalInterface + public interface PeerRemovedCallback { + void onPeerRemoved(Peer peer, boolean wasRemoved); + } +} diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java index 64131e0a9b..a3d44a39ff 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java @@ -44,6 +44,7 @@ import tech.pegasys.pantheon.ethereum.p2p.discovery.Endpoint; import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerBondedEvent; import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryStatus; import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer; +import tech.pegasys.pantheon.ethereum.p2p.peers.MaintainedPeers; import tech.pegasys.pantheon.ethereum.p2p.peers.Peer; import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissions; import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissions.Action; @@ -109,19 +110,23 @@ public final class DefaultP2PNetworkTest { @Test public void addingMaintainedConnectionPeer_startsConnection() { - final DefaultP2PNetwork network = mockNetwork(); + final MaintainedPeers maintainedPeers = new MaintainedPeers(); + final DefaultP2PNetwork network = + spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build()); network.start(); final Peer peer = mockPeer(); assertThat(network.addMaintainConnectionPeer(peer)).isTrue(); - assertThat(network.peerMaintainConnectionList).contains(peer); - verify(network, times(1)).connect(peer); + assertThat(maintainedPeers.contains(peer)).isTrue(); + verify(network, times(1)).initiateOutboundConnection(eq(peer), any()); } @Test public void addingMaintainedConnectionPeer_forDisallowedPeer() { - final DefaultP2PNetwork network = mockNetwork(); + final MaintainedPeers maintainedPeers = new MaintainedPeers(); + final DefaultP2PNetwork network = + spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build()); network.start(); final Peer localNode = DefaultPeer.fromEnodeURL(network.getLocalEnode().get()); @@ -133,18 +138,20 @@ public final class DefaultP2PNetworkTest { assertThat(network.addMaintainConnectionPeer(peer)).isTrue(); // Add peer but do not connect - assertThat(network.peerMaintainConnectionList).contains(peer); - verify(network, times(0)).connect(peer); + assertThat(maintainedPeers.contains(peer)).isTrue(); + verify(network, times(0)).initiateOutboundConnection(eq(peer), any()); } @Test public void addMaintainConnectionPeer_beforeStartingNetwork() { - final DefaultP2PNetwork network = mockNetwork(); + final MaintainedPeers maintainedPeers = new MaintainedPeers(); + final DefaultP2PNetwork network = + spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build()); final Peer peer = mockPeer(); assertThat(network.addMaintainConnectionPeer(peer)).isTrue(); - assertThat(network.peerMaintainConnectionList).contains(peer); + assertThat(maintainedPeers.contains(peer)).isTrue(); verify(network, never()).connect(peer); } @@ -179,32 +186,38 @@ public final class DefaultP2PNetworkTest { @Test public void checkMaintainedConnectionPeersTriesToConnect() { - final DefaultP2PNetwork network = mockNetwork(); + final MaintainedPeers maintainedPeers = new MaintainedPeers(); + final DefaultP2PNetwork network = + spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build()); + final Peer peer = mockPeer(); + maintainedPeers.add(peer); + network.start(); - final Peer peer = mockPeer(); - network.peerMaintainConnectionList.add(peer); + verify(network, times(0)).connect(peer); network.checkMaintainedConnectionPeers(); - verify(network, times(1)).connect(peer); + verify(network, times(1)).initiateOutboundConnection(eq(peer), any()); } @Test public void checkMaintainedConnectionPeers_doesNotConnectToDisallowedPeer() { - final DefaultP2PNetwork network = mockNetwork(); + final MaintainedPeers maintainedPeers = new MaintainedPeers(); + final DefaultP2PNetwork network = + spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build()); + final Peer peer = mockPeer(); + maintainedPeers.add(peer); network.start(); // Add peer that is not permitted final Peer localNode = DefaultPeer.fromEnodeURL(network.getLocalEnode().get()); - final Peer peer = mockPeer(); doReturn(false) .when(peerPermissions) .isPermitted(eq(localNode), eq(peer), eq(Action.RLPX_ALLOW_NEW_OUTBOUND_CONNECTION)); - assertThat(network.peerMaintainConnectionList.add(peer)).isTrue(); network.checkMaintainedConnectionPeers(); - verify(network, never()).connect(peer); + verify(network, times(0)).initiateOutboundConnection(eq(peer), any()); } @Test @@ -231,11 +244,11 @@ public final class DefaultP2PNetworkTest { // Add peer to maintained list assertThat(network.addMaintainConnectionPeer(peer)).isTrue(); - verify(network, times(1)).connect(peer); + verify(network, times(1)).initiateOutboundConnection(eq(peer), any()); // Check maintained connections network.checkMaintainedConnectionPeers(); - verify(network, times(1)).connect(peer); + verify(network, times(1)).initiateOutboundConnection(eq(peer), any()); } @Test @@ -452,29 +465,33 @@ public final class DefaultP2PNetworkTest { @Test public void removePeerReturnsTrueIfNodeWasInMaintainedConnectionsAndDisconnectsIfInPending() { - final DefaultP2PNetwork network = network(); + final MaintainedPeers maintainedPeers = new MaintainedPeers(); + final DefaultP2PNetwork network = + spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build()); network.start(); final Peer remotePeer = mockPeer("127.0.0.2", 30302); final PeerConnection peerConnection = mockPeerConnection(remotePeer); network.addMaintainConnectionPeer(remotePeer); - assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isTrue(); + assertThat(maintainedPeers.contains(remotePeer)).isTrue(); assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue(); assertThat(network.removeMaintainedConnectionPeer(remotePeer)).isTrue(); - assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse(); + assertThat(maintainedPeers.contains(remotePeer)).isFalse(); // Note: The pendingConnection future is not removed. assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue(); - // Complete the connection, and ensure "disconnect is automatically called. + // Complete the connection, and ensure disconnect is automatically called. network.pendingConnections.get(remotePeer).complete(peerConnection); verify(peerConnection).disconnect(DisconnectReason.REQUESTED); } @Test public void removePeerReturnsFalseIfNotInMaintainedListButDisconnectsPeer() { - final DefaultP2PNetwork network = network(); + final MaintainedPeers maintainedPeers = new MaintainedPeers(); + final DefaultP2PNetwork network = + spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build()); network.start(); final Peer remotePeer = mockPeer("127.0.0.2", 30302); @@ -482,13 +499,13 @@ public final class DefaultP2PNetworkTest { final CompletableFuture future = network.connect(remotePeer); - assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse(); + assertThat(maintainedPeers.contains(remotePeer)).isFalse(); assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue(); future.complete(peerConnection); assertThat(network.pendingConnections.containsKey(remotePeer)).isFalse(); assertThat(network.removeMaintainedConnectionPeer(remotePeer)).isFalse(); - assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse(); + assertThat(maintainedPeers.contains(remotePeer)).isFalse(); verify(peerConnection).disconnect(DisconnectReason.REQUESTED); } @@ -731,7 +748,7 @@ public final class DefaultP2PNetworkTest { final CompletableFuture connectionResult = network.connect(peer); assertThat(connectionResult).isCompletedExceptionally(); assertThatThrownBy(connectionResult::get) - .hasCauseInstanceOf(IllegalStateException.class) + .hasCauseInstanceOf(IllegalArgumentException.class) .hasMessageContaining( "Attempt to connect to peer with no listening port: " + peer.getEnodeURLString()); } @@ -829,6 +846,10 @@ public final class DefaultP2PNetworkTest { return (DefaultP2PNetwork) builder(rlpxConfig).build(); } + private DefaultP2PNetwork.Builder builder() { + return builder(RlpxConfiguration::create); + } + private DefaultP2PNetwork.Builder builder(final Supplier rlpxConfig) { final DiscoveryConfiguration noDiscovery = DiscoveryConfiguration.create().setActive(false); final NetworkingConfiguration networkingConfiguration = diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/P2PNetworkTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/P2PNetworkTest.java index 101a5d6aa9..e713fded9f 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/P2PNetworkTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/P2PNetworkTest.java @@ -125,17 +125,18 @@ public class P2PNetworkTest { final BytesValue listenId = listenerEnode.getNodeId(); final int listenPort = listenerEnode.getListeningPort().getAsInt(); - assertThat( - connector - .connect(createPeer(listenId, listenPort)) - .get(30L, TimeUnit.SECONDS) - .getPeerInfo() - .getNodeId()) - .isEqualTo(listenId); - final CompletableFuture secondConnectionFuture = + final CompletableFuture firstFuture = + connector.connect(createPeer(listenId, listenPort)); + final CompletableFuture secondFuture = connector.connect(createPeer(listenId, listenPort)); - assertThatThrownBy(secondConnectionFuture::get) - .hasCause(new IllegalStateException("Client already connected")); + + final PeerConnection firstConnection = firstFuture.get(30L, TimeUnit.SECONDS); + final PeerConnection secondConnection = secondFuture.get(30L, TimeUnit.SECONDS); + assertThat(firstConnection.getPeerInfo().getNodeId()).isEqualTo(listenId); + + // Connections should reference the same instance - i.e. we shouldn't create 2 distinct + // connections + assertThat(firstConnection == secondConnection).isTrue(); } } diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/peers/MaintainedPeersTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/peers/MaintainedPeersTest.java new file mode 100644 index 0000000000..7c223fe4c0 --- /dev/null +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/peers/MaintainedPeersTest.java @@ -0,0 +1,147 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.p2p.peers; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import tech.pegasys.pantheon.util.enode.EnodeURL; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.junit.Test; + +public class MaintainedPeersTest { + + private MaintainedPeers maintainedPeers = new MaintainedPeers(); + + @Test + public void add_newPeer() { + final Peer peer = createPeer(); + final AtomicInteger callbackCount = new AtomicInteger(0); + maintainedPeers.subscribeAdd( + (addedPeer, wasAdded) -> { + callbackCount.incrementAndGet(); + assertThat(addedPeer).isEqualTo(peer); + assertThat(wasAdded).isTrue(); + }); + + assertThat(maintainedPeers.size()).isEqualTo(0); + assertThat(maintainedPeers.add(peer)).isTrue(); + assertThat(callbackCount).hasValue(1); + assertThat(maintainedPeers.size()).isEqualTo(1); + assertThat(maintainedPeers.contains(peer)).isTrue(); + } + + @Test + public void add_invalidPeer() { + final Peer peer = nonListeningPeer(); + assertThatThrownBy(() -> maintainedPeers.add(peer)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Invalid enode url. Enode url must contain a non-zero listening port."); + assertThat(maintainedPeers.contains(peer)).isFalse(); + } + + @Test + public void add_newDuplicatePeer() { + // Initial add + assertThat(maintainedPeers.size()).isEqualTo(0); + final Peer peer = createPeer(); + assertThat(maintainedPeers.add(peer)).isTrue(); + assertThat(maintainedPeers.size()).isEqualTo(1); + + // Test duplicate add + final AtomicInteger callbackCount = new AtomicInteger(0); + maintainedPeers.subscribeAdd( + (addedPeer, wasAdded) -> { + callbackCount.incrementAndGet(); + assertThat(addedPeer).isEqualTo(peer); + assertThat(wasAdded).isFalse(); + }); + assertThat(maintainedPeers.add(peer)).isFalse(); + assertThat(callbackCount).hasValue(1); + assertThat(maintainedPeers.size()).isEqualTo(1); + } + + @Test + public void remove_existingPeer() { + // Initial add + final Peer peer = createPeer(); + assertThat(maintainedPeers.add(peer)).isTrue(); + assertThat(maintainedPeers.size()).isEqualTo(1); + assertThat(maintainedPeers.contains(peer)).isTrue(); + + // Test remove + final AtomicInteger callbackCount = new AtomicInteger(0); + maintainedPeers.subscribeRemove( + (addedPeer, wasRemoved) -> { + callbackCount.incrementAndGet(); + assertThat(addedPeer).isEqualTo(peer); + assertThat(wasRemoved).isTrue(); + }); + assertThat(maintainedPeers.remove(peer)).isTrue(); + assertThat(callbackCount).hasValue(1); + assertThat(maintainedPeers.size()).isEqualTo(0); + assertThat(maintainedPeers.contains(peer)).isFalse(); + } + + @Test + public void remove_nonExistentPeer() { + final Peer peer = createPeer(); + assertThat(maintainedPeers.size()).isEqualTo(0); + + final AtomicInteger callbackCount = new AtomicInteger(0); + maintainedPeers.subscribeRemove( + (addedPeer, wasRemoved) -> { + callbackCount.incrementAndGet(); + assertThat(addedPeer).isEqualTo(peer); + assertThat(wasRemoved).isFalse(); + }); + assertThat(maintainedPeers.remove(peer)).isFalse(); + assertThat(callbackCount).hasValue(1); + assertThat(maintainedPeers.size()).isEqualTo(0); + } + + @Test + public void stream_withPeers() { + // Initial add + final Peer peerA = createPeer(); + final Peer peerB = createPeer(); + assertThat(maintainedPeers.add(peerA)).isTrue(); + assertThat(maintainedPeers.add(peerB)).isTrue(); + + final List peers = maintainedPeers.streamPeers().collect(Collectors.toList()); + assertThat(peers).containsExactlyInAnyOrder(peerA, peerB); + } + + @Test + public void stream_empty() { + final List peers = maintainedPeers.streamPeers().collect(Collectors.toList()); + assertThat(peers).isEmpty(); + } + + private Peer createPeer() { + return DefaultPeer.fromEnodeURL(enodeBuilder().build()); + } + + private Peer nonListeningPeer() { + return DefaultPeer.fromEnodeURL(enodeBuilder().disableListening().build()); + } + + private EnodeURL.Builder enodeBuilder() { + return EnodeURL.builder().ipAddress("127.0.0.1").useDefaultPorts().nodeId(Peer.randomId()); + } +}