mirror of https://github.com/hyperledger/besu
[PAN-2588] Create P2PNetwork Builder (#1343)
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>pull/2/head
parent
3cd9f94dac
commit
d25a6026ec
@ -1,259 +0,0 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p; |
||||
|
||||
import static java.util.Collections.emptyList; |
||||
import static java.util.stream.Collectors.toList; |
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertTrue; |
||||
import static tech.pegasys.pantheon.ethereum.p2p.NetworkingTestHelper.configWithRandomPorts; |
||||
|
||||
import tech.pegasys.pantheon.crypto.SECP256K1; |
||||
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.NetworkingConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryServiceException; |
||||
import tech.pegasys.pantheon.ethereum.p2p.netty.NettyP2PNetwork; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist; |
||||
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; |
||||
import tech.pegasys.pantheon.util.enode.EnodeURL; |
||||
|
||||
import java.io.IOException; |
||||
import java.util.Optional; |
||||
|
||||
import io.vertx.core.Vertx; |
||||
import org.assertj.core.api.Assertions; |
||||
import org.junit.After; |
||||
import org.junit.Test; |
||||
|
||||
public class NetworkingServiceLifecycleTest { |
||||
|
||||
private final Vertx vertx = Vertx.vertx(); |
||||
|
||||
@After |
||||
public void closeVertx() { |
||||
vertx.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void createP2PNetwork() { |
||||
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
final NetworkingConfiguration config = configWithRandomPorts(); |
||||
try (final NettyP2PNetwork service = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
keyPair, |
||||
config, |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty())) { |
||||
service.start(); |
||||
final EnodeURL enode = service.getLocalEnode().get(); |
||||
final int udpPort = enode.getEffectiveDiscoveryPort(); |
||||
final int tcpPort = enode.getListeningPort(); |
||||
|
||||
assertEquals(config.getDiscovery().getAdvertisedHost(), enode.getIp()); |
||||
assertThat(udpPort).isNotZero(); |
||||
assertThat(tcpPort).isNotZero(); |
||||
assertThat(service.getDiscoveryPeers()).hasSize(0); |
||||
} |
||||
} |
||||
|
||||
@Test(expected = IllegalArgumentException.class) |
||||
public void createP2PNetwork_NullHost() throws IOException { |
||||
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
final NetworkingConfiguration config = |
||||
NetworkingConfiguration.create() |
||||
.setDiscovery(DiscoveryConfiguration.create().setBindHost(null)); |
||||
try (final P2PNetwork broken = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
keyPair, |
||||
config, |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty())) { |
||||
Assertions.fail("Expected Exception"); |
||||
} |
||||
} |
||||
|
||||
@Test(expected = IllegalArgumentException.class) |
||||
public void createP2PNetwork_InvalidHost() throws IOException { |
||||
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
final NetworkingConfiguration config = |
||||
NetworkingConfiguration.create() |
||||
.setDiscovery(DiscoveryConfiguration.create().setBindHost("fake.fake.fake")); |
||||
try (final P2PNetwork broken = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
keyPair, |
||||
config, |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty())) { |
||||
Assertions.fail("Expected Exception"); |
||||
} |
||||
} |
||||
|
||||
@Test(expected = IllegalArgumentException.class) |
||||
public void createP2PNetwork_InvalidPort() throws IOException { |
||||
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
final NetworkingConfiguration config = |
||||
NetworkingConfiguration.create() |
||||
.setDiscovery(DiscoveryConfiguration.create().setBindPort(-1)); |
||||
try (final P2PNetwork broken = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
keyPair, |
||||
config, |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty())) { |
||||
Assertions.fail("Expected Exception"); |
||||
} |
||||
} |
||||
|
||||
@Test(expected = IllegalArgumentException.class) |
||||
public void createP2PNetwork_NullKeyPair() throws IOException { |
||||
try (final P2PNetwork broken = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
null, |
||||
configWithRandomPorts(), |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty())) { |
||||
Assertions.fail("Expected Exception"); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void startStopP2PNetwork() { |
||||
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
try (final NettyP2PNetwork service = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
keyPair, |
||||
configWithRandomPorts(), |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty())) { |
||||
service.start(); |
||||
service.stop(); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void startDiscoveryAgentBackToBack() { |
||||
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
try (final NettyP2PNetwork service1 = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
keyPair, |
||||
configWithRandomPorts(), |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty()); |
||||
final NettyP2PNetwork service2 = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
keyPair, |
||||
configWithRandomPorts(), |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty())) { |
||||
service1.start(); |
||||
service1.stop(); |
||||
service2.start(); |
||||
service2.stop(); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void startDiscoveryPortInUse() { |
||||
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
try (final NettyP2PNetwork service1 = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
keyPair, |
||||
configWithRandomPorts(), |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty())) { |
||||
service1.start(); |
||||
final NetworkingConfiguration config = configWithRandomPorts(); |
||||
config.getDiscovery().setBindPort(service1.getLocalEnode().get().getEffectiveDiscoveryPort()); |
||||
try (final NettyP2PNetwork service2 = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
keyPair, |
||||
config, |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty())) { |
||||
try { |
||||
service2.start(); |
||||
} catch (final Exception e) { |
||||
assertThat(e).hasCauseExactlyInstanceOf(PeerDiscoveryServiceException.class); |
||||
assertThat(e) |
||||
.hasMessageStartingWith( |
||||
"tech.pegasys.pantheon.ethereum.p2p.discovery." |
||||
+ "PeerDiscoveryServiceException: Failed to bind Ethereum UDP discovery listener to 0.0.0.0:"); |
||||
assertThat(e).hasMessageContaining("Address already in use"); |
||||
} finally { |
||||
service1.stop(); |
||||
service2.stop(); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void createP2PNetwork_NoActivePeers() { |
||||
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
try (final NettyP2PNetwork agent = |
||||
new NettyP2PNetwork( |
||||
vertx, |
||||
keyPair, |
||||
configWithRandomPorts(), |
||||
emptyList(), |
||||
new PeerBlacklist(), |
||||
new NoOpMetricsSystem(), |
||||
Optional.empty(), |
||||
Optional.empty())) { |
||||
assertTrue(agent.getDiscoveryPeers().collect(toList()).isEmpty()); |
||||
assertEquals(0, agent.getPeers().size()); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,88 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.Arrays; |
||||
import java.util.Collection; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
|
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.junit.runners.Parameterized; |
||||
import org.junit.runners.Parameterized.Parameters; |
||||
|
||||
@RunWith(Parameterized.class) |
||||
public class PeerRequirementCombineTest { |
||||
private static final PeerRequirement fulfilled = () -> true; |
||||
private static final PeerRequirement notFulfilled = () -> false; |
||||
|
||||
private static final AtomicBoolean configurableIsFulfilled = new AtomicBoolean(true); |
||||
private static final PeerRequirement configurable = configurableIsFulfilled::get; |
||||
|
||||
private final List<PeerRequirement> requirements; |
||||
private final boolean expectedResult; |
||||
|
||||
public PeerRequirementCombineTest( |
||||
final List<PeerRequirement> requirements, final boolean expectedResult) { |
||||
this.requirements = requirements; |
||||
this.expectedResult = expectedResult; |
||||
} |
||||
|
||||
@Parameters |
||||
public static Collection<Object[]> data() { |
||||
return Arrays.asList( |
||||
new Object[][] { |
||||
{Collections.emptyList(), true}, |
||||
{Arrays.asList(fulfilled), true}, |
||||
{Arrays.asList(notFulfilled), false}, |
||||
{Arrays.asList(notFulfilled, notFulfilled), false}, |
||||
{Arrays.asList(notFulfilled, fulfilled), false}, |
||||
{Arrays.asList(fulfilled, notFulfilled), false}, |
||||
{Arrays.asList(fulfilled, fulfilled), true} |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
public void combine() { |
||||
PeerRequirement combined = PeerRequirement.combine(requirements); |
||||
assertThat(combined.hasSufficientPeers()).isEqualTo(expectedResult); |
||||
} |
||||
|
||||
@Test |
||||
public void combineAndModify() { |
||||
List<PeerRequirement> modifiableRequirements = new ArrayList<>(requirements); |
||||
modifiableRequirements.add(configurable); |
||||
|
||||
PeerRequirement combined = PeerRequirement.combine(modifiableRequirements); |
||||
assertThat(combined.hasSufficientPeers()).isEqualTo(expectedResult); |
||||
|
||||
// If the configurable requirement switches to false, we should always get false
|
||||
configurableIsFulfilled.set(false); |
||||
assertThat(combined.hasSufficientPeers()).isFalse(); |
||||
|
||||
// Otherwise, we should get our expected result
|
||||
configurableIsFulfilled.set(true); |
||||
assertThat(combined.hasSufficientPeers()).isEqualTo(expectedResult); |
||||
} |
||||
|
||||
@Test |
||||
public void combine_withOn() { |
||||
PeerRequirement combined = PeerRequirement.combine(Collections.emptyList()); |
||||
assertThat(combined.hasSufficientPeers()).isTrue(); |
||||
} |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,621 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.network; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Java6Assertions.catchThrowable; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.argThat; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.Mockito.doReturn; |
||||
import static org.mockito.Mockito.lenient; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.never; |
||||
import static org.mockito.Mockito.reset; |
||||
import static org.mockito.Mockito.spy; |
||||
import static org.mockito.Mockito.times; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import tech.pegasys.pantheon.crypto.SECP256K1; |
||||
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; |
||||
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent; |
||||
import tech.pegasys.pantheon.ethereum.chain.BlockAddedObserver; |
||||
import tech.pegasys.pantheon.ethereum.chain.Blockchain; |
||||
import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; |
||||
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork; |
||||
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.NetworkingConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.RlpxConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.DiscoveryPeer; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerBondedEvent; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryStatus; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.Endpoint; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist; |
||||
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; |
||||
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; |
||||
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; |
||||
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; |
||||
import tech.pegasys.pantheon.ethereum.permissioning.node.NodePermissioningController; |
||||
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
import tech.pegasys.pantheon.util.enode.EnodeURL; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
import java.util.OptionalInt; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.function.Supplier; |
||||
import java.util.stream.Collectors; |
||||
import java.util.stream.Stream; |
||||
|
||||
import io.vertx.core.Vertx; |
||||
import org.junit.After; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.ArgumentCaptor; |
||||
import org.mockito.ArgumentMatcher; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
|
||||
/** Tests for {@link DefaultP2PNetwork}. */ |
||||
@RunWith(MockitoJUnitRunner.StrictStubs.class) |
||||
public final class DefaultP2PNetworkTest { |
||||
|
||||
@Mock private NodePermissioningController nodePermissioningController; |
||||
|
||||
@Mock private Blockchain blockchain; |
||||
|
||||
private ArgumentCaptor<BlockAddedObserver> observerCaptor = |
||||
ArgumentCaptor.forClass(BlockAddedObserver.class); |
||||
|
||||
private final Vertx vertx = Vertx.vertx(); |
||||
private final NetworkingConfiguration config = |
||||
NetworkingConfiguration.create() |
||||
.setDiscovery(DiscoveryConfiguration.create().setActive(false)) |
||||
.setSupportedProtocols(subProtocol()) |
||||
.setRlpx(RlpxConfiguration.create().setBindPort(0)); |
||||
|
||||
@Before |
||||
public void before() { |
||||
when(blockchain.observeBlockAdded(observerCaptor.capture())).thenReturn(1L); |
||||
} |
||||
|
||||
@After |
||||
public void closeVertx() { |
||||
vertx.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void addingMaintainedNetworkPeerStartsConnection() { |
||||
final DefaultP2PNetwork network = mockNetwork(); |
||||
final Peer peer = mockPeer(); |
||||
|
||||
assertThat(network.addMaintainConnectionPeer(peer)).isTrue(); |
||||
|
||||
assertThat(network.peerMaintainConnectionList).contains(peer); |
||||
verify(network, times(1)).connect(peer); |
||||
} |
||||
|
||||
@Test |
||||
public void addingRepeatMaintainedPeersReturnsFalse() { |
||||
final P2PNetwork network = network(); |
||||
final Peer peer = mockPeer(); |
||||
assertThat(network.addMaintainConnectionPeer(peer)).isTrue(); |
||||
assertThat(network.addMaintainConnectionPeer(peer)).isFalse(); |
||||
} |
||||
|
||||
@Test |
||||
public void checkMaintainedConnectionPeersTriesToConnect() { |
||||
final DefaultP2PNetwork network = mockNetwork(); |
||||
final Peer peer = mockPeer(); |
||||
network.peerMaintainConnectionList.add(peer); |
||||
|
||||
network.checkMaintainedConnectionPeers(); |
||||
verify(network, times(1)).connect(peer); |
||||
} |
||||
|
||||
@Test |
||||
public void checkMaintainedConnectionPeersDoesntReconnectPendingPeers() { |
||||
final DefaultP2PNetwork network = mockNetwork(); |
||||
final Peer peer = mockPeer(); |
||||
|
||||
network.pendingConnections.put(peer, new CompletableFuture<>()); |
||||
|
||||
network.checkMaintainedConnectionPeers(); |
||||
verify(network, times(0)).connect(peer); |
||||
} |
||||
|
||||
@Test |
||||
public void checkMaintainedConnectionPeersDoesntReconnectConnectedPeers() { |
||||
final DefaultP2PNetwork network = spy(network()); |
||||
final Peer peer = mockPeer(); |
||||
verify(network, never()).connect(peer); |
||||
assertThat(network.addMaintainConnectionPeer(peer)).isTrue(); |
||||
verify(network, times(1)).connect(peer); |
||||
|
||||
{ |
||||
final CompletableFuture<PeerConnection> connection; |
||||
connection = network.pendingConnections.remove(peer); |
||||
assertThat(connection).isNotNull(); |
||||
assertThat(connection.cancel(true)).isTrue(); |
||||
} |
||||
|
||||
{ |
||||
final PeerConnection peerConnection = mockPeerConnection(peer.getId()); |
||||
network.connections.registerConnection(peerConnection); |
||||
network.checkMaintainedConnectionPeers(); |
||||
verify(network, times(1)).connect(peer); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void shouldSendClientQuittingWhenNetworkStops() { |
||||
final P2PNetwork network = network(); |
||||
final Peer peer = mockPeer(); |
||||
final PeerConnection peerConnection = mockPeerConnection(); |
||||
|
||||
network.connect(peer).complete(peerConnection); |
||||
network.stop(); |
||||
|
||||
verify(peerConnection).disconnect(eq(DisconnectReason.CLIENT_QUITTING)); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldntAttemptNewConnectionToPendingPeer() { |
||||
final P2PNetwork network = network(); |
||||
final Peer peer = mockPeer(); |
||||
|
||||
final CompletableFuture<PeerConnection> connectingFuture = network.connect(peer); |
||||
assertThat(network.connect(peer)).isEqualTo(connectingFuture); |
||||
} |
||||
|
||||
@Test |
||||
public void whenStartingNetworkWithNodePermissioningShouldSubscribeToBlockAddedEvents() { |
||||
final P2PNetwork network = network(); |
||||
|
||||
network.start(); |
||||
|
||||
verify(blockchain).observeBlockAdded(any()); |
||||
} |
||||
|
||||
@Test |
||||
public void whenBuildingNetworkWithNodePermissioningWithoutBlockchainShouldThrowIllegalState() { |
||||
blockchain = null; |
||||
final Throwable throwable = catchThrowable(this::network); |
||||
assertThat(throwable) |
||||
.isInstanceOf(IllegalStateException.class) |
||||
.hasMessage( |
||||
"Network permissioning needs to listen to BlockAddedEvents. Blockchain can't be null."); |
||||
} |
||||
|
||||
@Test |
||||
public void whenStoppingNetworkWithNodePermissioningShouldUnsubscribeBlockAddedEvents() { |
||||
final P2PNetwork network = network(); |
||||
|
||||
network.start(); |
||||
network.stop(); |
||||
|
||||
verify(blockchain).removeObserver(eq(1L)); |
||||
} |
||||
|
||||
@Test |
||||
public void onBlockAddedShouldCheckPermissionsForAllPeers() { |
||||
final BlockAddedEvent blockAddedEvent = blockAddedEvent(); |
||||
final P2PNetwork network = network(); |
||||
final Peer remotePeer1 = mockPeer("127.0.0.2", 30302); |
||||
final Peer remotePeer2 = mockPeer("127.0.0.3", 30303); |
||||
|
||||
final PeerConnection peerConnection1 = mockPeerConnection(remotePeer1); |
||||
final PeerConnection peerConnection2 = mockPeerConnection(remotePeer2); |
||||
|
||||
network.start(); |
||||
network.connect(remotePeer1).complete(peerConnection1); |
||||
network.connect(remotePeer2).complete(peerConnection2); |
||||
|
||||
final BlockAddedObserver blockAddedObserver = observerCaptor.getValue(); |
||||
blockAddedObserver.onBlockAdded(blockAddedEvent, blockchain); |
||||
|
||||
verify(nodePermissioningController, times(2)).isPermitted(any(), any()); |
||||
} |
||||
|
||||
@Test |
||||
public void onBlockAddedAndPeerNotPermittedShouldDisconnect() { |
||||
final BlockAddedEvent blockAddedEvent = blockAddedEvent(); |
||||
final P2PNetwork network = network(); |
||||
|
||||
final Peer permittedPeer = mockPeer("127.0.0.2", 30302); |
||||
final Peer notPermittedPeer = mockPeer("127.0.0.3", 30303); |
||||
|
||||
final PeerConnection permittedPeerConnection = mockPeerConnection(permittedPeer); |
||||
final PeerConnection notPermittedPeerConnection = mockPeerConnection(notPermittedPeer); |
||||
|
||||
final EnodeURL permittedEnodeURL = EnodeURL.fromString(permittedPeer.getEnodeURLString()); |
||||
final EnodeURL notPermittedEnodeURL = EnodeURL.fromString(notPermittedPeer.getEnodeURLString()); |
||||
|
||||
network.start(); |
||||
network.connect(permittedPeer).complete(permittedPeerConnection); |
||||
network.connect(notPermittedPeer).complete(notPermittedPeerConnection); |
||||
|
||||
reset(nodePermissioningController); |
||||
|
||||
lenient() |
||||
.when(nodePermissioningController.isPermitted(any(), enodeEq(notPermittedEnodeURL))) |
||||
.thenReturn(false); |
||||
lenient() |
||||
.when(nodePermissioningController.isPermitted(any(), enodeEq(permittedEnodeURL))) |
||||
.thenReturn(true); |
||||
|
||||
final BlockAddedObserver blockAddedObserver = observerCaptor.getValue(); |
||||
blockAddedObserver.onBlockAdded(blockAddedEvent, blockchain); |
||||
|
||||
verify(notPermittedPeerConnection).disconnect(eq(DisconnectReason.REQUESTED)); |
||||
verify(permittedPeerConnection, never()).disconnect(any()); |
||||
} |
||||
|
||||
@Test |
||||
public void removePeerReturnsTrueIfNodeWasInMaintaineConnectionsAndDisconnectsIfInPending() { |
||||
final DefaultP2PNetwork network = network(); |
||||
network.start(); |
||||
|
||||
final Peer remotePeer = mockPeer("127.0.0.2", 30302); |
||||
final PeerConnection peerConnection = mockPeerConnection(remotePeer); |
||||
|
||||
network.addMaintainConnectionPeer(remotePeer); |
||||
assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isTrue(); |
||||
assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue(); |
||||
assertThat(network.removeMaintainedConnectionPeer(remotePeer)).isTrue(); |
||||
assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse(); |
||||
|
||||
// Note: The pendingConnection future is not removed.
|
||||
assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue(); |
||||
|
||||
// Complete the connection, and ensure "disconnect is automatically called.
|
||||
network.pendingConnections.get(remotePeer).complete(peerConnection); |
||||
verify(peerConnection).disconnect(DisconnectReason.REQUESTED); |
||||
} |
||||
|
||||
@Test |
||||
public void removePeerReturnsFalseIfNotInMaintainedListButDisconnectsPeer() { |
||||
final DefaultP2PNetwork network = network(); |
||||
network.start(); |
||||
|
||||
final Peer remotePeer = mockPeer("127.0.0.2", 30302); |
||||
final PeerConnection peerConnection = mockPeerConnection(remotePeer); |
||||
|
||||
CompletableFuture<PeerConnection> future = network.connect(remotePeer); |
||||
|
||||
assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse(); |
||||
assertThat(network.pendingConnections.containsKey(remotePeer)).isTrue(); |
||||
future.complete(peerConnection); |
||||
assertThat(network.pendingConnections.containsKey(remotePeer)).isFalse(); |
||||
|
||||
assertThat(network.removeMaintainedConnectionPeer(remotePeer)).isFalse(); |
||||
assertThat(network.peerMaintainConnectionList.contains(remotePeer)).isFalse(); |
||||
|
||||
verify(peerConnection).disconnect(DisconnectReason.REQUESTED); |
||||
} |
||||
|
||||
@Test |
||||
public void beforeStartingNetworkEnodeURLShouldNotBePresent() { |
||||
final P2PNetwork network = mockNetwork(); |
||||
|
||||
assertThat(network.getLocalEnode()).isNotPresent(); |
||||
} |
||||
|
||||
@Test |
||||
public void afterStartingNetworkEnodeURLShouldBePresent() { |
||||
final P2PNetwork network = mockNetwork(); |
||||
network.start(); |
||||
|
||||
assertThat(network.getLocalEnode()).isPresent(); |
||||
} |
||||
|
||||
@Test |
||||
public void handlePeerBondedEvent_forPeerWithNoTcpPort() { |
||||
final DefaultP2PNetwork network = mockNetwork(); |
||||
DiscoveryPeer peer = |
||||
new DiscoveryPeer(generatePeerId(0), "127.0.0.1", 999, OptionalInt.empty()); |
||||
PeerBondedEvent peerBondedEvent = new PeerBondedEvent(peer, System.currentTimeMillis()); |
||||
|
||||
network.handlePeerBondedEvent().accept(peerBondedEvent); |
||||
verify(network, times(1)).connect(peer); |
||||
} |
||||
|
||||
@Test |
||||
public void attemptPeerConnections_connectsToValidPeer() { |
||||
final int maxPeers = 5; |
||||
final DefaultP2PNetwork network = |
||||
mockNetwork(() -> RlpxConfiguration.create().setMaxPeers(maxPeers)); |
||||
|
||||
doReturn(2).when(network).connectionCount(); |
||||
DiscoveryPeer peer = createDiscoveryPeer(0); |
||||
peer.setStatus(PeerDiscoveryStatus.BONDED); |
||||
|
||||
doReturn(Stream.of(peer)).when(network).getDiscoveredPeers(); |
||||
ArgumentCaptor<DiscoveryPeer> peerCapture = ArgumentCaptor.forClass(DiscoveryPeer.class); |
||||
doReturn(CompletableFuture.completedFuture(mock(PeerConnection.class))) |
||||
.when(network) |
||||
.connect(peerCapture.capture()); |
||||
|
||||
network.attemptPeerConnections(); |
||||
verify(network, times(1)).connect(any()); |
||||
assertThat(peerCapture.getValue()).isEqualTo(peer); |
||||
} |
||||
|
||||
@Test |
||||
public void attemptPeerConnections_ignoresUnbondedPeer() { |
||||
final int maxPeers = 5; |
||||
final DefaultP2PNetwork network = |
||||
mockNetwork(() -> RlpxConfiguration.create().setMaxPeers(maxPeers)); |
||||
|
||||
doReturn(2).when(network).connectionCount(); |
||||
DiscoveryPeer peer = createDiscoveryPeer(0); |
||||
peer.setStatus(PeerDiscoveryStatus.KNOWN); |
||||
|
||||
doReturn(Stream.of(peer)).when(network).getDiscoveredPeers(); |
||||
|
||||
network.attemptPeerConnections(); |
||||
verify(network, times(0)).connect(any()); |
||||
} |
||||
|
||||
@Test |
||||
public void attemptPeerConnections_ignoresConnectingPeer() { |
||||
final int maxPeers = 5; |
||||
final DefaultP2PNetwork network = |
||||
mockNetwork(() -> RlpxConfiguration.create().setMaxPeers(maxPeers)); |
||||
|
||||
doReturn(2).when(network).connectionCount(); |
||||
DiscoveryPeer peer = createDiscoveryPeer(0); |
||||
peer.setStatus(PeerDiscoveryStatus.BONDED); |
||||
|
||||
doReturn(true).when(network).isConnecting(peer); |
||||
doReturn(Stream.of(peer)).when(network).getDiscoveredPeers(); |
||||
|
||||
network.attemptPeerConnections(); |
||||
verify(network, times(0)).connect(any()); |
||||
} |
||||
|
||||
@Test |
||||
public void attemptPeerConnections_ignoresConnectedPeer() { |
||||
final int maxPeers = 5; |
||||
final DefaultP2PNetwork network = |
||||
mockNetwork(() -> RlpxConfiguration.create().setMaxPeers(maxPeers)); |
||||
|
||||
doReturn(2).when(network).connectionCount(); |
||||
DiscoveryPeer peer = createDiscoveryPeer(0); |
||||
peer.setStatus(PeerDiscoveryStatus.BONDED); |
||||
|
||||
doReturn(true).when(network).isConnected(peer); |
||||
doReturn(Stream.of(peer)).when(network).getDiscoveredPeers(); |
||||
|
||||
network.attemptPeerConnections(); |
||||
verify(network, times(0)).connect(any()); |
||||
} |
||||
|
||||
@Test |
||||
public void attemptPeerConnections_withSlotsAvailable() { |
||||
final int maxPeers = 5; |
||||
final DefaultP2PNetwork network = |
||||
mockNetwork(() -> RlpxConfiguration.create().setMaxPeers(maxPeers)); |
||||
|
||||
doReturn(2).when(network).connectionCount(); |
||||
List<DiscoveryPeer> peers = |
||||
Stream.iterate(1, n -> n + 1) |
||||
.limit(10) |
||||
.map( |
||||
(seed) -> { |
||||
DiscoveryPeer peer = createDiscoveryPeer(seed); |
||||
peer.setStatus(PeerDiscoveryStatus.BONDED); |
||||
return peer; |
||||
}) |
||||
.collect(Collectors.toList()); |
||||
|
||||
doReturn(peers.stream()).when(network).getDiscoveredPeers(); |
||||
ArgumentCaptor<DiscoveryPeer> peerCapture = ArgumentCaptor.forClass(DiscoveryPeer.class); |
||||
doReturn(CompletableFuture.completedFuture(mock(PeerConnection.class))) |
||||
.when(network) |
||||
.connect(peerCapture.capture()); |
||||
|
||||
network.attemptPeerConnections(); |
||||
verify(network, times(3)).connect(any()); |
||||
assertThat(peers.containsAll(peerCapture.getAllValues())).isTrue(); |
||||
} |
||||
|
||||
@Test |
||||
public void attemptPeerConnections_withNoSlotsAvailable() { |
||||
final int maxPeers = 5; |
||||
final DefaultP2PNetwork network = |
||||
mockNetwork(() -> RlpxConfiguration.create().setMaxPeers(maxPeers)); |
||||
|
||||
doReturn(maxPeers).when(network).connectionCount(); |
||||
List<DiscoveryPeer> peers = |
||||
Stream.iterate(1, n -> n + 1) |
||||
.limit(10) |
||||
.map( |
||||
(seed) -> { |
||||
DiscoveryPeer peer = createDiscoveryPeer(seed); |
||||
peer.setStatus(PeerDiscoveryStatus.BONDED); |
||||
return peer; |
||||
}) |
||||
.collect(Collectors.toList()); |
||||
|
||||
lenient().doReturn(peers.stream()).when(network).getDiscoveredPeers(); |
||||
|
||||
network.attemptPeerConnections(); |
||||
verify(network, times(0)).connect(any()); |
||||
} |
||||
|
||||
private DiscoveryPeer createDiscoveryPeer(final int seed) { |
||||
return new DiscoveryPeer(generatePeerId(seed), "127.0.0.1", 999, OptionalInt.empty()); |
||||
} |
||||
|
||||
private BytesValue generatePeerId(final int seed) { |
||||
BlockDataGenerator gen = new BlockDataGenerator(seed); |
||||
return gen.bytesValue(DefaultPeer.PEER_ID_SIZE); |
||||
} |
||||
|
||||
private BlockAddedEvent blockAddedEvent() { |
||||
return mock(BlockAddedEvent.class); |
||||
} |
||||
|
||||
private PeerConnection mockPeerConnection(final BytesValue id) { |
||||
final PeerInfo peerInfo = mock(PeerInfo.class); |
||||
when(peerInfo.getNodeId()).thenReturn(id); |
||||
final PeerConnection peerConnection = mock(PeerConnection.class); |
||||
when(peerConnection.getPeerInfo()).thenReturn(peerInfo); |
||||
return peerConnection; |
||||
} |
||||
|
||||
private PeerConnection mockPeerConnection() { |
||||
return mockPeerConnection(BytesValue.fromHexString("0x00")); |
||||
} |
||||
|
||||
private PeerConnection mockPeerConnection(final Peer remotePeer) { |
||||
final EnodeURL remoteEnode = remotePeer.getEnodeURL(); |
||||
final PeerInfo peerInfo = |
||||
new PeerInfo( |
||||
5, |
||||
"test", |
||||
Arrays.asList(Capability.create("eth", 63)), |
||||
remoteEnode.getListeningPort(), |
||||
remoteEnode.getNodeId()); |
||||
|
||||
final PeerConnection peerConnection = mock(PeerConnection.class); |
||||
when(peerConnection.getRemoteEnode()).thenReturn(remoteEnode); |
||||
when(peerConnection.getPeerInfo()).thenReturn(peerInfo); |
||||
|
||||
return peerConnection; |
||||
} |
||||
|
||||
private DefaultP2PNetwork mockNetwork() { |
||||
return mockNetwork(RlpxConfiguration::create); |
||||
} |
||||
|
||||
private DefaultP2PNetwork mockNetwork(final Supplier<RlpxConfiguration> rlpxConfig) { |
||||
DefaultP2PNetwork network = spy(network(rlpxConfig)); |
||||
lenient().doReturn(new CompletableFuture<>()).when(network).connect(any()); |
||||
return network; |
||||
} |
||||
|
||||
private DefaultP2PNetwork network() { |
||||
return network(RlpxConfiguration::create); |
||||
} |
||||
|
||||
private DefaultP2PNetwork network(final Supplier<RlpxConfiguration> rlpxConfig) { |
||||
final DiscoveryConfiguration noDiscovery = DiscoveryConfiguration.create().setActive(false); |
||||
final NetworkingConfiguration networkingConfiguration = |
||||
NetworkingConfiguration.create() |
||||
.setDiscovery(noDiscovery) |
||||
.setSupportedProtocols(subProtocol()) |
||||
.setRlpx(rlpxConfig.get().setBindPort(0)); |
||||
|
||||
lenient().when(nodePermissioningController.isPermitted(any(), any())).thenReturn(true); |
||||
|
||||
return (DefaultP2PNetwork) |
||||
builder() |
||||
.config(networkingConfiguration) |
||||
.nodePermissioningController(nodePermissioningController) |
||||
.blockchain(blockchain) |
||||
.build(); |
||||
} |
||||
|
||||
private DefaultP2PNetwork.Builder builder() { |
||||
return DefaultP2PNetwork.builder() |
||||
.vertx(vertx) |
||||
.config(config) |
||||
.keyPair(KeyPair.generate()) |
||||
.peerBlacklist(new PeerBlacklist()) |
||||
.metricsSystem(new NoOpMetricsSystem()) |
||||
.supportedCapabilities(Arrays.asList(Capability.create("eth", 63))); |
||||
} |
||||
|
||||
private Peer mockPeer() { |
||||
return mockPeer( |
||||
SECP256K1.KeyPair.generate().getPublicKey().getEncodedBytes(), "127.0.0.1", 30303); |
||||
} |
||||
|
||||
private Peer mockPeer(final String host, final int port) { |
||||
final BytesValue id = SECP256K1.KeyPair.generate().getPublicKey().getEncodedBytes(); |
||||
return mockPeer(id, host, port); |
||||
} |
||||
|
||||
private Peer mockPeer(final BytesValue id, final String host, final int port) { |
||||
final Endpoint endpoint = new Endpoint(host, port, OptionalInt.of(port)); |
||||
final String enodeURL = |
||||
String.format( |
||||
"enode://%s@%s:%d?discport=%d", |
||||
id.toString().substring(2), |
||||
endpoint.getHost(), |
||||
endpoint.getUdpPort(), |
||||
endpoint.getTcpPort().getAsInt()); |
||||
|
||||
return DefaultPeer.fromURI(enodeURL); |
||||
} |
||||
|
||||
public static class EnodeURLMatcher implements ArgumentMatcher<EnodeURL> { |
||||
|
||||
private final EnodeURL enodeURL; |
||||
|
||||
EnodeURLMatcher(final EnodeURL enodeURL) { |
||||
this.enodeURL = enodeURL; |
||||
} |
||||
|
||||
@Override |
||||
public boolean matches(final EnodeURL argument) { |
||||
if (argument == null) { |
||||
return false; |
||||
} else { |
||||
return enodeURL.getNodeId().equals(argument.getNodeId()) |
||||
&& enodeURL.getIp().equals(argument.getIp()) |
||||
&& enodeURL.getListeningPort().equals(argument.getListeningPort()); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private EnodeURL enodeEq(final EnodeURL enodeURL) { |
||||
return argThat(new EnodeURLMatcher(enodeURL)); |
||||
} |
||||
|
||||
private static SubProtocol subProtocol() { |
||||
return subProtocol("eth"); |
||||
} |
||||
|
||||
private static SubProtocol subProtocol(final String name) { |
||||
return new SubProtocol() { |
||||
@Override |
||||
public String getName() { |
||||
return name; |
||||
} |
||||
|
||||
@Override |
||||
public int messageSpace(final int protocolVersion) { |
||||
return 8; |
||||
} |
||||
|
||||
@Override |
||||
public boolean isValidMessageCode(final int protocolVersion, final int code) { |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public String messageName(final int protocolVersion, final int code) { |
||||
return INVALID_MESSAGE_NAME; |
||||
} |
||||
}; |
||||
} |
||||
} |
@ -0,0 +1,163 @@ |
||||
/* |
||||
* Copyright 2018 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.network; |
||||
|
||||
import static java.util.stream.Collectors.toList; |
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertTrue; |
||||
import static tech.pegasys.pantheon.ethereum.p2p.NetworkingTestHelper.configWithRandomPorts; |
||||
|
||||
import tech.pegasys.pantheon.crypto.SECP256K1; |
||||
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.NetworkingConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryServiceException; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist; |
||||
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; |
||||
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; |
||||
import tech.pegasys.pantheon.util.enode.EnodeURL; |
||||
|
||||
import java.io.IOException; |
||||
import java.util.Arrays; |
||||
|
||||
import io.vertx.core.Vertx; |
||||
import org.assertj.core.api.Assertions; |
||||
import org.junit.After; |
||||
import org.junit.Test; |
||||
|
||||
public class NetworkingServiceLifecycleTest { |
||||
|
||||
private final Vertx vertx = Vertx.vertx(); |
||||
private final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
private final NetworkingConfiguration config = configWithRandomPorts(); |
||||
|
||||
@After |
||||
public void closeVertx() { |
||||
vertx.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void createP2PNetwork() throws IOException { |
||||
final NetworkingConfiguration config = configWithRandomPorts(); |
||||
try (final P2PNetwork service = builder().build()) { |
||||
service.start(); |
||||
final EnodeURL enode = service.getLocalEnode().get(); |
||||
final int udpPort = enode.getEffectiveDiscoveryPort(); |
||||
final int tcpPort = enode.getListeningPort(); |
||||
|
||||
assertEquals(config.getDiscovery().getAdvertisedHost(), enode.getIp()); |
||||
assertThat(udpPort).isNotZero(); |
||||
assertThat(tcpPort).isNotZero(); |
||||
assertThat(service.getDiscoveredPeers()).hasSize(0); |
||||
} |
||||
} |
||||
|
||||
@Test(expected = IllegalArgumentException.class) |
||||
public void createP2PNetwork_NullHost() throws IOException { |
||||
final NetworkingConfiguration config = |
||||
NetworkingConfiguration.create() |
||||
.setDiscovery(DiscoveryConfiguration.create().setBindHost(null)); |
||||
try (final P2PNetwork broken = builder().config(config).build()) { |
||||
Assertions.fail("Expected Exception"); |
||||
} |
||||
} |
||||
|
||||
@Test(expected = IllegalArgumentException.class) |
||||
public void createP2PNetwork_InvalidHost() throws IOException { |
||||
final NetworkingConfiguration config = |
||||
NetworkingConfiguration.create() |
||||
.setDiscovery(DiscoveryConfiguration.create().setBindHost("fake.fake.fake")); |
||||
try (final P2PNetwork broken = builder().config(config).build()) { |
||||
Assertions.fail("Expected Exception"); |
||||
} |
||||
} |
||||
|
||||
@Test(expected = IllegalArgumentException.class) |
||||
public void createP2PNetwork_InvalidPort() throws IOException { |
||||
final NetworkingConfiguration config = |
||||
NetworkingConfiguration.create() |
||||
.setDiscovery(DiscoveryConfiguration.create().setBindPort(-1)); |
||||
try (final P2PNetwork broken = builder().config(config).build()) { |
||||
Assertions.fail("Expected Exception"); |
||||
} |
||||
} |
||||
|
||||
@Test(expected = NullPointerException.class) |
||||
public void createP2PNetwork_NullKeyPair() throws IOException { |
||||
try (final P2PNetwork broken = builder().config(config).keyPair(null).build()) { |
||||
Assertions.fail("Expected Exception"); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void startStopP2PNetwork() throws IOException { |
||||
try (final P2PNetwork service = builder().build()) { |
||||
service.start(); |
||||
service.stop(); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void startDiscoveryAgentBackToBack() throws IOException { |
||||
try (final P2PNetwork service1 = builder().build(); |
||||
final P2PNetwork service2 = builder().build()) { |
||||
service1.start(); |
||||
service1.stop(); |
||||
service2.start(); |
||||
service2.stop(); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void startDiscoveryPortInUse() throws IOException { |
||||
try (final P2PNetwork service1 = builder().config(config).build()) { |
||||
service1.start(); |
||||
final NetworkingConfiguration config = configWithRandomPorts(); |
||||
config.getDiscovery().setBindPort(service1.getLocalEnode().get().getEffectiveDiscoveryPort()); |
||||
try (final P2PNetwork service2 = builder().config(config).build()) { |
||||
try { |
||||
service2.start(); |
||||
} catch (final Exception e) { |
||||
assertThat(e).hasCauseExactlyInstanceOf(PeerDiscoveryServiceException.class); |
||||
assertThat(e) |
||||
.hasMessageStartingWith( |
||||
"tech.pegasys.pantheon.ethereum.p2p.discovery." |
||||
+ "PeerDiscoveryServiceException: Failed to bind Ethereum UDP discovery listener to 0.0.0.0:"); |
||||
assertThat(e).hasMessageContaining("Address already in use"); |
||||
} finally { |
||||
service1.stop(); |
||||
service2.stop(); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void createP2PNetwork_NoActivePeers() throws IOException { |
||||
try (final P2PNetwork agent = builder().build()) { |
||||
assertTrue(agent.getDiscoveredPeers().collect(toList()).isEmpty()); |
||||
assertEquals(0, agent.getPeers().size()); |
||||
} |
||||
} |
||||
|
||||
private DefaultP2PNetwork.Builder builder() { |
||||
return DefaultP2PNetwork.builder() |
||||
.vertx(vertx) |
||||
.keyPair(keyPair) |
||||
.config(config) |
||||
.peerBlacklist(new PeerBlacklist()) |
||||
.metricsSystem(new NoOpMetricsSystem()) |
||||
.supportedCapabilities(Arrays.asList(Capability.create("eth", 63))); |
||||
} |
||||
} |
@ -0,0 +1,431 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.ethereum.p2p.network; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import tech.pegasys.pantheon.crypto.SECP256K1; |
||||
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; |
||||
import tech.pegasys.pantheon.ethereum.chain.BlockAddedObserver; |
||||
import tech.pegasys.pantheon.ethereum.chain.Blockchain; |
||||
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork; |
||||
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.NetworkingConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.config.RlpxConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.p2p.network.netty.exceptions.IncompatiblePeerException; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.Endpoint; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer; |
||||
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist; |
||||
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; |
||||
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; |
||||
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; |
||||
import tech.pegasys.pantheon.ethereum.permissioning.LocalPermissioningConfiguration; |
||||
import tech.pegasys.pantheon.ethereum.permissioning.NodeLocalConfigPermissioningController; |
||||
import tech.pegasys.pantheon.ethereum.permissioning.node.NodePermissioningController; |
||||
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; |
||||
import tech.pegasys.pantheon.util.bytes.BytesValue; |
||||
import tech.pegasys.pantheon.util.enode.EnodeURL; |
||||
|
||||
import java.net.InetAddress; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.Path; |
||||
import java.util.Arrays; |
||||
import java.util.Collections; |
||||
import java.util.Optional; |
||||
import java.util.OptionalInt; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import io.vertx.core.Vertx; |
||||
import org.junit.After; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.ArgumentCaptor; |
||||
import org.mockito.ArgumentMatcher; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
|
||||
@RunWith(MockitoJUnitRunner.StrictStubs.class) |
||||
public class P2PNetworkTest { |
||||
|
||||
@Mock private Blockchain blockchain; |
||||
|
||||
private ArgumentCaptor<BlockAddedObserver> observerCaptor = |
||||
ArgumentCaptor.forClass(BlockAddedObserver.class); |
||||
|
||||
private final Vertx vertx = Vertx.vertx(); |
||||
private final NetworkingConfiguration config = |
||||
NetworkingConfiguration.create() |
||||
.setDiscovery(DiscoveryConfiguration.create().setActive(false)) |
||||
.setSupportedProtocols(subProtocol()) |
||||
.setRlpx(RlpxConfiguration.create().setBindPort(0)); |
||||
|
||||
private final String selfEnodeString = |
||||
"enode://5f8a80d14311c39f35f516fa664deaaaa13e85b2f7493f37f6144d86991ec012937307647bd3b9a82abe2974e1407241d54947bbb39763a4cac9f77166ad92a0@192.168.0.10:1111"; |
||||
private final EnodeURL selfEnode = EnodeURL.fromString(selfEnodeString); |
||||
|
||||
@Before |
||||
public void before() { |
||||
when(blockchain.observeBlockAdded(observerCaptor.capture())).thenReturn(1L); |
||||
} |
||||
|
||||
@After |
||||
public void closeVertx() { |
||||
vertx.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void handshaking() throws Exception { |
||||
final SECP256K1.KeyPair listenKp = SECP256K1.KeyPair.generate(); |
||||
try (final P2PNetwork listener = builder().keyPair(listenKp).build(); |
||||
final P2PNetwork connector = builder().build()) { |
||||
|
||||
listener.start(); |
||||
connector.start(); |
||||
final EnodeURL listenerEnode = listener.getLocalEnode().get(); |
||||
final BytesValue listenId = listenerEnode.getNodeId(); |
||||
final int listenPort = listenerEnode.getListeningPort(); |
||||
|
||||
assertThat( |
||||
connector |
||||
.connect( |
||||
new DefaultPeer( |
||||
listenId, |
||||
new Endpoint( |
||||
InetAddress.getLoopbackAddress().getHostAddress(), |
||||
listenPort, |
||||
OptionalInt.of(listenPort)))) |
||||
.get(30L, TimeUnit.SECONDS) |
||||
.getPeerInfo() |
||||
.getNodeId()) |
||||
.isEqualTo(listenId); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void preventMultipleConnections() throws Exception { |
||||
final SECP256K1.KeyPair listenKp = SECP256K1.KeyPair.generate(); |
||||
try (final P2PNetwork listener = builder().keyPair(listenKp).build(); |
||||
final P2PNetwork connector = builder().build()) { |
||||
|
||||
listener.start(); |
||||
connector.start(); |
||||
final EnodeURL listenerEnode = listener.getLocalEnode().get(); |
||||
final BytesValue listenId = listenerEnode.getNodeId(); |
||||
final int listenPort = listenerEnode.getListeningPort(); |
||||
|
||||
assertThat( |
||||
connector |
||||
.connect( |
||||
new DefaultPeer( |
||||
listenId, |
||||
new Endpoint( |
||||
InetAddress.getLoopbackAddress().getHostAddress(), |
||||
listenPort, |
||||
OptionalInt.of(listenPort)))) |
||||
.get(30L, TimeUnit.SECONDS) |
||||
.getPeerInfo() |
||||
.getNodeId()) |
||||
.isEqualTo(listenId); |
||||
final CompletableFuture<PeerConnection> secondConnectionFuture = |
||||
connector.connect( |
||||
new DefaultPeer( |
||||
listenId, |
||||
new Endpoint( |
||||
InetAddress.getLoopbackAddress().getHostAddress(), |
||||
listenPort, |
||||
OptionalInt.of(listenPort)))); |
||||
assertThatThrownBy(secondConnectionFuture::get) |
||||
.hasCause(new IllegalStateException("Client already connected")); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Tests that max peers setting is honoured and inbound connections that would exceed the limit |
||||
* are correctly disconnected. |
||||
* |
||||
* @throws Exception On Failure |
||||
*/ |
||||
@Test |
||||
public void limitMaxPeers() throws Exception { |
||||
final SECP256K1.KeyPair listenKp = SECP256K1.KeyPair.generate(); |
||||
final int maxPeers = 1; |
||||
final NetworkingConfiguration listenerConfig = |
||||
NetworkingConfiguration.create() |
||||
.setDiscovery(DiscoveryConfiguration.create().setActive(false)) |
||||
.setSupportedProtocols(subProtocol()) |
||||
.setRlpx(RlpxConfiguration.create().setBindPort(0).setMaxPeers(maxPeers)); |
||||
try (final P2PNetwork listener = builder().keyPair(listenKp).config(listenerConfig).build(); |
||||
final P2PNetwork connector1 = builder().build(); |
||||
final P2PNetwork connector2 = builder().build()) { |
||||
|
||||
// Setup listener and first connection
|
||||
listener.start(); |
||||
connector1.start(); |
||||
final EnodeURL listenerEnode = listener.getLocalEnode().get(); |
||||
final BytesValue listenId = listenerEnode.getNodeId(); |
||||
final int listenPort = listenerEnode.getListeningPort(); |
||||
|
||||
final Peer listeningPeer = |
||||
new DefaultPeer( |
||||
listenId, |
||||
new Endpoint( |
||||
InetAddress.getLoopbackAddress().getHostAddress(), |
||||
listenPort, |
||||
OptionalInt.of(listenPort))); |
||||
assertThat( |
||||
connector1 |
||||
.connect(listeningPeer) |
||||
.get(30L, TimeUnit.SECONDS) |
||||
.getPeerInfo() |
||||
.getNodeId()) |
||||
.isEqualTo(listenId); |
||||
|
||||
// Setup second connection and check that connection is not accepted
|
||||
final CompletableFuture<PeerConnection> peerFuture = new CompletableFuture<>(); |
||||
final CompletableFuture<DisconnectReason> reasonFuture = new CompletableFuture<>(); |
||||
connector2.subscribeDisconnect( |
||||
(peerConnection, reason, initiatedByPeer) -> { |
||||
peerFuture.complete(peerConnection); |
||||
reasonFuture.complete(reason); |
||||
}); |
||||
connector2.start(); |
||||
assertThat( |
||||
connector2 |
||||
.connect(listeningPeer) |
||||
.get(30L, TimeUnit.SECONDS) |
||||
.getPeerInfo() |
||||
.getNodeId()) |
||||
.isEqualTo(listenId); |
||||
assertThat(peerFuture.get(30L, TimeUnit.SECONDS).getPeerInfo().getNodeId()) |
||||
.isEqualTo(listenId); |
||||
assertThat(reasonFuture.get(30L, TimeUnit.SECONDS)) |
||||
.isEqualByComparingTo(DisconnectReason.TOO_MANY_PEERS); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void rejectPeerWithNoSharedCaps() throws Exception { |
||||
final SECP256K1.KeyPair listenKp = SECP256K1.KeyPair.generate(); |
||||
|
||||
final SubProtocol subprotocol1 = subProtocol("eth"); |
||||
final Capability cap1 = Capability.create(subprotocol1.getName(), 63); |
||||
final SubProtocol subprotocol2 = subProtocol("oth"); |
||||
final Capability cap2 = Capability.create(subprotocol2.getName(), 63); |
||||
try (final P2PNetwork listener = |
||||
builder().keyPair(listenKp).supportedCapabilities(cap1).build(); |
||||
final P2PNetwork connector = |
||||
builder().keyPair(SECP256K1.KeyPair.generate()).supportedCapabilities(cap2).build()) { |
||||
listener.start(); |
||||
connector.start(); |
||||
final EnodeURL listenerEnode = listener.getLocalEnode().get(); |
||||
final BytesValue listenId = listenerEnode.getNodeId(); |
||||
final int listenPort = listenerEnode.getListeningPort(); |
||||
|
||||
final Peer listenerPeer = |
||||
new DefaultPeer( |
||||
listenId, |
||||
new Endpoint( |
||||
InetAddress.getLoopbackAddress().getHostAddress(), |
||||
listenPort, |
||||
OptionalInt.of(listenPort))); |
||||
final CompletableFuture<PeerConnection> connectFuture = connector.connect(listenerPeer); |
||||
assertThatThrownBy(connectFuture::get).hasCauseInstanceOf(IncompatiblePeerException.class); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void rejectIncomingConnectionFromBlacklistedPeer() throws Exception { |
||||
final PeerBlacklist localBlacklist = new PeerBlacklist(); |
||||
final PeerBlacklist remoteBlacklist = new PeerBlacklist(); |
||||
|
||||
try (final P2PNetwork localNetwork = builder().peerBlacklist(localBlacklist).build(); |
||||
final P2PNetwork remoteNetwork = builder().peerBlacklist(remoteBlacklist).build()) { |
||||
|
||||
localNetwork.start(); |
||||
remoteNetwork.start(); |
||||
|
||||
final EnodeURL localEnode = localNetwork.getLocalEnode().get(); |
||||
final BytesValue localId = localEnode.getNodeId(); |
||||
final int localPort = localEnode.getListeningPort(); |
||||
|
||||
final EnodeURL remoteEnode = remoteNetwork.getLocalEnode().get(); |
||||
final BytesValue remoteId = remoteEnode.getNodeId(); |
||||
final int remotePort = remoteEnode.getListeningPort(); |
||||
|
||||
final Peer localPeer = |
||||
new DefaultPeer( |
||||
localId, |
||||
new Endpoint( |
||||
InetAddress.getLoopbackAddress().getHostAddress(), |
||||
localPort, |
||||
OptionalInt.of(localPort))); |
||||
|
||||
final Peer remotePeer = |
||||
new DefaultPeer( |
||||
remoteId, |
||||
new Endpoint( |
||||
InetAddress.getLoopbackAddress().getHostAddress(), |
||||
remotePort, |
||||
OptionalInt.of(remotePort))); |
||||
|
||||
// Blacklist the remote peer
|
||||
localBlacklist.add(remotePeer); |
||||
|
||||
localNetwork.start(); |
||||
remoteNetwork.start(); |
||||
|
||||
// Setup disconnect listener
|
||||
final CompletableFuture<PeerConnection> peerFuture = new CompletableFuture<>(); |
||||
final CompletableFuture<DisconnectReason> reasonFuture = new CompletableFuture<>(); |
||||
remoteNetwork.subscribeDisconnect( |
||||
(peerConnection, reason, initiatedByPeer) -> { |
||||
peerFuture.complete(peerConnection); |
||||
reasonFuture.complete(reason); |
||||
}); |
||||
|
||||
// Remote connect to local
|
||||
final CompletableFuture<PeerConnection> connectFuture = remoteNetwork.connect(localPeer); |
||||
|
||||
// Check connection is made, and then a disconnect is registered at remote
|
||||
assertThat(connectFuture.get(5L, TimeUnit.SECONDS).getPeerInfo().getNodeId()) |
||||
.isEqualTo(localId); |
||||
assertThat(peerFuture.get(5L, TimeUnit.SECONDS).getPeerInfo().getNodeId()).isEqualTo(localId); |
||||
assertThat(reasonFuture.get(5L, TimeUnit.SECONDS)) |
||||
.isEqualByComparingTo(DisconnectReason.UNKNOWN); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void rejectIncomingConnectionFromNonWhitelistedPeer() throws Exception { |
||||
final LocalPermissioningConfiguration config = LocalPermissioningConfiguration.createDefault(); |
||||
final Path tempFile = Files.createTempFile("test", "test"); |
||||
tempFile.toFile().deleteOnExit(); |
||||
config.setNodePermissioningConfigFilePath(tempFile.toAbsolutePath().toString()); |
||||
|
||||
final NodeLocalConfigPermissioningController localWhitelistController = |
||||
new NodeLocalConfigPermissioningController(config, Collections.emptyList(), selfEnode); |
||||
// turn on whitelisting by adding a different node NOT remote node
|
||||
localWhitelistController.addNode( |
||||
EnodeURL.builder().ipAddress("127.0.0.1").nodeId(Peer.randomId()).build()); |
||||
final NodePermissioningController nodePermissioningController = |
||||
new NodePermissioningController( |
||||
Optional.empty(), Collections.singletonList(localWhitelistController)); |
||||
|
||||
try (final P2PNetwork localNetwork = |
||||
builder() |
||||
.nodePermissioningController(nodePermissioningController) |
||||
.nodeLocalConfigPermissioningController(localWhitelistController) |
||||
.blockchain(blockchain) |
||||
.build(); |
||||
final P2PNetwork remoteNetwork = builder().build()) { |
||||
|
||||
localNetwork.start(); |
||||
remoteNetwork.start(); |
||||
|
||||
final EnodeURL localEnode = localNetwork.getLocalEnode().get(); |
||||
final BytesValue localId = localEnode.getNodeId(); |
||||
final int localPort = localEnode.getListeningPort(); |
||||
|
||||
final Peer localPeer = |
||||
new DefaultPeer( |
||||
localId, |
||||
new Endpoint( |
||||
InetAddress.getLoopbackAddress().getHostAddress(), |
||||
localPort, |
||||
OptionalInt.of(localPort))); |
||||
|
||||
// Setup disconnect listener
|
||||
final CompletableFuture<PeerConnection> peerFuture = new CompletableFuture<>(); |
||||
final CompletableFuture<DisconnectReason> reasonFuture = new CompletableFuture<>(); |
||||
remoteNetwork.subscribeDisconnect( |
||||
(peerConnection, reason, initiatedByPeer) -> { |
||||
peerFuture.complete(peerConnection); |
||||
reasonFuture.complete(reason); |
||||
}); |
||||
|
||||
// Remote connect to local
|
||||
final CompletableFuture<PeerConnection> connectFuture = remoteNetwork.connect(localPeer); |
||||
|
||||
// Check connection is made, and then a disconnect is registered at remote
|
||||
assertThat(connectFuture.get(5L, TimeUnit.SECONDS).getPeerInfo().getNodeId()) |
||||
.isEqualTo(localId); |
||||
assertThat(peerFuture.get(5L, TimeUnit.SECONDS).getPeerInfo().getNodeId()).isEqualTo(localId); |
||||
assertThat(reasonFuture.get(5L, TimeUnit.SECONDS)) |
||||
.isEqualByComparingTo(DisconnectReason.UNKNOWN); |
||||
} |
||||
} |
||||
|
||||
private static SubProtocol subProtocol() { |
||||
return subProtocol("eth"); |
||||
} |
||||
|
||||
private static SubProtocol subProtocol(final String name) { |
||||
return new SubProtocol() { |
||||
@Override |
||||
public String getName() { |
||||
return name; |
||||
} |
||||
|
||||
@Override |
||||
public int messageSpace(final int protocolVersion) { |
||||
return 8; |
||||
} |
||||
|
||||
@Override |
||||
public boolean isValidMessageCode(final int protocolVersion, final int code) { |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public String messageName(final int protocolVersion, final int code) { |
||||
return INVALID_MESSAGE_NAME; |
||||
} |
||||
}; |
||||
} |
||||
|
||||
public static class EnodeURLMatcher implements ArgumentMatcher<EnodeURL> { |
||||
|
||||
private final EnodeURL enodeURL; |
||||
|
||||
EnodeURLMatcher(final EnodeURL enodeURL) { |
||||
this.enodeURL = enodeURL; |
||||
} |
||||
|
||||
@Override |
||||
public boolean matches(final EnodeURL argument) { |
||||
if (argument == null) { |
||||
return false; |
||||
} else { |
||||
return enodeURL.getNodeId().equals(argument.getNodeId()) |
||||
&& enodeURL.getIp().equals(argument.getIp()) |
||||
&& enodeURL.getListeningPort().equals(argument.getListeningPort()); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private DefaultP2PNetwork.Builder builder() { |
||||
return DefaultP2PNetwork.builder() |
||||
.vertx(vertx) |
||||
.config(config) |
||||
.keyPair(KeyPair.generate()) |
||||
.peerBlacklist(new PeerBlacklist()) |
||||
.metricsSystem(new NoOpMetricsSystem()) |
||||
.supportedCapabilities(Arrays.asList(Capability.create("eth", 63))); |
||||
} |
||||
} |
Loading…
Reference in new issue