Ibft tracks validating peers for multicasting (#101)

The Ibft consensus mechanism is responsible for sending a variety of
messages to other validating nodes in the network (provided they have
a point-to-point connection to them).

This change tracks which nodes have connected to the IBFT
subprotocol, and provides the functionality to only transmit messages
to nodes which are also validators.
tmohay 6 years ago committed by GitHub
parent 725cdb34c2
commit 2862f54f55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 81
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java
  2. 14
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/protocol/IbftProtocolManager.java
  3. 106
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java
  4. 6
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java

@ -0,0 +1,81 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.network;
import tech.pegasys.pantheon.consensus.common.ValidatorProvider;
import tech.pegasys.pantheon.crypto.SECP256K1.PublicKey;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class IbftNetworkPeers {
private static final Logger LOG = LogManager.getLogger();
private static final String PROTOCOL_NAME = "IBF";
private final Map<Address, PeerConnection> peerConnections = Maps.newConcurrentMap();
private final ValidatorProvider validatorProvider;
public IbftNetworkPeers(final ValidatorProvider validatorProvider) {
this.validatorProvider = validatorProvider;
}
public void peerAdded(final PeerConnection newConnection) {
final Address peerAddress = getAddressFrom(newConnection);
peerConnections.put(peerAddress, newConnection);
}
public void peerRemoved(final PeerConnection removedConnection) {
final Address peerAddress = getAddressFrom(removedConnection);
peerConnections.remove(peerAddress);
}
public void multicastToValidators(final MessageData message) {
Collection<Address> validators = validatorProvider.getCurrentValidators();
sendMessageToSpecificAddresses(validators, message);
}
private void sendMessageToSpecificAddresses(
final Collection<Address> recipients, final MessageData message) {
recipients
.stream()
.map(peerConnections::get)
.filter(Objects::nonNull)
.forEach(
connection -> {
try {
connection.sendForProtocol(PROTOCOL_NAME, message);
} catch (PeerNotConnected peerNotConnected) {
LOG.trace("Lost connection to a validator.");
}
});
}
private Address getAddressFrom(final PeerConnection connection) {
final BytesValue peerNodeId = connection.getPeer().getNodeId();
final PublicKey remotePublicKey = PublicKey.create(peerNodeId);
return Util.publicKeyToAddress(remotePublicKey);
}
}

@ -15,6 +15,7 @@ package tech.pegasys.pantheon.consensus.ibft.protocol;
import tech.pegasys.pantheon.consensus.ibft.IbftEvent; import tech.pegasys.pantheon.consensus.ibft.IbftEvent;
import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue; import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.IbftEvents; import tech.pegasys.pantheon.consensus.ibft.IbftEvents;
import tech.pegasys.pantheon.consensus.ibft.network.IbftNetworkPeers;
import tech.pegasys.pantheon.ethereum.p2p.api.Message; import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager; import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager;
@ -31,14 +32,17 @@ public class IbftProtocolManager implements ProtocolManager {
private final IbftEventQueue ibftEventQueue; private final IbftEventQueue ibftEventQueue;
private final Logger LOG = LogManager.getLogger(); private final Logger LOG = LogManager.getLogger();
private final IbftNetworkPeers peers;
/** /**
* Constructor for the ibft protocol manager * Constructor for the ibft protocol manager
* *
* @param ibftEventQueue Entry point into the ibft event processor * @param ibftEventQueue Entry point into the ibft event processor
* @param peers
*/ */
public IbftProtocolManager(final IbftEventQueue ibftEventQueue) { public IbftProtocolManager(final IbftEventQueue ibftEventQueue, final IbftNetworkPeers peers) {
this.ibftEventQueue = ibftEventQueue; this.ibftEventQueue = ibftEventQueue;
this.peers = peers;
} }
@Override @Override
@ -79,13 +83,17 @@ public class IbftProtocolManager implements ProtocolManager {
} }
@Override @Override
public void handleNewConnection(final PeerConnection peerConnection) {} public void handleNewConnection(final PeerConnection peerConnection) {
peers.peerAdded(peerConnection);
}
@Override @Override
public void handleDisconnect( public void handleDisconnect(
final PeerConnection peerConnection, final PeerConnection peerConnection,
final DisconnectReason disconnectReason, final DisconnectReason disconnectReason,
final boolean initiatedByPeer) {} final boolean initiatedByPeer) {
peers.peerRemoved(peerConnection);
}
@Override @Override
public boolean hasSufficientPeers() { public boolean hasSufficientPeers() {

@ -0,0 +1,106 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.network;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.consensus.common.ValidatorProvider;
import tech.pegasys.pantheon.crypto.SECP256K1.PublicKey;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import java.math.BigInteger;
import java.util.List;
import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class IbftNetworkPeersTest {
private final List<Address> validators = Lists.newArrayList();
private final List<PublicKey> publicKeys = Lists.newArrayList();
private final List<PeerConnection> peerConnections = Lists.newArrayList();
@Before
public void setup() {
for (int i = 0; i < 4; i++) {
PublicKey pubKey = PublicKey.create(BigInteger.valueOf(i));
publicKeys.add(pubKey);
final PeerInfo peerInfo = mock(PeerInfo.class);
final PeerConnection peerConnection = mock(PeerConnection.class);
when(peerConnection.getPeer()).thenReturn(peerInfo);
when(peerInfo.getNodeId()).thenReturn(pubKey.getEncodedBytes());
peerConnections.add(peerConnection);
}
}
@Test
public void onlyValidatorsAreSentAMessage() throws PeerNotConnected {
// Only add the first Peer's address to the validators.
validators.add(Util.publicKeyToAddress(publicKeys.get(0)));
ValidatorProvider validatorProvider = mock(ValidatorProvider.class);
when(validatorProvider.getCurrentValidators()).thenReturn(validators);
IbftNetworkPeers peers = new IbftNetworkPeers(validatorProvider);
for (PeerConnection peer : peerConnections) {
peers.peerAdded(peer);
}
MessageData messageToSend = new RawMessage(1, EMPTY_BUFFER);
peers.multicastToValidators(messageToSend);
verify(peerConnections.get(0), times(1)).sendForProtocol("IBF", messageToSend);
verify(peerConnections.get(1), never()).sendForProtocol(any(), any());
verify(peerConnections.get(2), never()).sendForProtocol(any(), any());
verify(peerConnections.get(3), never()).sendForProtocol(any(), any());
}
@Test
public void doesntSendToValidatorsWhichAreNotDirectlyConnected() throws PeerNotConnected {
validators.add(Util.publicKeyToAddress(publicKeys.get(0)));
ValidatorProvider validatorProvider = mock(ValidatorProvider.class);
when(validatorProvider.getCurrentValidators()).thenReturn(validators);
IbftNetworkPeers peers = new IbftNetworkPeers(validatorProvider);
// only add peer connections 1, 2 & 3, none of which should be invoked.
Lists.newArrayList(1, 2, 3).stream().forEach(i -> peers.peerAdded(peerConnections.get(i)));
MessageData messageToSend = new RawMessage(1, EMPTY_BUFFER);
peers.multicastToValidators(messageToSend);
verify(peerConnections.get(0), never()).sendForProtocol(any(), any());
verify(peerConnections.get(1), never()).sendForProtocol(any(), any());
verify(peerConnections.get(2), never()).sendForProtocol(any(), any());
verify(peerConnections.get(3), never()).sendForProtocol(any(), any());
}
}

@ -23,6 +23,7 @@ import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.IbftProcessor; import tech.pegasys.pantheon.consensus.ibft.IbftProcessor;
import tech.pegasys.pantheon.consensus.ibft.IbftStateMachine; import tech.pegasys.pantheon.consensus.ibft.IbftStateMachine;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockMiner; import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockMiner;
import tech.pegasys.pantheon.consensus.ibft.network.IbftNetworkPeers;
import tech.pegasys.pantheon.consensus.ibft.protocol.IbftProtocolManager; import tech.pegasys.pantheon.consensus.ibft.protocol.IbftProtocolManager;
import tech.pegasys.pantheon.consensus.ibft.protocol.IbftSubProtocol; import tech.pegasys.pantheon.consensus.ibft.protocol.IbftSubProtocol;
import tech.pegasys.pantheon.consensus.ibftlegacy.IbftProtocolSchedule; import tech.pegasys.pantheon.consensus.ibftlegacy.IbftProtocolSchedule;
@ -185,12 +186,15 @@ public class IbftPantheonController implements PantheonController<IbftContext, I
TransactionPoolFactory.createTransactionPool( TransactionPoolFactory.createTransactionPool(
protocolSchedule, protocolContext, ethProtocolManager.ethContext()); protocolSchedule, protocolContext, ethProtocolManager.ethContext());
final IbftNetworkPeers peers =
new IbftNetworkPeers(protocolContext.getConsensusState().getVoteTally());
return new IbftPantheonController( return new IbftPantheonController(
genesisConfig, genesisConfig,
protocolContext, protocolContext,
ethSubProtocol, ethSubProtocol,
ethProtocolManager, ethProtocolManager,
new IbftProtocolManager(ibftEventQueue), new IbftProtocolManager(ibftEventQueue, peers),
synchronizer, synchronizer,
nodeKeys, nodeKeys,
transactionPool, transactionPool,

Loading…
Cancel
Save