From 2862f54f55f499e23524a14d5e864ce44ec08ba2 Mon Sep 17 00:00:00 2001 From: tmohay <37158202+rain-on@users.noreply.github.com> Date: Tue, 23 Oct 2018 11:37:14 +1100 Subject: [PATCH] Ibft tracks validating peers for multicasting (#101) The Ibft consensus mechanism is responsible for sending a variety of messages to other validating nodes in the network (provided they have a point-to-point connection to them). This change tracks which nodes have connected to the IBFT subprotocol, and provides the functionality to only transmit messages to nodes which are also validators. --- .../ibft/network/IbftNetworkPeers.java | 81 +++++++++++++ .../ibft/protocol/IbftProtocolManager.java | 14 ++- .../ibft/network/IbftNetworkPeersTest.java | 106 ++++++++++++++++++ .../controller/IbftPantheonController.java | 6 +- 4 files changed, 203 insertions(+), 4 deletions(-) create mode 100644 consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java create mode 100644 consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java new file mode 100644 index 0000000000..82913d3b99 --- /dev/null +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java @@ -0,0 +1,81 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft.network; + +import tech.pegasys.pantheon.consensus.common.ValidatorProvider; +import tech.pegasys.pantheon.crypto.SECP256K1.PublicKey; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.Util; +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; + +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class IbftNetworkPeers { + + private static final Logger LOG = LogManager.getLogger(); + + private static final String PROTOCOL_NAME = "IBF"; + + private final Map peerConnections = Maps.newConcurrentMap(); + private final ValidatorProvider validatorProvider; + + public IbftNetworkPeers(final ValidatorProvider validatorProvider) { + this.validatorProvider = validatorProvider; + } + + public void peerAdded(final PeerConnection newConnection) { + final Address peerAddress = getAddressFrom(newConnection); + peerConnections.put(peerAddress, newConnection); + } + + public void peerRemoved(final PeerConnection removedConnection) { + final Address peerAddress = getAddressFrom(removedConnection); + peerConnections.remove(peerAddress); + } + + public void multicastToValidators(final MessageData message) { + Collection
validators = validatorProvider.getCurrentValidators(); + sendMessageToSpecificAddresses(validators, message); + } + + private void sendMessageToSpecificAddresses( + final Collection
recipients, final MessageData message) { + recipients + .stream() + .map(peerConnections::get) + .filter(Objects::nonNull) + .forEach( + connection -> { + try { + connection.sendForProtocol(PROTOCOL_NAME, message); + } catch (PeerNotConnected peerNotConnected) { + LOG.trace("Lost connection to a validator."); + } + }); + } + + private Address getAddressFrom(final PeerConnection connection) { + final BytesValue peerNodeId = connection.getPeer().getNodeId(); + final PublicKey remotePublicKey = PublicKey.create(peerNodeId); + return Util.publicKeyToAddress(remotePublicKey); + } +} diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/protocol/IbftProtocolManager.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/protocol/IbftProtocolManager.java index 9e4defa5c3..b9e3db317e 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/protocol/IbftProtocolManager.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/protocol/IbftProtocolManager.java @@ -15,6 +15,7 @@ package tech.pegasys.pantheon.consensus.ibft.protocol; import tech.pegasys.pantheon.consensus.ibft.IbftEvent; import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue; import tech.pegasys.pantheon.consensus.ibft.IbftEvents; +import tech.pegasys.pantheon.consensus.ibft.network.IbftNetworkPeers; import tech.pegasys.pantheon.ethereum.p2p.api.Message; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager; @@ -31,14 +32,17 @@ public class IbftProtocolManager implements ProtocolManager { private final IbftEventQueue ibftEventQueue; private final Logger LOG = LogManager.getLogger(); + private final IbftNetworkPeers peers; /** * Constructor for the ibft protocol manager * * @param ibftEventQueue Entry point into the ibft event processor + * @param peers */ - public IbftProtocolManager(final IbftEventQueue ibftEventQueue) { + public IbftProtocolManager(final IbftEventQueue ibftEventQueue, final IbftNetworkPeers peers) { this.ibftEventQueue = ibftEventQueue; + this.peers = peers; } @Override @@ -79,13 +83,17 @@ public class IbftProtocolManager implements ProtocolManager { } @Override - public void handleNewConnection(final PeerConnection peerConnection) {} + public void handleNewConnection(final PeerConnection peerConnection) { + peers.peerAdded(peerConnection); + } @Override public void handleDisconnect( final PeerConnection peerConnection, final DisconnectReason disconnectReason, - final boolean initiatedByPeer) {} + final boolean initiatedByPeer) { + peers.peerRemoved(peerConnection); + } @Override public boolean hasSufficientPeers() { diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java new file mode 100644 index 0000000000..3d94a9a865 --- /dev/null +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft.network; + +import static io.netty.buffer.Unpooled.EMPTY_BUFFER; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import tech.pegasys.pantheon.consensus.common.ValidatorProvider; +import tech.pegasys.pantheon.crypto.SECP256K1.PublicKey; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.Util; +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; +import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; + +import java.math.BigInteger; +import java.util.List; + +import com.google.common.collect.Lists; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class IbftNetworkPeersTest { + + private final List
validators = Lists.newArrayList(); + private final List publicKeys = Lists.newArrayList(); + + private final List peerConnections = Lists.newArrayList(); + + @Before + public void setup() { + for (int i = 0; i < 4; i++) { + PublicKey pubKey = PublicKey.create(BigInteger.valueOf(i)); + publicKeys.add(pubKey); + + final PeerInfo peerInfo = mock(PeerInfo.class); + final PeerConnection peerConnection = mock(PeerConnection.class); + when(peerConnection.getPeer()).thenReturn(peerInfo); + when(peerInfo.getNodeId()).thenReturn(pubKey.getEncodedBytes()); + + peerConnections.add(peerConnection); + } + } + + @Test + public void onlyValidatorsAreSentAMessage() throws PeerNotConnected { + // Only add the first Peer's address to the validators. + validators.add(Util.publicKeyToAddress(publicKeys.get(0))); + ValidatorProvider validatorProvider = mock(ValidatorProvider.class); + when(validatorProvider.getCurrentValidators()).thenReturn(validators); + + IbftNetworkPeers peers = new IbftNetworkPeers(validatorProvider); + for (PeerConnection peer : peerConnections) { + peers.peerAdded(peer); + } + + MessageData messageToSend = new RawMessage(1, EMPTY_BUFFER); + peers.multicastToValidators(messageToSend); + + verify(peerConnections.get(0), times(1)).sendForProtocol("IBF", messageToSend); + verify(peerConnections.get(1), never()).sendForProtocol(any(), any()); + verify(peerConnections.get(2), never()).sendForProtocol(any(), any()); + verify(peerConnections.get(3), never()).sendForProtocol(any(), any()); + } + + @Test + public void doesntSendToValidatorsWhichAreNotDirectlyConnected() throws PeerNotConnected { + validators.add(Util.publicKeyToAddress(publicKeys.get(0))); + + ValidatorProvider validatorProvider = mock(ValidatorProvider.class); + when(validatorProvider.getCurrentValidators()).thenReturn(validators); + + IbftNetworkPeers peers = new IbftNetworkPeers(validatorProvider); + + // only add peer connections 1, 2 & 3, none of which should be invoked. + Lists.newArrayList(1, 2, 3).stream().forEach(i -> peers.peerAdded(peerConnections.get(i))); + + MessageData messageToSend = new RawMessage(1, EMPTY_BUFFER); + peers.multicastToValidators(messageToSend); + + verify(peerConnections.get(0), never()).sendForProtocol(any(), any()); + verify(peerConnections.get(1), never()).sendForProtocol(any(), any()); + verify(peerConnections.get(2), never()).sendForProtocol(any(), any()); + verify(peerConnections.get(3), never()).sendForProtocol(any(), any()); + } +} diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index 6565f9e49d..29591405b1 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -23,6 +23,7 @@ import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue; import tech.pegasys.pantheon.consensus.ibft.IbftProcessor; import tech.pegasys.pantheon.consensus.ibft.IbftStateMachine; import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockMiner; +import tech.pegasys.pantheon.consensus.ibft.network.IbftNetworkPeers; import tech.pegasys.pantheon.consensus.ibft.protocol.IbftProtocolManager; import tech.pegasys.pantheon.consensus.ibft.protocol.IbftSubProtocol; import tech.pegasys.pantheon.consensus.ibftlegacy.IbftProtocolSchedule; @@ -185,12 +186,15 @@ public class IbftPantheonController implements PantheonController