[PAN-2348] Changes in chain head should trigger new permissioning check for active peers (#1071)

* [PAN-2348] Changes in chain head (new blocks or chain reorgs) should trigger new permissioning check

* [PAN-2348] Changes in chain head (new blocks or chain reorgs) should trigger new permissioning check

* Errorprone

* Fix changes

* Spotless

* PR comments

* Spotless

* Fix test

* Fix javadoc

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Lucas Saldanha 6 years ago committed by GitHub
parent fde3a80f85
commit 1ec0139a30
  1. 89
      ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/NettyP2PNetwork.java
  2. 178
      ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/NettyP2PNetworkTest.java

@ -15,6 +15,8 @@ package tech.pegasys.pantheon.ethereum.p2p.netty;
import static com.google.common.base.Preconditions.checkState;
import tech.pegasys.pantheon.crypto.SECP256K1;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.p2p.PeerNotWhitelistedException;
import tech.pegasys.pantheon.ethereum.p2p.api.DisconnectCallback;
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
@ -36,11 +38,13 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.permissioning.NodeWhitelistController;
import tech.pegasys.pantheon.ethereum.permissioning.node.NodePermissioningController;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.enode.EnodeURL;
import java.net.InetSocketAddress;
import java.util.Collection;
@ -159,6 +163,32 @@ public class NettyP2PNetwork implements P2PNetwork {
private final Optional<NodeWhitelistController> nodeWhitelistController;
private final Optional<NodePermissioningController> nodePermissioningController;
private final Optional<Blockchain> blockchain;
private OptionalLong blockAddedObserverId = OptionalLong.empty();
public NettyP2PNetwork(
final Vertx vertx,
final SECP256K1.KeyPair keyPair,
final NetworkingConfiguration config,
final List<Capability> supportedCapabilities,
final PeerRequirement peerRequirement,
final PeerBlacklist peerBlacklist,
final MetricsSystem metricsSystem,
final Optional<NodeWhitelistController> nodeWhitelistController) {
this(
vertx,
keyPair,
config,
supportedCapabilities,
peerRequirement,
peerBlacklist,
metricsSystem,
nodeWhitelistController,
null,
null);
}
/**
* Creates a peer networking service for production purposes.
*
@ -174,6 +204,8 @@ public class NettyP2PNetwork implements P2PNetwork {
* @param peerRequirement Queried to determine if enough peers are currently connected.
* @param metricsSystem The metrics system to capture metrics with.
* @param nodeWhitelistController Controls the whitelist of nodes to which this node will connect.
* @param nodePermissioningController Controls node permissioning.
* @param blockchain The blockchain to subscribe to BlockAddedEvents.
*/
public NettyP2PNetwork(
final Vertx vertx,
@ -183,7 +215,9 @@ public class NettyP2PNetwork implements P2PNetwork {
final PeerRequirement peerRequirement,
final PeerBlacklist peerBlacklist,
final MetricsSystem metricsSystem,
final Optional<NodeWhitelistController> nodeWhitelistController) {
final Optional<NodeWhitelistController> nodeWhitelistController,
final NodePermissioningController nodePermissioningController,
final Blockchain blockchain) {
connections = new PeerConnectionRegistry(metricsSystem);
this.peerBlacklist = peerBlacklist;
@ -266,6 +300,9 @@ public class NettyP2PNetwork implements P2PNetwork {
} catch (final InterruptedException e) {
throw new RuntimeException("Interrupted before startup completed", e);
}
this.nodePermissioningController = Optional.ofNullable(nodePermissioningController);
this.blockchain = Optional.ofNullable(blockchain);
}
private Supplier<Integer> pendingTaskCounter(final EventLoopGroup eventLoopGroup) {
@ -473,6 +510,20 @@ public class NettyP2PNetwork implements P2PNetwork {
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent()));
peerDroppedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerDroppedEvents(handlePeerDroppedEvents()));
if (nodePermissioningController.isPresent()) {
if (blockchain.isPresent()) {
synchronized (this) {
if (!blockAddedObserverId.isPresent()) {
blockAddedObserverId =
OptionalLong.of(blockchain.get().observeBlockAdded(this::handleBlockAddedEvent));
}
}
} else {
throw new IllegalStateException(
"NettyP2PNetwork permissioning needs to listen to BlockAddedEvents. Blockchain can't be null.");
}
}
}
private Consumer<PeerBondedEvent> handlePeerBondedEvent() {
@ -497,6 +548,40 @@ public class NettyP2PNetwork implements P2PNetwork {
};
}
private synchronized void handleBlockAddedEvent(
final BlockAddedEvent event, final Blockchain blockchain) {
connections
.getPeerConnections()
.forEach(
peerConnection -> {
if (!isPeerConnectionAllowed(peerConnection)) {
peerConnection.disconnect(DisconnectReason.REQUESTED);
}
});
}
private boolean isPeerConnectionAllowed(final PeerConnection peerConnection) {
LOG.trace(
"Checking if connection with peer {} is permitted", peerConnection.getPeer().getNodeId());
final EnodeURL localPeerEnodeURL =
peerInfoToEnodeURL(ourPeerInfo, (InetSocketAddress) peerConnection.getLocalAddress());
final EnodeURL remotePeerEnodeURL =
peerInfoToEnodeURL(
peerConnection.getPeer(), (InetSocketAddress) peerConnection.getRemoteAddress());
return nodePermissioningController
.map(c -> c.isPermitted(localPeerEnodeURL, remotePeerEnodeURL))
.orElse(true);
}
private EnodeURL peerInfoToEnodeURL(final PeerInfo ourPeerInfo, final InetSocketAddress address) {
final String localNodeId = ourPeerInfo.getNodeId().toString().substring(2);
final String localHostAddress = address.getHostString();
final int localPort = address.getPort();
return new EnodeURL(localNodeId, localHostAddress, localPort);
}
private boolean isConnecting(final Peer peer) {
return pendingConnections.containsKey(peer);
}
@ -513,6 +598,8 @@ public class NettyP2PNetwork implements P2PNetwork {
peerBondedObserverId = OptionalLong.empty();
peerDroppedObserverId.ifPresent(peerDiscoveryAgent::removePeerDroppedObserver);
peerDroppedObserverId = OptionalLong.empty();
blockchain.ifPresent(b -> blockAddedObserverId.ifPresent(b::removeObserver));
blockAddedObserverId = OptionalLong.empty();
peerDiscoveryAgent.stop().join();
workers.shutdownGracefully();
boss.shutdownGracefully();

@ -15,7 +15,12 @@ package tech.pegasys.pantheon.ethereum.p2p;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Java6Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ -24,6 +29,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.crypto.SECP256K1;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedObserver;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration;
@ -41,10 +49,13 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.permissioning.NodeWhitelistController;
import tech.pegasys.pantheon.ethereum.permissioning.PermissioningConfiguration;
import tech.pegasys.pantheon.ethereum.permissioning.node.NodePermissioningController;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.enode.EnodeURL;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
@ -57,16 +68,32 @@ import java.util.concurrent.TimeUnit;
import io.vertx.core.Vertx;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
/** Tests for {@link NettyP2PNetwork}. */
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public final class NettyP2PNetworkTest {
@Mock private NodePermissioningController nodePermissioningController;
@Mock private Blockchain blockchain;
private ArgumentCaptor<BlockAddedObserver> observerCaptor =
ArgumentCaptor.forClass(BlockAddedObserver.class);
private final Vertx vertx = Vertx.vertx();
@Before
public void before() {
when(blockchain.observeBlockAdded(observerCaptor.capture())).thenReturn(1L);
}
@After
public void closeVertx() {
vertx.close();
@ -616,6 +643,96 @@ public final class NettyP2PNetworkTest {
assertThat(nettyP2PNetwork.connect(peer)).isEqualTo(connectingFuture);
}
@Test
public void whenStartingNetworkWithNodePermissioningShouldSubscribeToBlockAddedEvents() {
final NettyP2PNetwork nettyP2PNetwork = mockNettyP2PNetwork();
nettyP2PNetwork.start();
verify(blockchain).observeBlockAdded(any());
}
@Test
public void whenStartingNetworkWithNodePermissioningWithoutBlockchainShouldThrowIllegalState() {
blockchain = null;
final NettyP2PNetwork nettyP2PNetwork = mockNettyP2PNetwork();
final Throwable throwable = catchThrowable(nettyP2PNetwork::start);
assertThat(throwable)
.isInstanceOf(IllegalStateException.class)
.hasMessage(
"NettyP2PNetwork permissioning needs to listen to BlockAddedEvents. Blockchain can't be null.");
}
@Test
public void whenStoppingNetworkWithNodePermissioningShouldUnsubscribeBlockAddedEvents() {
final NettyP2PNetwork nettyP2PNetwork = mockNettyP2PNetwork();
nettyP2PNetwork.start();
nettyP2PNetwork.stop();
verify(blockchain).removeObserver(eq(1L));
}
@Test
public void onBlockAddedShouldCheckPermissionsForAllPeers() {
final BlockAddedEvent blockAddedEvent = blockAddedEvent();
final NettyP2PNetwork nettyP2PNetwork = mockNettyP2PNetwork();
final Peer localPeer = mockPeer("127.0.0.1", 30301);
final Peer remotePeer1 = mockPeer("127.0.0.2", 30302);
final Peer remotePeer2 = mockPeer("127.0.0.3", 30303);
final PeerConnection peerConnection1 = mockPeerConnection(localPeer, remotePeer1);
final PeerConnection peerConnection2 = mockPeerConnection(localPeer, remotePeer2);
nettyP2PNetwork.start();
nettyP2PNetwork.connect(remotePeer1).complete(peerConnection1);
nettyP2PNetwork.connect(remotePeer2).complete(peerConnection2);
final BlockAddedObserver blockAddedObserver = observerCaptor.getValue();
blockAddedObserver.onBlockAdded(blockAddedEvent, blockchain);
verify(nodePermissioningController, times(2)).isPermitted(any(), any());
}
@Test
public void onBlockAddedAndPeerNotPermittedShouldDisconnect() {
final BlockAddedEvent blockAddedEvent = blockAddedEvent();
final NettyP2PNetwork nettyP2PNetwork = mockNettyP2PNetwork();
final Peer localPeer = mockPeer("127.0.0.1", 30301);
final Peer permittedPeer = mockPeer("127.0.0.2", 30302);
final Peer notPermittedPeer = mockPeer("127.0.0.3", 30303);
final PeerConnection permittedPeerConnection = mockPeerConnection(localPeer, permittedPeer);
final PeerConnection notPermittedPeerConnection =
mockPeerConnection(localPeer, notPermittedPeer);
final EnodeURL permittedEnodeURL = new EnodeURL(permittedPeer.getEnodeURI());
final EnodeURL notPermittedEnodeURL = new EnodeURL(notPermittedPeer.getEnodeURI());
nettyP2PNetwork.start();
nettyP2PNetwork.connect(permittedPeer).complete(permittedPeerConnection);
nettyP2PNetwork.connect(notPermittedPeer).complete(notPermittedPeerConnection);
lenient()
.when(nodePermissioningController.isPermitted(any(), enodeEq(notPermittedEnodeURL)))
.thenReturn(false);
lenient()
.when(nodePermissioningController.isPermitted(any(), enodeEq(permittedEnodeURL)))
.thenReturn(true);
final BlockAddedObserver blockAddedObserver = observerCaptor.getValue();
blockAddedObserver.onBlockAdded(blockAddedEvent, blockchain);
verify(notPermittedPeerConnection).disconnect(eq(DisconnectReason.REQUESTED));
verify(permittedPeerConnection, never()).disconnect(any());
}
private BlockAddedEvent blockAddedEvent() {
return mock(BlockAddedEvent.class);
}
private PeerConnection mockPeerConnection(final BytesValue id) {
final PeerInfo peerInfo = mock(PeerInfo.class);
when(peerInfo.getNodeId()).thenReturn(id);
@ -628,6 +745,26 @@ public final class NettyP2PNetworkTest {
return mockPeerConnection(BytesValue.fromHexString("0x00"));
}
private PeerConnection mockPeerConnection(final Peer localPeer, final Peer remotePeer) {
final PeerInfo peerInfo = mock(PeerInfo.class);
doReturn(remotePeer.getId()).when(peerInfo).getNodeId();
final PeerConnection peerConnection = mock(PeerConnection.class);
when(peerConnection.getPeer()).thenReturn(peerInfo);
Endpoint localEndpoint = localPeer.getEndpoint();
InetSocketAddress localSocketAddress =
new InetSocketAddress(localEndpoint.getHost(), localEndpoint.getTcpPort().getAsInt());
when(peerConnection.getLocalAddress()).thenReturn(localSocketAddress);
Endpoint remoteEndpoint = remotePeer.getEndpoint();
InetSocketAddress remoteSocketAddress =
new InetSocketAddress(remoteEndpoint.getHost(), remoteEndpoint.getTcpPort().getAsInt());
when(peerConnection.getRemoteAddress()).thenReturn(remoteSocketAddress);
return peerConnection;
}
private NettyP2PNetwork mockNettyP2PNetwork() {
final DiscoveryConfiguration noDiscovery = DiscoveryConfiguration.create().setActive(false);
final SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate();
@ -646,13 +783,24 @@ public final class NettyP2PNetworkTest {
() -> false,
new PeerBlacklist(),
new NoOpMetricsSystem(),
Optional.empty());
Optional.empty(),
nodePermissioningController,
blockchain);
}
private Peer mockPeer() {
final Peer peer = mock(Peer.class);
return mockPeer(
SECP256K1.KeyPair.generate().getPublicKey().getEncodedBytes(), "127.0.0.1", 30303);
}
private Peer mockPeer(final String host, final int port) {
final BytesValue id = SECP256K1.KeyPair.generate().getPublicKey().getEncodedBytes();
final Endpoint endpoint = new Endpoint("127.0.0.1", 30303, OptionalInt.of(30303));
return mockPeer(id, host, port);
}
private Peer mockPeer(final BytesValue id, final String host, final int port) {
final Peer peer = mock(Peer.class);
final Endpoint endpoint = new Endpoint(host, port, OptionalInt.of(port));
final String enodeURL =
String.format(
"enode://%s@%s:%d?discport=%d",
@ -667,4 +815,28 @@ public final class NettyP2PNetworkTest {
return peer;
}
public static class EnodeURLMatcher implements ArgumentMatcher<EnodeURL> {
private final EnodeURL enodeURL;
EnodeURLMatcher(final EnodeURL enodeURL) {
this.enodeURL = enodeURL;
}
@Override
public boolean matches(final EnodeURL argument) {
if (argument == null) {
return false;
} else {
return enodeURL.getNodeId().equals(argument.getNodeId())
&& enodeURL.getIp().equals(argument.getIp())
&& enodeURL.getListeningPort().equals(argument.getListeningPort());
}
}
}
private EnodeURL enodeEq(final EnodeURL enodeURL) {
return argThat(new EnodeURLMatcher(enodeURL));
}
}

Loading…
Cancel
Save