[PAN-2730] Create MaintainedPeers class (#1484)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
mbaxter 6 years ago committed by GitHub
parent 689967964e
commit 527167827a
  1. 97
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java
  2. 74
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/peers/MaintainedPeers.java
  3. 73
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java
  4. 21
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/P2PNetworkTest.java
  5. 147
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/peers/MaintainedPeersTest.java

@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.network;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@ -35,6 +34,7 @@ import tech.pegasys.pantheon.ethereum.p2p.network.netty.HandshakeHandlerOutbound
import tech.pegasys.pantheon.ethereum.p2p.network.netty.PeerConnectionRegistry;
import tech.pegasys.pantheon.ethereum.p2p.network.netty.TimeoutHandler;
import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer;
import tech.pegasys.pantheon.ethereum.p2p.peers.MaintainedPeers;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissions;
import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissionsBlacklist;
@ -56,7 +56,6 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -158,7 +157,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
private final PeerPermissions peerPermissions;
private volatile Optional<PeerRlpxPermissions> rlpxPermissions = Optional.empty();
@VisibleForTesting final Collection<Peer> peerMaintainConnectionList;
private final MaintainedPeers maintainedPeers;
@VisibleForTesting final PeerConnectionRegistry connections;
@VisibleForTesting
@ -196,13 +195,14 @@ public class DefaultP2PNetwork implements P2PNetwork {
final NetworkingConfiguration config,
final List<Capability> supportedCapabilities,
final PeerPermissions peerPermissions,
final MaintainedPeers maintainedPeers,
final MetricsSystem metricsSystem) {
this.peerDiscoveryAgent = peerDiscoveryAgent;
this.keyPair = keyPair;
this.config = config;
this.supportedCapabilities = supportedCapabilities;
this.peerMaintainConnectionList = new HashSet<>();
this.maintainedPeers = maintainedPeers;
this.connections = new PeerConnectionRegistry(metricsSystem);
this.nodeId = this.keyPair.getPublicKey().getEncodedBytes();
@ -358,46 +358,19 @@ public class DefaultP2PNetwork implements P2PNetwork {
@Override
public boolean addMaintainConnectionPeer(final Peer peer) {
checkArgument(
peer.getEnodeURL().isListening(),
"Invalid enode url. Enode url must contain a non-zero listening port.");
final boolean added = peerMaintainConnectionList.add(peer);
final boolean allowConnection =
rlpxPermissions.isPresent() && rlpxPermissions.get().allowNewOutboundConnectionTo(peer);
if (allowConnection && !isConnectingOrConnected(peer)) {
// Connect immediately if appropriate
connect(peer);
}
return added;
return maintainedPeers.add(peer);
}
@Override
public boolean removeMaintainedConnectionPeer(final Peer peer) {
final boolean removed = peerMaintainConnectionList.remove(peer);
final CompletableFuture<PeerConnection> connectionFuture = pendingConnections.get(peer);
if (connectionFuture != null) {
connectionFuture.thenAccept(connection -> connection.disconnect(DisconnectReason.REQUESTED));
}
final Optional<PeerConnection> peerConnection = connections.getConnectionForPeer(peer.getId());
peerConnection.ifPresent(pc -> pc.disconnect(DisconnectReason.REQUESTED));
peerDiscoveryAgent.dropPeer(peer);
return removed;
return maintainedPeers.remove(peer);
}
void checkMaintainedConnectionPeers() {
if (!rlpxPermissions.isPresent()) {
return;
}
final PeerRlpxPermissions permissions = rlpxPermissions.get();
peerMaintainConnectionList.stream()
.filter(p -> !isConnectingOrConnected(p))
.filter(permissions::allowNewOutboundConnectionTo)
.forEach(this::connect);
maintainedPeers.streamPeers().forEach(this::connect);
}
@VisibleForTesting
@ -459,20 +432,35 @@ public class DefaultP2PNetwork implements P2PNetwork {
return connectionFuture;
}
LOG.trace("Initiating connection to peer: {}", peer.getId());
// Check for existing connection
final Optional<PeerConnection> existingConnection =
connections.getConnectionForPeer(peer.getId());
if (existingConnection.isPresent()) {
connectionFuture.complete(existingConnection.get());
return connectionFuture;
}
// Check for existing pending connection
final CompletableFuture<PeerConnection> existingPendingConnection =
pendingConnections.putIfAbsent(peer, connectionFuture);
if (existingPendingConnection != null) {
LOG.debug("Attempted to connect to peer with pending connection: {}", peer.getId());
return existingPendingConnection;
}
initiateOutboundConnection(peer, connectionFuture);
return connectionFuture;
}
@VisibleForTesting
void initiateOutboundConnection(
final Peer peer, final CompletableFuture<PeerConnection> connectionFuture) {
LOG.trace("Initiating connection to peer: {}", peer.getId());
final EnodeURL enode = peer.getEnodeURL();
if (!enode.isListening()) {
final String errorMsg =
"Attempt to connect to peer with no listening port: " + enode.toString();
LOG.warn(errorMsg);
connectionFuture.completeExceptionally(new IllegalStateException(errorMsg));
return connectionFuture;
connectionFuture.completeExceptionally(new IllegalArgumentException(errorMsg));
return;
}
if (peer instanceof DiscoveryPeer) {
@ -528,7 +516,6 @@ public class DefaultP2PNetwork implements P2PNetwork {
}
logConnections();
});
return connectionFuture;
}
private void logConnections() {
@ -567,12 +554,32 @@ public class DefaultP2PNetwork implements P2PNetwork {
final Peer ourNode = createLocalNode();
this.rlpxPermissions = Optional.of(new PeerRlpxPermissions(ourNode, peerPermissions));
this.maintainedPeers.subscribeAdd(this::handleMaintainedPeerAdded);
this.maintainedPeers.subscribeRemove(this::handleMaintainedPeerRemoved);
peerConnectionScheduler.scheduleWithFixedDelay(
this::checkMaintainedConnectionPeers, 2, 60, TimeUnit.SECONDS);
peerConnectionScheduler.scheduleWithFixedDelay(
this::attemptPeerConnections, 30, 30, TimeUnit.SECONDS);
}
private void handleMaintainedPeerRemoved(final Peer peer, final boolean wasRemoved) {
// Drop peer from peer table
peerDiscoveryAgent.dropPeer(peer);
// Disconnect if connected or connecting
final CompletableFuture<PeerConnection> connectionFuture = pendingConnections.get(peer);
if (connectionFuture != null) {
connectionFuture.thenAccept(connection -> connection.disconnect(DisconnectReason.REQUESTED));
}
final Optional<PeerConnection> peerConnection = connections.getConnectionForPeer(peer.getId());
peerConnection.ifPresent(pc -> pc.disconnect(DisconnectReason.REQUESTED));
}
private void handleMaintainedPeerAdded(final Peer peer, final boolean wasAdded) {
this.connect(peer);
}
@VisibleForTesting
Consumer<PeerBondedEvent> handlePeerBondedEvent() {
return event -> {
@ -626,10 +633,6 @@ public class DefaultP2PNetwork implements P2PNetwork {
return connections.isAlreadyConnected(peer.getId());
}
private boolean isConnectingOrConnected(final Peer peer) {
return isConnected(peer) || isConnecting(peer);
}
@Override
public void stop() {
if (!this.started.get() || !stopped.compareAndSet(false, true)) {
@ -728,6 +731,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
private Optional<NodePermissioningController> nodePermissioningController = Optional.empty();
private Blockchain blockchain = null;
private Vertx vertx;
private MaintainedPeers maintainedPeers = new MaintainedPeers();
public P2PNetwork build() {
validate();
@ -750,6 +754,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
config,
supportedCapabilities,
peerPermissions,
maintainedPeers,
metricsSystem);
}
@ -829,5 +834,11 @@ public class DefaultP2PNetwork implements P2PNetwork {
this.blockchain = blockchain;
return this;
}
public Builder maintainedPeers(final MaintainedPeers maintainedPeers) {
checkNotNull(maintainedPeers);
this.maintainedPeers = maintainedPeers;
return this;
}
}
}

@ -0,0 +1,74 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.peers;
import static com.google.common.base.Preconditions.checkArgument;
import tech.pegasys.pantheon.util.Subscribers;
import java.util.Set;
import java.util.stream.Stream;
import io.vertx.core.impl.ConcurrentHashSet;
/** Represents a set of peers for which connections should be actively maintained. */
public class MaintainedPeers {
private final Set<Peer> maintainedPeers = new ConcurrentHashSet<>();
private final Subscribers<PeerAddedCallback> addedSubscribers = new Subscribers<>();
private final Subscribers<PeerRemovedCallback> removedCallbackSubscribers = new Subscribers<>();
public boolean add(final Peer peer) {
checkArgument(
peer.getEnodeURL().isListening(),
"Invalid enode url. Enode url must contain a non-zero listening port.");
boolean wasAdded = maintainedPeers.add(peer);
addedSubscribers.forEach(s -> s.onPeerAdded(peer, wasAdded));
return wasAdded;
}
public boolean remove(final Peer peer) {
boolean wasRemoved = maintainedPeers.remove(peer);
removedCallbackSubscribers.forEach(s -> s.onPeerRemoved(peer, wasRemoved));
return wasRemoved;
}
public boolean contains(final Peer peer) {
return maintainedPeers.contains(peer);
}
public int size() {
return maintainedPeers.size();
}
public void subscribeAdd(final PeerAddedCallback callback) {
addedSubscribers.subscribe(callback);
}
public void subscribeRemove(final PeerRemovedCallback callback) {
removedCallbackSubscribers.subscribe(callback);
}
public Stream<Peer> streamPeers() {
return maintainedPeers.stream();
}
@FunctionalInterface
public interface PeerAddedCallback {
void onPeerAdded(Peer peer, boolean wasAdded);
}
@FunctionalInterface
public interface PeerRemovedCallback {
void onPeerRemoved(Peer peer, boolean wasRemoved);
}
}

@ -44,6 +44,7 @@ import tech.pegasys.pantheon.ethereum.p2p.discovery.Endpoint;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerBondedEvent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryStatus;
import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer;
import tech.pegasys.pantheon.ethereum.p2p.peers.MaintainedPeers;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissions;
import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissions.Action;
@ -109,19 +110,23 @@ public final class DefaultP2PNetworkTest {
@Test
public void addingMaintainedConnectionPeer_startsConnection() {
final DefaultP2PNetwork network = mockNetwork();
final MaintainedPeers maintainedPeers = new MaintainedPeers();
final DefaultP2PNetwork network =
spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build());
network.start();
final Peer peer = mockPeer();
assertThat(network.addMaintainConnectionPeer(peer)).isTrue();
assertThat(network.peerMaintainConnectionList).contains(peer);
verify(network, times(1)).connect(peer);
assertThat(maintainedPeers.contains(peer)).isTrue();
verify(network, times(1)).initiateOutboundConnection(eq(peer), any());
}
@Test
public void addingMaintainedConnectionPeer_forDisallowedPeer() {
final DefaultP2PNetwork network = mockNetwork();
final MaintainedPeers maintainedPeers = new MaintainedPeers();
final DefaultP2PNetwork network =
spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build());
network.start();
final Peer localNode = DefaultPeer.fromEnodeURL(network.getLocalEnode().get());
@ -133,18 +138,20 @@ public final class DefaultP2PNetworkTest {
assertThat(network.addMaintainConnectionPeer(peer)).isTrue();
// Add peer but do not connect
assertThat(network.peerMaintainConnectionList).contains(peer);
verify(network, times(0)).connect(peer);
assertThat(maintainedPeers.contains(peer)).isTrue();
verify(network, times(0)).initiateOutboundConnection(eq(peer), any());
}
@Test
public void addMaintainConnectionPeer_beforeStartingNetwork() {
final DefaultP2PNetwork network = mockNetwork();
final MaintainedPeers maintainedPeers = new MaintainedPeers();
final DefaultP2PNetwork network =
spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build());
final Peer peer = mockPeer();
assertThat(network.addMaintainConnectionPeer(peer)).isTrue();
assertThat(network.peerMaintainConnectionList).contains(peer);
assertThat(maintainedPeers.contains(peer)).isTrue();
verify(network, never()).connect(peer);
}
@ -179,32 +186,38 @@ public final class DefaultP2PNetworkTest {
@Test
public void checkMaintainedConnectionPeersTriesToConnect() {
final DefaultP2PNetwork network = mockNetwork();
final MaintainedPeers maintainedPeers = new MaintainedPeers();
final DefaultP2PNetwork network =
spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build());
final Peer peer = mockPeer();
maintainedPeers.add(peer);
network.start();
final Peer peer = mockPeer();
network.peerMaintainConnectionList.add(peer);
verify(network, times(0)).connect(peer);
network.checkMaintainedConnectionPeers();
verify(network, times(1)).connect(peer);
verify(network, times(1)).initiateOutboundConnection(eq(peer), any());
}
@Test
public void checkMaintainedConnectionPeers_doesNotConnectToDisallowedPeer() {
final DefaultP2PNetwork network = mockNetwork();
final MaintainedPeers maintainedPeers = new MaintainedPeers();
final DefaultP2PNetwork network =
spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build());
final Peer peer = mockPeer();
maintainedPeers.add(peer);
network.start();
// Add peer that is not permitted
final Peer localNode = DefaultPeer.fromEnodeURL(network.getLocalEnode().get());
final Peer peer = mockPeer();
doReturn(false)
.when(peerPermissions)
.isPermitted(eq(localNode), eq(peer), eq(Action.RLPX_ALLOW_NEW_OUTBOUND_CONNECTION));
assertThat(network.peerMaintainConnectionList.add(peer)).isTrue();
network.checkMaintainedConnectionPeers();
verify(network, never()).connect(peer);
verify(network, times(0)).initiateOutboundConnection(eq(peer), any());
}
@Test
@ -231,11 +244,11 @@ public final class DefaultP2PNetworkTest {
// Add peer to maintained list
assertThat(network.addMaintainConnectionPeer(peer)).isTrue();
verify(network, times(1)).connect(peer);
verify(network, times(1)).initiateOutboundConnection(eq(peer), any());
// Check maintained connections
network.checkMaintainedConnectionPeers();
verify(network, times(1)).connect(peer);
verify(network, times(1)).initiateOutboundConnection(eq(peer), any());
}
@Test
@ -452,29 +465,33 @@ public final class DefaultP2PNetworkTest {
@Test
public void removePeerReturnsTrueIfNodeWasInMaintainedConnectionsAndDisconnectsIfInPending() {
final DefaultP2PNetwork network = network();
final MaintainedPeers maintainedPeers = new MaintainedPeers();
final DefaultP2PNetwork network =
spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build());
network.start();
final Peer remotePeer = mockPeer("127.0.0.2", 30302);
final PeerConnection peerConnection = mockPeerConnection(remotePeer);
network.addMaintainConnectionPeer(remotePeer);
assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isTrue();
assertThat(maintainedPeers.contains(remotePeer)).isTrue();
assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue();
assertThat(network.removeMaintainedConnectionPeer(remotePeer)).isTrue();
assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse();
assertThat(maintainedPeers.contains(remotePeer)).isFalse();
// Note: The pendingConnection future is not removed.
assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue();
// Complete the connection, and ensure "disconnect is automatically called.
// Complete the connection, and ensure disconnect is automatically called.
network.pendingConnections.get(remotePeer).complete(peerConnection);
verify(peerConnection).disconnect(DisconnectReason.REQUESTED);
}
@Test
public void removePeerReturnsFalseIfNotInMaintainedListButDisconnectsPeer() {
final DefaultP2PNetwork network = network();
final MaintainedPeers maintainedPeers = new MaintainedPeers();
final DefaultP2PNetwork network =
spy((DefaultP2PNetwork) builder().maintainedPeers(maintainedPeers).build());
network.start();
final Peer remotePeer = mockPeer("127.0.0.2", 30302);
@ -482,13 +499,13 @@ public final class DefaultP2PNetworkTest {
final CompletableFuture<PeerConnection> future = network.connect(remotePeer);
assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse();
assertThat(maintainedPeers.contains(remotePeer)).isFalse();
assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue();
future.complete(peerConnection);
assertThat(network.pendingConnections.containsKey(remotePeer)).isFalse();
assertThat(network.removeMaintainedConnectionPeer(remotePeer)).isFalse();
assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse();
assertThat(maintainedPeers.contains(remotePeer)).isFalse();
verify(peerConnection).disconnect(DisconnectReason.REQUESTED);
}
@ -731,7 +748,7 @@ public final class DefaultP2PNetworkTest {
final CompletableFuture<PeerConnection> connectionResult = network.connect(peer);
assertThat(connectionResult).isCompletedExceptionally();
assertThatThrownBy(connectionResult::get)
.hasCauseInstanceOf(IllegalStateException.class)
.hasCauseInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Attempt to connect to peer with no listening port: " + peer.getEnodeURLString());
}
@ -829,6 +846,10 @@ public final class DefaultP2PNetworkTest {
return (DefaultP2PNetwork) builder(rlpxConfig).build();
}
private DefaultP2PNetwork.Builder builder() {
return builder(RlpxConfiguration::create);
}
private DefaultP2PNetwork.Builder builder(final Supplier<RlpxConfiguration> rlpxConfig) {
final DiscoveryConfiguration noDiscovery = DiscoveryConfiguration.create().setActive(false);
final NetworkingConfiguration networkingConfiguration =

@ -125,17 +125,18 @@ public class P2PNetworkTest {
final BytesValue listenId = listenerEnode.getNodeId();
final int listenPort = listenerEnode.getListeningPort().getAsInt();
assertThat(
connector
.connect(createPeer(listenId, listenPort))
.get(30L, TimeUnit.SECONDS)
.getPeerInfo()
.getNodeId())
.isEqualTo(listenId);
final CompletableFuture<PeerConnection> secondConnectionFuture =
final CompletableFuture<PeerConnection> firstFuture =
connector.connect(createPeer(listenId, listenPort));
final CompletableFuture<PeerConnection> secondFuture =
connector.connect(createPeer(listenId, listenPort));
assertThatThrownBy(secondConnectionFuture::get)
.hasCause(new IllegalStateException("Client already connected"));
final PeerConnection firstConnection = firstFuture.get(30L, TimeUnit.SECONDS);
final PeerConnection secondConnection = secondFuture.get(30L, TimeUnit.SECONDS);
assertThat(firstConnection.getPeerInfo().getNodeId()).isEqualTo(listenId);
// Connections should reference the same instance - i.e. we shouldn't create 2 distinct
// connections
assertThat(firstConnection == secondConnection).isTrue();
}
}

@ -0,0 +1,147 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.peers;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import tech.pegasys.pantheon.util.enode.EnodeURL;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.Test;
public class MaintainedPeersTest {
private MaintainedPeers maintainedPeers = new MaintainedPeers();
@Test
public void add_newPeer() {
final Peer peer = createPeer();
final AtomicInteger callbackCount = new AtomicInteger(0);
maintainedPeers.subscribeAdd(
(addedPeer, wasAdded) -> {
callbackCount.incrementAndGet();
assertThat(addedPeer).isEqualTo(peer);
assertThat(wasAdded).isTrue();
});
assertThat(maintainedPeers.size()).isEqualTo(0);
assertThat(maintainedPeers.add(peer)).isTrue();
assertThat(callbackCount).hasValue(1);
assertThat(maintainedPeers.size()).isEqualTo(1);
assertThat(maintainedPeers.contains(peer)).isTrue();
}
@Test
public void add_invalidPeer() {
final Peer peer = nonListeningPeer();
assertThatThrownBy(() -> maintainedPeers.add(peer))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Invalid enode url. Enode url must contain a non-zero listening port.");
assertThat(maintainedPeers.contains(peer)).isFalse();
}
@Test
public void add_newDuplicatePeer() {
// Initial add
assertThat(maintainedPeers.size()).isEqualTo(0);
final Peer peer = createPeer();
assertThat(maintainedPeers.add(peer)).isTrue();
assertThat(maintainedPeers.size()).isEqualTo(1);
// Test duplicate add
final AtomicInteger callbackCount = new AtomicInteger(0);
maintainedPeers.subscribeAdd(
(addedPeer, wasAdded) -> {
callbackCount.incrementAndGet();
assertThat(addedPeer).isEqualTo(peer);
assertThat(wasAdded).isFalse();
});
assertThat(maintainedPeers.add(peer)).isFalse();
assertThat(callbackCount).hasValue(1);
assertThat(maintainedPeers.size()).isEqualTo(1);
}
@Test
public void remove_existingPeer() {
// Initial add
final Peer peer = createPeer();
assertThat(maintainedPeers.add(peer)).isTrue();
assertThat(maintainedPeers.size()).isEqualTo(1);
assertThat(maintainedPeers.contains(peer)).isTrue();
// Test remove
final AtomicInteger callbackCount = new AtomicInteger(0);
maintainedPeers.subscribeRemove(
(addedPeer, wasRemoved) -> {
callbackCount.incrementAndGet();
assertThat(addedPeer).isEqualTo(peer);
assertThat(wasRemoved).isTrue();
});
assertThat(maintainedPeers.remove(peer)).isTrue();
assertThat(callbackCount).hasValue(1);
assertThat(maintainedPeers.size()).isEqualTo(0);
assertThat(maintainedPeers.contains(peer)).isFalse();
}
@Test
public void remove_nonExistentPeer() {
final Peer peer = createPeer();
assertThat(maintainedPeers.size()).isEqualTo(0);
final AtomicInteger callbackCount = new AtomicInteger(0);
maintainedPeers.subscribeRemove(
(addedPeer, wasRemoved) -> {
callbackCount.incrementAndGet();
assertThat(addedPeer).isEqualTo(peer);
assertThat(wasRemoved).isFalse();
});
assertThat(maintainedPeers.remove(peer)).isFalse();
assertThat(callbackCount).hasValue(1);
assertThat(maintainedPeers.size()).isEqualTo(0);
}
@Test
public void stream_withPeers() {
// Initial add
final Peer peerA = createPeer();
final Peer peerB = createPeer();
assertThat(maintainedPeers.add(peerA)).isTrue();
assertThat(maintainedPeers.add(peerB)).isTrue();
final List<Peer> peers = maintainedPeers.streamPeers().collect(Collectors.toList());
assertThat(peers).containsExactlyInAnyOrder(peerA, peerB);
}
@Test
public void stream_empty() {
final List<Peer> peers = maintainedPeers.streamPeers().collect(Collectors.toList());
assertThat(peers).isEmpty();
}
private Peer createPeer() {
return DefaultPeer.fromEnodeURL(enodeBuilder().build());
}
private Peer nonListeningPeer() {
return DefaultPeer.fromEnodeURL(enodeBuilder().disableListening().build());
}
private EnodeURL.Builder enodeBuilder() {
return EnodeURL.builder().ipAddress("127.0.0.1").useDefaultPorts().nodeId(Peer.randomId());
}
}
Loading…
Cancel
Save