diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java new file mode 100644 index 0000000000..82913d3b99 --- /dev/null +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java @@ -0,0 +1,81 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft.network; + +import tech.pegasys.pantheon.consensus.common.ValidatorProvider; +import tech.pegasys.pantheon.crypto.SECP256K1.PublicKey; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.Util; +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; + +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class IbftNetworkPeers { + + private static final Logger LOG = LogManager.getLogger(); + + private static final String PROTOCOL_NAME = "IBF"; + + private final Map
peerConnections = Maps.newConcurrentMap(); + private final ValidatorProvider validatorProvider; + + public IbftNetworkPeers(final ValidatorProvider validatorProvider) { + this.validatorProvider = validatorProvider; + } + + public void peerAdded(final PeerConnection newConnection) { + final Address peerAddress = getAddressFrom(newConnection); + peerConnections.put(peerAddress, newConnection); + } + + public void peerRemoved(final PeerConnection removedConnection) { + final Address peerAddress = getAddressFrom(removedConnection); + peerConnections.remove(peerAddress); + } + + public void multicastToValidators(final MessageData message) { + Collection validators = validatorProvider.getCurrentValidators(); + sendMessageToSpecificAddresses(validators, message); + } + + private void sendMessageToSpecificAddresses( + final Collection recipients, final MessageData message) { + recipients + .stream() + .map(peerConnections::get) + .filter(Objects::nonNull) + .forEach( + connection -> { + try { + connection.sendForProtocol(PROTOCOL_NAME, message); + } catch (PeerNotConnected peerNotConnected) { + LOG.trace("Lost connection to a validator."); + } + }); + } + + private Address getAddressFrom(final PeerConnection connection) { + final BytesValue peerNodeId = connection.getPeer().getNodeId(); + final PublicKey remotePublicKey = PublicKey.create(peerNodeId); + return Util.publicKeyToAddress(remotePublicKey); + } +} diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/protocol/IbftProtocolManager.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/protocol/IbftProtocolManager.java index 9e4defa5c3..b9e3db317e 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/protocol/IbftProtocolManager.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/protocol/IbftProtocolManager.java @@ -15,6 +15,7 @@ package tech.pegasys.pantheon.consensus.ibft.protocol; import tech.pegasys.pantheon.consensus.ibft.IbftEvent; import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue; import tech.pegasys.pantheon.consensus.ibft.IbftEvents; +import tech.pegasys.pantheon.consensus.ibft.network.IbftNetworkPeers; import tech.pegasys.pantheon.ethereum.p2p.api.Message; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.api.ProtocolManager; @@ -31,14 +32,17 @@ public class IbftProtocolManager implements ProtocolManager { private final IbftEventQueue ibftEventQueue; private final Logger LOG = LogManager.getLogger(); + private final IbftNetworkPeers peers; /** * Constructor for the ibft protocol manager * * @param ibftEventQueue Entry point into the ibft event processor + * @param peers */ - public IbftProtocolManager(final IbftEventQueue ibftEventQueue) { + public IbftProtocolManager(final IbftEventQueue ibftEventQueue, final IbftNetworkPeers peers) { this.ibftEventQueue = ibftEventQueue; + this.peers = peers; } @Override @@ -79,13 +83,17 @@ public class IbftProtocolManager implements ProtocolManager { } @Override - public void handleNewConnection(final PeerConnection peerConnection) {} + public void handleNewConnection(final PeerConnection peerConnection) { + peers.peerAdded(peerConnection); + } @Override public void handleDisconnect( final PeerConnection peerConnection, final DisconnectReason disconnectReason, - final boolean initiatedByPeer) {} + final boolean initiatedByPeer) { + peers.peerRemoved(peerConnection); + } @Override public boolean hasSufficientPeers() { diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java new file mode 100644 index 0000000000..3d94a9a865 --- /dev/null +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft.network; + +import static io.netty.buffer.Unpooled.EMPTY_BUFFER; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import tech.pegasys.pantheon.consensus.common.ValidatorProvider; +import tech.pegasys.pantheon.crypto.SECP256K1.PublicKey; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.Util; +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; +import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; +import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; + +import java.math.BigInteger; +import java.util.List; + +import com.google.common.collect.Lists; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class IbftNetworkPeersTest { + + private final List validators = Lists.newArrayList(); + private final List