Ibft transmitted packets are logged by gossiper (#652)

Messages which originate with the current node are logged in the
gossiper such that if a remote peer sends a packet which originated
from the local back to the local node, it should not go back out
again.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
tmohay 6 years ago committed by GitHub
parent 73f4299b8c
commit 598fd59f21
  1. 16
      consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java
  2. 20
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/Gossiper.java
  3. 47
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftGossip.java
  4. 68
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/UniqueMessageMulticaster.java
  5. 19
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java
  6. 114
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftGossipTest.java
  7. 93
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/UniqueMessageMulticasterTest.java
  8. 28
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java
  9. 14
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java

@ -25,6 +25,7 @@ import tech.pegasys.pantheon.consensus.common.VoteTally;
import tech.pegasys.pantheon.consensus.common.VoteTallyUpdater; import tech.pegasys.pantheon.consensus.common.VoteTallyUpdater;
import tech.pegasys.pantheon.consensus.ibft.BlockTimer; import tech.pegasys.pantheon.consensus.ibft.BlockTimer;
import tech.pegasys.pantheon.consensus.ibft.EventMultiplexer; import tech.pegasys.pantheon.consensus.ibft.EventMultiplexer;
import tech.pegasys.pantheon.consensus.ibft.Gossiper;
import tech.pegasys.pantheon.consensus.ibft.IbftBlockHashing; import tech.pegasys.pantheon.consensus.ibft.IbftBlockHashing;
import tech.pegasys.pantheon.consensus.ibft.IbftBlockInterface; import tech.pegasys.pantheon.consensus.ibft.IbftBlockInterface;
import tech.pegasys.pantheon.consensus.ibft.IbftContext; import tech.pegasys.pantheon.consensus.ibft.IbftContext;
@ -34,6 +35,7 @@ import tech.pegasys.pantheon.consensus.ibft.IbftGossip;
import tech.pegasys.pantheon.consensus.ibft.IbftHelpers; import tech.pegasys.pantheon.consensus.ibft.IbftHelpers;
import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule; import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer; import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.UniqueMessageMulticaster;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreatorFactory; import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreatorFactory;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.ProposerSelector; import tech.pegasys.pantheon.consensus.ibft.blockcreation.ProposerSelector;
import tech.pegasys.pantheon.consensus.ibft.payload.MessageFactory; import tech.pegasys.pantheon.consensus.ibft.payload.MessageFactory;
@ -156,7 +158,9 @@ public class TestContextBuilder {
// Use a stubbed version of the multicaster, to prevent creating PeerConnections etc. // Use a stubbed version of the multicaster, to prevent creating PeerConnections etc.
final StubValidatorMulticaster multicaster = new StubValidatorMulticaster(); final StubValidatorMulticaster multicaster = new StubValidatorMulticaster();
final IbftGossip gossiper = useGossip ? new IbftGossip(multicaster) : mock(IbftGossip.class); final UniqueMessageMulticaster uniqueMulticaster = new UniqueMessageMulticaster(multicaster);
final Gossiper gossiper = useGossip ? new IbftGossip(uniqueMulticaster) : mock(Gossiper.class);
final ControllerAndState controllerAndState = final ControllerAndState controllerAndState =
createControllerAndFinalState( createControllerAndFinalState(
@ -219,11 +223,11 @@ public class TestContextBuilder {
private static ControllerAndState createControllerAndFinalState( private static ControllerAndState createControllerAndFinalState(
final MutableBlockchain blockChain, final MutableBlockchain blockChain,
final StubValidatorMulticaster stubbedMulticaster, final StubValidatorMulticaster multicaster,
final KeyPair nodeKeys, final KeyPair nodeKeys,
final Clock clock, final Clock clock,
final IbftEventQueue ibftEventQueue, final IbftEventQueue ibftEventQueue,
final IbftGossip gossiper) { final Gossiper gossiper) {
final WorldStateArchive worldStateArchive = createInMemoryWorldStateArchive(); final WorldStateArchive worldStateArchive = createInMemoryWorldStateArchive();
@ -272,7 +276,7 @@ public class TestContextBuilder {
nodeKeys, nodeKeys,
Util.publicKeyToAddress(nodeKeys.getPublicKey()), Util.publicKeyToAddress(nodeKeys.getPublicKey()),
proposerSelector, proposerSelector,
stubbedMulticaster, multicaster,
new RoundTimer( new RoundTimer(
ibftEventQueue, ROUND_TIMER_SEC * 1000, Executors.newScheduledThreadPool(1)), ibftEventQueue, ROUND_TIMER_SEC * 1000, Executors.newScheduledThreadPool(1)),
new BlockTimer( new BlockTimer(
@ -298,8 +302,8 @@ public class TestContextBuilder {
new IbftRoundFactory( new IbftRoundFactory(
finalState, protocolContext, protocolSchedule, minedBlockObservers), finalState, protocolContext, protocolSchedule, minedBlockObservers),
messageValidatorFactory), messageValidatorFactory),
new HashMap<>(), gossiper,
gossiper); new HashMap<>());
final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController); final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController);
//////////////////////////// END IBFT PantheonController //////////////////////////// //////////////////////////// END IBFT PantheonController ////////////////////////////

@ -0,0 +1,20 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
public interface Gossiper {
void send(Message message);
}

@ -20,40 +20,18 @@ import tech.pegasys.pantheon.consensus.ibft.messagedata.ProposalMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.RoundChangeMessageData; import tech.pegasys.pantheon.consensus.ibft.messagedata.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster; import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.consensus.ibft.payload.SignedData; import tech.pegasys.pantheon.consensus.ibft.payload.SignedData;
import tech.pegasys.pantheon.crypto.SECP256K1.Signature;
import tech.pegasys.pantheon.ethereum.core.Address; import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.p2p.api.Message; import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
/** Class responsible for rebroadcasting IBFT messages to known validators */ /** Class responsible for rebroadcasting IBFT messages to known validators */
public class IbftGossip { public class IbftGossip implements Gossiper {
private final ValidatorMulticaster multicaster;
// Size of the seenMessages cache, should end up utilising 65bytes * this number + some meta data
private final int maxSeenMessages;
// Set that starts evicting members when it hits capacity
private final Set<Signature> seenMessages =
Collections.newSetFromMap(
new LinkedHashMap<Signature, Boolean>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<Signature, Boolean> eldest) {
return size() > maxSeenMessages;
}
});
IbftGossip(final ValidatorMulticaster multicaster, final int maxSeenMessages) { private final ValidatorMulticaster multicaster;
this.maxSeenMessages = maxSeenMessages;
this.multicaster = multicaster;
}
/** /**
* Constructor that attaches gossip logic to a set of multicaster * Constructor that attaches gossip logic to a set of multicaster
@ -61,16 +39,16 @@ public class IbftGossip {
* @param multicaster Network connections to the remote validators * @param multicaster Network connections to the remote validators
*/ */
public IbftGossip(final ValidatorMulticaster multicaster) { public IbftGossip(final ValidatorMulticaster multicaster) {
this(multicaster, 10_000); this.multicaster = multicaster;
} }
/** /**
* Retransmit a given IBFT message to other known validators nodes * Retransmit a given IBFT message to other known validators nodes
* *
* @param message The raw message to be gossiped * @param message The raw message to be gossiped
* @return Whether the message was rebroadcast or has been ignored as a repeat
*/ */
public boolean gossipMessage(final Message message) { @Override
public void send(final Message message) {
final MessageData messageData = message.getData(); final MessageData messageData = message.getData();
final SignedData<?> signedData; final SignedData<?> signedData;
switch (messageData.getCode()) { switch (messageData.getCode()) {
@ -93,16 +71,9 @@ public class IbftGossip {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Received message does not conform to any recognised IBFT message structure."); "Received message does not conform to any recognised IBFT message structure.");
} }
final Signature signature = signedData.getSignature(); final List<Address> excludeAddressesList =
if (seenMessages.contains(signature)) { Lists.newArrayList(message.getConnection().getPeer().getAddress(), signedData.getSender());
return false;
} else { multicaster.send(messageData, excludeAddressesList);
final List<Address> excludeAddressesList =
Lists.newArrayList(
message.getConnection().getPeer().getAddress(), signedData.getSender());
multicaster.send(messageData, excludeAddressesList);
seenMessages.add(signature);
return true;
}
} }
} }

@ -0,0 +1,68 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;
import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
public class UniqueMessageMulticaster implements ValidatorMulticaster {
private final int maxSeenMessages;
private final ValidatorMulticaster multicaster;
UniqueMessageMulticaster(final ValidatorMulticaster multicaster, final int maxSeenMessages) {
this.maxSeenMessages = maxSeenMessages;
this.multicaster = multicaster;
}
/**
* Constructor that attaches gossip logic to a set of multicaster
*
* @param multicaster Network connections to the remote validators
*/
public UniqueMessageMulticaster(final ValidatorMulticaster multicaster) {
this(multicaster, 10_000);
}
// Set that starts evicting members when it hits capacity
private final Set<Integer> seenMessages =
Collections.newSetFromMap(
new LinkedHashMap<Integer, Boolean>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<Integer, Boolean> eldest) {
return size() > maxSeenMessages;
}
});
@Override
public void send(final MessageData message) {
send(message, Collections.emptyList());
}
@Override
public void send(final MessageData message, final Collection<Address> blackList) {
final int uniqueID = message.hashCode();
if (seenMessages.contains(uniqueID)) {
return;
}
multicaster.send(message, blackList);
seenMessages.add(uniqueID);
}
}

@ -15,6 +15,7 @@ package tech.pegasys.pantheon.consensus.ibft.statemachine;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.Gossiper;
import tech.pegasys.pantheon.consensus.ibft.IbftGossip; import tech.pegasys.pantheon.consensus.ibft.IbftGossip;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry; import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent; import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
@ -51,18 +52,14 @@ public class IbftController {
private final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory; private final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory;
private final Map<Long, List<Message>> futureMessages; private final Map<Long, List<Message>> futureMessages;
private BlockHeightManager currentHeightManager; private BlockHeightManager currentHeightManager;
private final IbftGossip gossiper; private final Gossiper gossiper;
public IbftController( public IbftController(
final Blockchain blockchain, final Blockchain blockchain,
final IbftFinalState ibftFinalState, final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory) { final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
this( final IbftGossip gossiper) {
blockchain, this(blockchain, ibftFinalState, ibftBlockHeightManagerFactory, gossiper, Maps.newHashMap());
ibftFinalState,
ibftBlockHeightManagerFactory,
Maps.newHashMap(),
new IbftGossip(ibftFinalState.getValidatorMulticaster()));
} }
@VisibleForTesting @VisibleForTesting
@ -70,8 +67,8 @@ public class IbftController {
final Blockchain blockchain, final Blockchain blockchain,
final IbftFinalState ibftFinalState, final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory, final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final Map<Long, List<Message>> futureMessages, final Gossiper gossiper,
final IbftGossip gossiper) { final Map<Long, List<Message>> futureMessages) {
this.blockchain = blockchain; this.blockchain = blockchain;
this.ibftFinalState = ibftFinalState; this.ibftFinalState = ibftFinalState;
this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory; this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory;
@ -142,7 +139,7 @@ public class IbftController {
signedPayload.getPayload().getMessageType(), signedPayload.getPayload().getMessageType(),
signedPayload); signedPayload);
if (processMessage(signedPayload, message)) { if (processMessage(signedPayload, message)) {
gossiper.gossipMessage(message); gossiper.send(message);
handleMessage.accept(signedPayload); handleMessage.accept(signedPayload);
} }
} }

@ -13,19 +13,14 @@
package tech.pegasys.pantheon.consensus.ibft; package tech.pegasys.pantheon.consensus.ibft;
import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Lists.newArrayList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import tech.pegasys.pantheon.consensus.ibft.messagedata.CommitMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.NewRoundMessageData; import tech.pegasys.pantheon.consensus.ibft.messagedata.NewRoundMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.PrepareMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.ProposalMessageData; import tech.pegasys.pantheon.consensus.ibft.messagedata.ProposalMessageData;
import tech.pegasys.pantheon.consensus.ibft.messagedata.RoundChangeMessageData; import tech.pegasys.pantheon.consensus.ibft.messagedata.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.network.MockPeerFactory; import tech.pegasys.pantheon.consensus.ibft.network.MockPeerFactory;
import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster; import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.consensus.ibft.payload.Payload; import tech.pegasys.pantheon.consensus.ibft.payload.Payload;
import tech.pegasys.pantheon.consensus.ibft.payload.ProposalPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.SignedData; import tech.pegasys.pantheon.consensus.ibft.payload.SignedData;
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.core.Address; import tech.pegasys.pantheon.ethereum.core.Address;
@ -52,7 +47,7 @@ public class IbftGossipTest {
@Before @Before
public void setup() { public void setup() {
ibftGossip = new IbftGossip(validatorMulticaster, 10); ibftGossip = new IbftGossip(validatorMulticaster);
peerConnection = MockPeerFactory.create(senderAddress); peerConnection = MockPeerFactory.create(senderAddress);
} }
@ -64,131 +59,26 @@ public class IbftGossipTest {
final MessageData messageData = createMessageData.apply(payload); final MessageData messageData = createMessageData.apply(payload);
final Message message = new DefaultMessage(peerConnection, messageData); final Message message = new DefaultMessage(peerConnection, messageData);
final boolean gossipResult = ibftGossip.gossipMessage(message); ibftGossip.send(message);
assertThat(gossipResult).isTrue();
verify(validatorMulticaster) verify(validatorMulticaster)
.send(messageData, newArrayList(senderAddress, payload.getSender())); .send(messageData, newArrayList(senderAddress, payload.getSender()));
} }
private <P extends Payload> void assertRebroadcastOnlyOnce(
final Function<KeyPair, SignedData<P>> createPayload,
final Function<SignedData<P>, MessageData> createMessageData) {
final KeyPair keypair = KeyPair.generate();
final SignedData<P> payload = createPayload.apply(keypair);
final MessageData messageData = createMessageData.apply(payload);
final Message message = new DefaultMessage(peerConnection, messageData);
final boolean gossip1Result = ibftGossip.gossipMessage(message);
final boolean gossip2Result = ibftGossip.gossipMessage(message);
assertThat(gossip1Result).isTrue();
assertThat(gossip2Result).isFalse();
verify(validatorMulticaster, times(1))
.send(messageData, newArrayList(senderAddress, payload.getSender()));
}
@Test @Test
public void assertRebroadcastsProposalToAllExceptSignerAndSender() { public void assertRebroadcastsProposalToAllExceptSignerAndSender() {
assertRebroadcastToAllExceptSignerAndSender( assertRebroadcastToAllExceptSignerAndSender(
TestHelpers::createSignedProposalPayload, ProposalMessageData::create); TestHelpers::createSignedProposalPayload, ProposalMessageData::create);
} }
@Test
public void assertRebroadcastsProposalOnlyOnce() {
assertRebroadcastOnlyOnce(
TestHelpers::createSignedProposalPayload, ProposalMessageData::create);
}
@Test
public void assertRebroadcastsPrepareToAllExceptSignerAndSender() {
assertRebroadcastToAllExceptSignerAndSender(
TestHelpers::createSignedPreparePayload, PrepareMessageData::create);
}
@Test
public void assertRebroadcastsPrepareOnlyOnce() {
assertRebroadcastOnlyOnce(TestHelpers::createSignedPreparePayload, PrepareMessageData::create);
}
@Test
public void assertRebroadcastsCommitToAllExceptSignerAndSender() {
assertRebroadcastToAllExceptSignerAndSender(
TestHelpers::createSignedCommitPayload, CommitMessageData::create);
}
@Test
public void assertRebroadcastsCommitOnlyOnce() {
assertRebroadcastOnlyOnce(TestHelpers::createSignedCommitPayload, CommitMessageData::create);
}
@Test @Test
public void assertRebroadcastsRoundChangeToAllExceptSignerAndSender() { public void assertRebroadcastsRoundChangeToAllExceptSignerAndSender() {
assertRebroadcastToAllExceptSignerAndSender( assertRebroadcastToAllExceptSignerAndSender(
TestHelpers::createSignedRoundChangePayload, RoundChangeMessageData::create); TestHelpers::createSignedRoundChangePayload, RoundChangeMessageData::create);
} }
@Test
public void assertRebroadcastsRoundChangeOnlyOnce() {
assertRebroadcastOnlyOnce(
TestHelpers::createSignedRoundChangePayload, RoundChangeMessageData::create);
}
@Test @Test
public void assertRebroadcastsNewRoundToAllExceptSignerAndSender() { public void assertRebroadcastsNewRoundToAllExceptSignerAndSender() {
assertRebroadcastToAllExceptSignerAndSender( assertRebroadcastToAllExceptSignerAndSender(
TestHelpers::createSignedNewRoundPayload, NewRoundMessageData::create); TestHelpers::createSignedNewRoundPayload, NewRoundMessageData::create);
} }
@Test
public void assertRebroadcastsNewRoundOnlyOnce() {
assertRebroadcastOnlyOnce(
TestHelpers::createSignedNewRoundPayload, NewRoundMessageData::create);
}
@Test
public void evictMessageRecordAtCapacity() {
final KeyPair keypair = KeyPair.generate();
final SignedData<ProposalPayload> payload =
TestHelpers.createSignedProposalPayloadWithRound(keypair, 0);
final MessageData messageData = ProposalMessageData.create(payload);
final Message message = new DefaultMessage(peerConnection, messageData);
final boolean gossip1Result = ibftGossip.gossipMessage(message);
final boolean gossip2Result = ibftGossip.gossipMessage(message);
assertThat(gossip1Result).isTrue();
assertThat(gossip2Result).isFalse();
verify(validatorMulticaster, times(1))
.send(messageData, newArrayList(senderAddress, payload.getSender()));
for (int i = 1; i <= 9; i++) {
final SignedData<ProposalPayload> nextPayload =
TestHelpers.createSignedProposalPayloadWithRound(keypair, i);
final MessageData nextMessageData = ProposalMessageData.create(nextPayload);
final Message nextMessage = new DefaultMessage(peerConnection, nextMessageData);
final boolean nextGossipResult = ibftGossip.gossipMessage(nextMessage);
assertThat(nextGossipResult).isTrue();
}
final boolean gossip3Result = ibftGossip.gossipMessage(message);
assertThat(gossip3Result).isFalse();
verify(validatorMulticaster, times(1))
.send(messageData, newArrayList(senderAddress, payload.getSender()));
{
final SignedData<ProposalPayload> nextPayload =
TestHelpers.createSignedProposalPayloadWithRound(keypair, 10);
final MessageData nextMessageData = ProposalMessageData.create(nextPayload);
final Message nextMessage = new DefaultMessage(peerConnection, nextMessageData);
final boolean nextGossipResult = ibftGossip.gossipMessage(nextMessage);
assertThat(nextGossipResult).isTrue();
}
final boolean gossip4Result = ibftGossip.gossipMessage(message);
assertThat(gossip4Result).isTrue();
verify(validatorMulticaster, times(2))
.send(messageData, newArrayList(senderAddress, payload.getSender()));
final boolean gossip5Result = ibftGossip.gossipMessage(message);
assertThat(gossip5Result).isFalse();
verify(validatorMulticaster, times(2))
.send(messageData, newArrayList(senderAddress, payload.getSender()));
}
} }

@ -0,0 +1,93 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;
import static java.util.Collections.emptyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.AddressHelpers;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.util.List;
import com.google.common.collect.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class UniqueMessageMulticasterTest {
private final ValidatorMulticaster multicaster = mock(ValidatorMulticaster.class);
private final UniqueMessageMulticaster messageTracker =
new UniqueMessageMulticaster(multicaster, 5);
private final RawMessage messageSent = new RawMessage(5, BytesValue.wrap(new byte[5]));
@Test
public void previouslySentMessageIsNotSentAgain() {
messageTracker.send(messageSent);
verify(multicaster, times(1)).send(messageSent, emptyList());
reset(multicaster);
messageTracker.send(messageSent);
messageTracker.send(messageSent, emptyList());
verifyZeroInteractions(multicaster);
}
@Test
public void messagesSentWithABlackListAreNotRetransmitted() {
messageTracker.send(messageSent, emptyList());
verify(multicaster, times(1)).send(messageSent, emptyList());
reset(multicaster);
messageTracker.send(messageSent, emptyList());
messageTracker.send(messageSent);
verifyZeroInteractions(multicaster);
}
@Test
public void oldMessagesAreEvictedWhenFullAndCanThenBeRetransmitted() {
final List<MessageData> messagesSent = Lists.newArrayList();
for (int i = 0; i < 6; i++) {
final RawMessage msg = new RawMessage(i, BytesValue.wrap(new byte[i]));
messagesSent.add(msg);
messageTracker.send(msg);
verify(multicaster, times(1)).send(msg, emptyList());
}
reset(multicaster);
messageTracker.send(messagesSent.get(5));
verifyZeroInteractions(multicaster);
messageTracker.send(messagesSent.get(0));
verify(multicaster, times(1)).send(messagesSent.get(0), emptyList());
}
@Test
public void passedInBlackListIsPassedToUnderlyingValidator() {
List<Address> blackList =
Lists.newArrayList(AddressHelpers.ofValue(0), AddressHelpers.ofValue(1));
messageTracker.send(messageSent, blackList);
verify(multicaster, times(1)).send(messageSent, blackList);
}
}

@ -108,7 +108,7 @@ public class IbftControllerTest {
when(ibftFinalState.getValidators()).thenReturn(ImmutableList.of(validator)); when(ibftFinalState.getValidators()).thenReturn(ImmutableList.of(validator));
ibftController = ibftController =
new IbftController( new IbftController(
blockChain, ibftFinalState, blockHeightManagerFactory, futureMessages, ibftGossip); blockChain, ibftFinalState, blockHeightManagerFactory, ibftGossip, futureMessages);
when(chainHeadBlockHeader.getNumber()).thenReturn(1L); when(chainHeadBlockHeader.getNumber()).thenReturn(1L);
when(chainHeadBlockHeader.getHash()).thenReturn(Hash.ZERO); when(chainHeadBlockHeader.getHash()).thenReturn(Hash.ZERO);
@ -151,11 +151,11 @@ public class IbftControllerTest {
verify(blockHeightManager).start(); verify(blockHeightManager).start();
verify(blockHeightManager, never()).handleProposalPayload(signedProposal); verify(blockHeightManager, never()).handleProposalPayload(signedProposal);
verify(blockHeightManager).handlePreparePayload(signedPrepare); verify(blockHeightManager).handlePreparePayload(signedPrepare);
verify(ibftGossip).gossipMessage(prepareMessage); verify(ibftGossip).send(prepareMessage);
verify(blockHeightManager).handleCommitPayload(signedCommit); verify(blockHeightManager).handleCommitPayload(signedCommit);
verify(ibftGossip).gossipMessage(commitMessage); verify(ibftGossip).send(commitMessage);
verify(blockHeightManager).handleRoundChangePayload(signedRoundChange); verify(blockHeightManager).handleRoundChangePayload(signedRoundChange);
verify(ibftGossip).gossipMessage(roundChangeMessage); verify(ibftGossip).send(roundChangeMessage);
verify(blockHeightManager, never()).handleNewRoundPayload(signedNewRound); verify(blockHeightManager, never()).handleNewRoundPayload(signedNewRound);
} }
@ -181,15 +181,15 @@ public class IbftControllerTest {
verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager, atLeastOnce()).getChainHeight();
verify(blockHeightManager, times(2)).start(); // once at beginning, and again on newChainHead. verify(blockHeightManager, times(2)).start(); // once at beginning, and again on newChainHead.
verify(blockHeightManager).handleProposalPayload(signedProposal); verify(blockHeightManager).handleProposalPayload(signedProposal);
verify(ibftGossip).gossipMessage(proposalMessage); verify(ibftGossip).send(proposalMessage);
verify(blockHeightManager).handlePreparePayload(signedPrepare); verify(blockHeightManager).handlePreparePayload(signedPrepare);
verify(ibftGossip).gossipMessage(prepareMessage); verify(ibftGossip).send(prepareMessage);
verify(blockHeightManager).handleCommitPayload(signedCommit); verify(blockHeightManager).handleCommitPayload(signedCommit);
verify(ibftGossip).gossipMessage(commitMessage); verify(ibftGossip).send(commitMessage);
verify(blockHeightManager).handleRoundChangePayload(signedRoundChange); verify(blockHeightManager).handleRoundChangePayload(signedRoundChange);
verify(ibftGossip).gossipMessage(roundChangeMessage); verify(ibftGossip).send(roundChangeMessage);
verify(blockHeightManager).handleNewRoundPayload(signedNewRound); verify(blockHeightManager).handleNewRoundPayload(signedNewRound);
verify(ibftGossip).gossipMessage(newRoundMessage); verify(ibftGossip).send(newRoundMessage);
} }
@Test @Test
@ -239,7 +239,7 @@ public class IbftControllerTest {
assertThat(futureMessages).isEmpty(); assertThat(futureMessages).isEmpty();
verify(blockHeightManager).handleProposalPayload(signedProposal); verify(blockHeightManager).handleProposalPayload(signedProposal);
verify(ibftGossip).gossipMessage(proposalMessage); verify(ibftGossip).send(proposalMessage);
verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager, atLeastOnce()).getChainHeight();
verify(blockHeightManager).start(); verify(blockHeightManager).start();
verifyNoMoreInteractions(blockHeightManager); verifyNoMoreInteractions(blockHeightManager);
@ -253,7 +253,7 @@ public class IbftControllerTest {
assertThat(futureMessages).isEmpty(); assertThat(futureMessages).isEmpty();
verify(blockHeightManager).handlePreparePayload(signedPrepare); verify(blockHeightManager).handlePreparePayload(signedPrepare);
verify(ibftGossip).gossipMessage(prepareMessage); verify(ibftGossip).send(prepareMessage);
verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager, atLeastOnce()).getChainHeight();
verify(blockHeightManager).start(); verify(blockHeightManager).start();
verifyNoMoreInteractions(blockHeightManager); verifyNoMoreInteractions(blockHeightManager);
@ -267,7 +267,7 @@ public class IbftControllerTest {
assertThat(futureMessages).isEmpty(); assertThat(futureMessages).isEmpty();
verify(blockHeightManager).handleCommitPayload(signedCommit); verify(blockHeightManager).handleCommitPayload(signedCommit);
verify(ibftGossip).gossipMessage(commitMessage); verify(ibftGossip).send(commitMessage);
verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager, atLeastOnce()).getChainHeight();
verify(blockHeightManager).start(); verify(blockHeightManager).start();
verifyNoMoreInteractions(blockHeightManager); verifyNoMoreInteractions(blockHeightManager);
@ -282,7 +282,7 @@ public class IbftControllerTest {
assertThat(futureMessages).isEmpty(); assertThat(futureMessages).isEmpty();
verify(blockHeightManager).handleNewRoundPayload(signedNewRound); verify(blockHeightManager).handleNewRoundPayload(signedNewRound);
verify(ibftGossip).gossipMessage(newRoundMessage); verify(ibftGossip).send(newRoundMessage);
verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager, atLeastOnce()).getChainHeight();
verify(blockHeightManager).start(); verify(blockHeightManager).start();
verifyNoMoreInteractions(blockHeightManager); verifyNoMoreInteractions(blockHeightManager);
@ -297,7 +297,7 @@ public class IbftControllerTest {
assertThat(futureMessages).isEmpty(); assertThat(futureMessages).isEmpty();
verify(blockHeightManager).handleRoundChangePayload(signedRoundChange); verify(blockHeightManager).handleRoundChangePayload(signedRoundChange);
verify(ibftGossip).gossipMessage(roundChangeMessage); verify(ibftGossip).send(roundChangeMessage);
verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager, atLeastOnce()).getChainHeight();
verify(blockHeightManager).start(); verify(blockHeightManager).start();
verifyNoMoreInteractions(blockHeightManager); verifyNoMoreInteractions(blockHeightManager);

@ -26,9 +26,11 @@ import tech.pegasys.pantheon.consensus.ibft.EventMultiplexer;
import tech.pegasys.pantheon.consensus.ibft.IbftBlockInterface; import tech.pegasys.pantheon.consensus.ibft.IbftBlockInterface;
import tech.pegasys.pantheon.consensus.ibft.IbftContext; import tech.pegasys.pantheon.consensus.ibft.IbftContext;
import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue; import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.IbftGossip;
import tech.pegasys.pantheon.consensus.ibft.IbftProcessor; import tech.pegasys.pantheon.consensus.ibft.IbftProcessor;
import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule; import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer; import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.UniqueMessageMulticaster;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreatorFactory; import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreatorFactory;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftMiningCoordinator; import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftMiningCoordinator;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.ProposerSelector; import tech.pegasys.pantheon.consensus.ibft.blockcreation.ProposerSelector;
@ -96,7 +98,6 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
private final IbftProtocolManager ibftProtocolManager; private final IbftProtocolManager ibftProtocolManager;
private final KeyPair keyPair; private final KeyPair keyPair;
private final TransactionPool transactionPool; private final TransactionPool transactionPool;
private final IbftProcessor ibftProcessor;
private final Runnable closer; private final Runnable closer;
IbftPantheonController( IbftPantheonController(
@ -108,7 +109,6 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
final Synchronizer synchronizer, final Synchronizer synchronizer,
final KeyPair keyPair, final KeyPair keyPair,
final TransactionPool transactionPool, final TransactionPool transactionPool,
final IbftProcessor ibftProcessor,
final Runnable closer) { final Runnable closer) {
this.protocolSchedule = protocolSchedule; this.protocolSchedule = protocolSchedule;
@ -119,7 +119,6 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
this.synchronizer = synchronizer; this.synchronizer = synchronizer;
this.keyPair = keyPair; this.keyPair = keyPair;
this.transactionPool = transactionPool; this.transactionPool = transactionPool;
this.ibftProcessor = ibftProcessor;
this.closer = closer; this.closer = closer;
} }
@ -202,17 +201,20 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
new ProposerSelector(blockchain, voteTally, blockInterface, true); new ProposerSelector(blockchain, voteTally, blockInterface, true);
final ValidatorPeers peers = final ValidatorPeers peers =
new ValidatorPeers(protocolContext.getConsensusState().getVoteTally()); new ValidatorPeers(protocolContext.getConsensusState().getVoteTally());
final UniqueMessageMulticaster uniqueMessageMulticaster = new UniqueMessageMulticaster(peers);
final Subscribers<MinedBlockObserver> minedBlockObservers = new Subscribers<>(); final Subscribers<MinedBlockObserver> minedBlockObservers = new Subscribers<>();
minedBlockObservers.subscribe(ethProtocolManager); minedBlockObservers.subscribe(ethProtocolManager);
final IbftGossip gossiper = new IbftGossip(peers);
final IbftFinalState finalState = final IbftFinalState finalState =
new IbftFinalState( new IbftFinalState(
voteTally, voteTally,
nodeKeys, nodeKeys,
Util.publicKeyToAddress(nodeKeys.getPublicKey()), Util.publicKeyToAddress(nodeKeys.getPublicKey()),
proposerSelector, proposerSelector,
peers, uniqueMessageMulticaster,
new RoundTimer( new RoundTimer(
ibftEventQueue, ibftEventQueue,
ibftConfig.getRequestTimeoutSeconds(), ibftConfig.getRequestTimeoutSeconds(),
@ -237,7 +239,8 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
finalState, finalState,
new IbftRoundFactory( new IbftRoundFactory(
finalState, protocolContext, protocolSchedule, minedBlockObservers), finalState, protocolContext, protocolSchedule, minedBlockObservers),
messageValidatorFactory)); messageValidatorFactory),
gossiper);
ibftController.start(); ibftController.start();
final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController); final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController);
@ -275,7 +278,6 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
synchronizer, synchronizer,
nodeKeys, nodeKeys,
transactionPool, transactionPool,
ibftProcessor,
closer); closer);
} }

Loading…
Cancel
Save