Prevent duplicate ibft messages being processed by state machine (#811)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Jason Frame 6 years ago committed by GitHub
parent 8d64c44c40
commit a00160783b
  1. 5
      config/src/main/java/tech/pegasys/pantheon/config/IbftConfigOptions.java
  2. 20
      config/src/test/java/tech/pegasys/pantheon/config/IbftConfigOptionsTest.java
  3. 13
      consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java
  4. 38
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/MessageTracker.java
  5. 23
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/UniqueMessageMulticaster.java
  6. 25
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java
  7. 59
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/MessageTrackerTest.java
  8. 82
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/UniqueMessageMulticasterTest.java
  9. 38
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java
  10. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java

@ -25,6 +25,7 @@ public class IbftConfigOptions {
// protection for on a typical 20 node validator network with multiple rounds
private static final int DEFAULT_GOSSIPED_HISTORY_LIMIT = 1000;
private static final int DEFAULT_MESSAGE_QUEUE_LIMIT = 1000;
private static final int DEFAULT_DUPLICATE_MESSAGE_LIMIT = 100;
private final JsonObject ibftConfigRoot;
@ -51,4 +52,8 @@ public class IbftConfigOptions {
public int getMessageQueueLimit() {
return ibftConfigRoot.getInteger("messagequeuelimit", DEFAULT_MESSAGE_QUEUE_LIMIT);
}
public int getDuplicateMessageLimit() {
return ibftConfigRoot.getInteger("duplicatemessagelimit", DEFAULT_DUPLICATE_MESSAGE_LIMIT);
}
}

@ -28,6 +28,7 @@ public class IbftConfigOptionsTest {
private static final int EXPECTED_DEFAULT_REQUEST_TIMEOUT = 1;
private static final int EXPECTED_DEFAULT_GOSSIPED_HISTORY_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_MESSAGE_QUEUE_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_DUPLICATE_MESSAGE_LIMIT = 100;
@Test
public void shouldGetEpochLengthFromConfig() {
@ -118,6 +119,25 @@ public class IbftConfigOptionsTest {
.isEqualTo(EXPECTED_DEFAULT_MESSAGE_QUEUE_LIMIT);
}
@Test
public void shouldGetDuplicateMessageLimitFromConfig() {
final IbftConfigOptions config = fromConfigOptions(singletonMap("DuplicateMessageLimit", 50));
assertThat(config.getDuplicateMessageLimit()).isEqualTo(50);
}
@Test
public void shouldFallbackToDefaultDuplicateMessageLimit() {
final IbftConfigOptions config = fromConfigOptions(emptyMap());
assertThat(config.getDuplicateMessageLimit())
.isEqualTo(EXPECTED_DEFAULT_DUPLICATE_MESSAGE_LIMIT);
}
@Test
public void shouldGetDefaultDuplicateMessageLimitFromDefaultConfig() {
assertThat(IbftConfigOptions.DEFAULT.getDuplicateMessageLimit())
.isEqualTo(EXPECTED_DEFAULT_DUPLICATE_MESSAGE_LIMIT);
}
private IbftConfigOptions fromConfigOptions(final Map<String, Object> ibftConfigOptions) {
return GenesisConfigFile.fromConfig(
new JsonObject(singletonMap("config", singletonMap("ibft", ibftConfigOptions))))

@ -69,7 +69,6 @@ import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
@ -112,11 +111,12 @@ public class TestContextBuilder {
public static final int EPOCH_LENGTH = 10_000;
public static final int BLOCK_TIMER_SEC = 3;
public static final int ROUND_TIMER_SEC = 12;
public static final int EVENT_QUEUE_SIZE = 1000;
public static final int SEEN_MESSAGE_SIZE = 100;
public static final int MESSAGE_QUEUE_LIMIT = 1000;
public static final int GOSSIPED_HISTORY_LIMIT = 100;
public static final int DUPLICATE_MESSAGE_LIMIT = 100;
private Clock clock = Clock.fixed(Instant.MIN, ZoneId.of("UTC"));
private IbftEventQueue ibftEventQueue = new IbftEventQueue(EVENT_QUEUE_SIZE);
private IbftEventQueue ibftEventQueue = new IbftEventQueue(MESSAGE_QUEUE_LIMIT);
private int validatorCount = 4;
private int indexOfFirstLocallyProposedBlock = 0; // Meaning first block is from remote peer.
private boolean useGossip = false;
@ -159,9 +159,8 @@ public class TestContextBuilder {
// Use a stubbed version of the multicaster, to prevent creating PeerConnections etc.
final StubValidatorMulticaster multicaster = new StubValidatorMulticaster();
final UniqueMessageMulticaster uniqueMulticaster =
new UniqueMessageMulticaster(multicaster, SEEN_MESSAGE_SIZE);
new UniqueMessageMulticaster(multicaster, GOSSIPED_HISTORY_LIMIT);
final Gossiper gossiper = useGossip ? new IbftGossip(uniqueMulticaster) : mock(Gossiper.class);
@ -308,7 +307,7 @@ public class TestContextBuilder {
messageValidatorFactory),
messageValidatorFactory),
gossiper,
new HashMap<>());
DUPLICATE_MESSAGE_LIMIT);
final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController);
//////////////////////////// END IBFT PantheonController ////////////////////////////

@ -0,0 +1,38 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;
import static java.util.Collections.newSetFromMap;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import java.util.Set;
public class MessageTracker {
private final Set<Hash> seenMessages;
public MessageTracker(final int messageTrackingLimit) {
this.seenMessages = newSetFromMap(new SizeLimitedMap<>(messageTrackingLimit));
}
public void addSeenMessage(final MessageData message) {
final Hash uniqueID = Hash.hash(message.getData());
seenMessages.add(uniqueID);
}
public boolean hasSeenMessage(final MessageData message) {
final Hash uniqueID = Hash.hash(message.getData());
return seenMessages.contains(uniqueID);
}
}

@ -12,20 +12,18 @@
*/
package tech.pegasys.pantheon.consensus.ibft;
import static java.util.Collections.newSetFromMap;
import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
public class UniqueMessageMulticaster implements ValidatorMulticaster {
private final ValidatorMulticaster multicaster;
private final Set<Hash> seenMessages;
private final MessageTracker gossipedMessageTracker;
/**
* Constructor that attaches gossip logic to a set of multicaster
@ -36,8 +34,14 @@ public class UniqueMessageMulticaster implements ValidatorMulticaster {
public UniqueMessageMulticaster(
final ValidatorMulticaster multicaster, final int gossipHistoryLimit) {
this.multicaster = multicaster;
// Set that starts evicting members when it hits capacity
this.seenMessages = newSetFromMap(new SizeLimitedMap<>(gossipHistoryLimit));
this.gossipedMessageTracker = new MessageTracker(gossipHistoryLimit);
}
@VisibleForTesting
public UniqueMessageMulticaster(
final ValidatorMulticaster multicaster, final MessageTracker gossipedMessageTracker) {
this.multicaster = multicaster;
this.gossipedMessageTracker = gossipedMessageTracker;
}
@Override
@ -47,11 +51,10 @@ public class UniqueMessageMulticaster implements ValidatorMulticaster {
@Override
public void send(final MessageData message, final Collection<Address> blackList) {
final Hash uniqueID = Hash.hash(message.getData());
if (seenMessages.contains(uniqueID)) {
if (gossipedMessageTracker.hasSeenMessage(message)) {
return;
}
multicaster.send(message, blackList);
seenMessages.add(uniqueID);
gossipedMessageTracker.addSeenMessage(message);
}
}

@ -16,7 +16,7 @@ import static java.util.Collections.emptyList;
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.Gossiper;
import tech.pegasys.pantheon.consensus.ibft.IbftGossip;
import tech.pegasys.pantheon.consensus.ibft.MessageTracker;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
@ -45,7 +45,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class IbftController {
private static final Logger LOG = LogManager.getLogger();
private final Blockchain blockchain;
private final IbftFinalState ibftFinalState;
@ -53,13 +52,21 @@ public class IbftController {
private final Map<Long, List<Message>> futureMessages;
private BlockHeightManager currentHeightManager;
private final Gossiper gossiper;
private final MessageTracker duplicateMessageTracker;
public IbftController(
final Blockchain blockchain,
final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final IbftGossip gossiper) {
this(blockchain, ibftFinalState, ibftBlockHeightManagerFactory, gossiper, Maps.newHashMap());
final Gossiper gossiper,
final int duplicateMessageLimit) {
this(
blockchain,
ibftFinalState,
ibftBlockHeightManagerFactory,
gossiper,
Maps.newHashMap(),
new MessageTracker(duplicateMessageLimit));
}
@VisibleForTesting
@ -68,12 +75,14 @@ public class IbftController {
final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final Gossiper gossiper,
final Map<Long, List<Message>> futureMessages) {
final Map<Long, List<Message>> futureMessages,
final MessageTracker duplicateMessageTracker) {
this.blockchain = blockchain;
this.ibftFinalState = ibftFinalState;
this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory;
this.futureMessages = futureMessages;
this.gossiper = gossiper;
this.duplicateMessageTracker = duplicateMessageTracker;
}
public void start() {
@ -81,7 +90,11 @@ public class IbftController {
}
public void handleMessageEvent(final IbftReceivedMessageEvent msg) {
handleMessage(msg.getMessage());
final MessageData data = msg.getMessage().getData();
if (!duplicateMessageTracker.hasSeenMessage(data)) {
duplicateMessageTracker.addSeenMessage(data);
handleMessage(msg.getMessage());
}
}
private void handleMessage(final Message message) {

@ -0,0 +1,59 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;
import static org.assertj.core.api.Assertions.assertThat;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import org.junit.Test;
public class MessageTrackerTest {
private final MessageTracker messageTracker = new MessageTracker(5);
@Test
public void duplicateMessagesAreConsideredSeen() {
final MessageData arbitraryMessage_1 =
createAnonymousMessageData(BytesValue.wrap(new byte[4]), 1);
final MessageData arbitraryMessage_2 =
createAnonymousMessageData(BytesValue.wrap(new byte[4]), 1);
assertThat(messageTracker.hasSeenMessage(arbitraryMessage_1)).isFalse();
assertThat(messageTracker.hasSeenMessage(arbitraryMessage_2)).isFalse();
messageTracker.addSeenMessage(arbitraryMessage_1);
assertThat(messageTracker.hasSeenMessage(arbitraryMessage_2)).isTrue();
}
private MessageData createAnonymousMessageData(final BytesValue content, final int code) {
return new MessageData() {
@Override
public int getSize() {
return content.size();
}
@Override
public int getCode() {
return code;
}
@Override
public BytesValue getData() {
return content;
}
};
}
}

@ -18,11 +18,11 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.AddressHelpers;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.util.bytes.BytesValue;
@ -36,95 +36,43 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class UniqueMessageMulticasterTest {
private final MessageTracker messageTracker = mock(MessageTracker.class);
private final ValidatorMulticaster multicaster = mock(ValidatorMulticaster.class);
private final UniqueMessageMulticaster messageTracker =
new UniqueMessageMulticaster(multicaster, 5);
private final UniqueMessageMulticaster uniqueMessageMulticaster =
new UniqueMessageMulticaster(multicaster, messageTracker);
private final RawMessage messageSent = new RawMessage(5, BytesValue.wrap(new byte[5]));
@Test
public void previouslySentMessageIsNotSentAgain() {
messageTracker.send(messageSent);
when(messageTracker.hasSeenMessage(messageSent)).thenReturn(false);
uniqueMessageMulticaster.send(messageSent);
verify(multicaster, times(1)).send(messageSent, emptyList());
reset(multicaster);
messageTracker.send(messageSent);
messageTracker.send(messageSent, emptyList());
when(messageTracker.hasSeenMessage(messageSent)).thenReturn(true);
uniqueMessageMulticaster.send(messageSent);
uniqueMessageMulticaster.send(messageSent, emptyList());
verifyZeroInteractions(multicaster);
}
@Test
public void messagesSentWithABlackListAreNotRetransmitted() {
messageTracker.send(messageSent, emptyList());
when(messageTracker.hasSeenMessage(messageSent)).thenReturn(false);
uniqueMessageMulticaster.send(messageSent, emptyList());
verify(multicaster, times(1)).send(messageSent, emptyList());
reset(multicaster);
messageTracker.send(messageSent, emptyList());
messageTracker.send(messageSent);
verifyZeroInteractions(multicaster);
}
@Test
public void oldMessagesAreEvictedWhenFullAndCanThenBeRetransmitted() {
final List<MessageData> messagesSent = Lists.newArrayList();
for (int i = 0; i < 6; i++) {
final RawMessage msg = new RawMessage(i, BytesValue.wrap(new byte[i]));
messagesSent.add(msg);
messageTracker.send(msg);
verify(multicaster, times(1)).send(msg, emptyList());
}
reset(multicaster);
messageTracker.send(messagesSent.get(5));
when(messageTracker.hasSeenMessage(messageSent)).thenReturn(true);
uniqueMessageMulticaster.send(messageSent, emptyList());
uniqueMessageMulticaster.send(messageSent);
verifyZeroInteractions(multicaster);
messageTracker.send(messagesSent.get(0));
verify(multicaster, times(1)).send(messagesSent.get(0), emptyList());
}
@Test
public void passedInBlackListIsPassedToUnderlyingValidator() {
final List<Address> blackList =
Lists.newArrayList(AddressHelpers.ofValue(0), AddressHelpers.ofValue(1));
messageTracker.send(messageSent, blackList);
uniqueMessageMulticaster.send(messageSent, blackList);
verify(multicaster, times(1)).send(messageSent, blackList);
}
@Test
public void anonymousMessageDataClassesContainingTheSameDataAreConsideredIdentical() {
final MessageData arbitraryMessage_1 =
createAnonymousMessageData(BytesValue.wrap(new byte[4]), 1);
final MessageData arbitraryMessage_2 =
createAnonymousMessageData(BytesValue.wrap(new byte[4]), 1);
messageTracker.send(arbitraryMessage_1);
verify(multicaster, times(1)).send(arbitraryMessage_1, emptyList());
reset(multicaster);
messageTracker.send(arbitraryMessage_2);
verifyZeroInteractions(multicaster);
}
private MessageData createAnonymousMessageData(final BytesValue content, final int code) {
return new MessageData() {
@Override
public int getSize() {
return content.size();
}
@Override
public int getCode() {
return code;
}
@Override
public BytesValue getData() {
return content;
}
};
}
}

@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.IbftGossip;
import tech.pegasys.pantheon.consensus.ibft.MessageTracker;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
@ -39,11 +40,6 @@ import tech.pegasys.pantheon.consensus.ibft.messagewrappers.NewRound;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Prepare;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Proposal;
import tech.pegasys.pantheon.consensus.ibft.messagewrappers.RoundChange;
import tech.pegasys.pantheon.consensus.ibft.payload.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.NewRoundPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.PreparePayload;
import tech.pegasys.pantheon.consensus.ibft.payload.ProposalPayload;
import tech.pegasys.pantheon.consensus.ibft.payload.RoundChangePayload;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
@ -75,28 +71,24 @@ public class IbftControllerTest {
@Mock private Proposal proposal;
private Message proposalMessage;
@Mock private ProposalMessageData proposalMessageData;
@Mock private ProposalPayload proposalPayload;
@Mock private Prepare prepare;
private Message prepareMessage;
@Mock private PrepareMessageData prepareMessageData;
@Mock private PreparePayload preparePayload;
@Mock private Commit commit;
private Message commitMessage;
@Mock private CommitMessageData commitMessageData;
@Mock private CommitPayload commitPayload;
@Mock private NewRound newRound;
private Message newRoundMessage;
@Mock private NewRoundMessageData newRoundMessageData;
@Mock private NewRoundPayload newRoundPayload;
@Mock private RoundChange roundChange;
private Message roundChangeMessage;
@Mock private RoundChangeMessageData roundChangeMessageData;
@Mock private RoundChangePayload roundChangePayload;
@Mock private MessageTracker messageTracker;
private final Map<Long, List<Message>> futureMessages = new HashMap<>();
private final Address validator = Address.fromHexString("0x0");
private final Address unknownValidator = Address.fromHexString("0x2");
@ -112,7 +104,12 @@ public class IbftControllerTest {
when(ibftFinalState.getValidators()).thenReturn(ImmutableList.of(validator));
ibftController =
new IbftController(
blockChain, ibftFinalState, blockHeightManagerFactory, ibftGossip, futureMessages);
blockChain,
ibftFinalState,
blockHeightManagerFactory,
ibftGossip,
futureMessages,
messageTracker);
when(chainHeadBlockHeader.getNumber()).thenReturn(1L);
when(chainHeadBlockHeader.getHash()).thenReturn(Hash.ZERO);
@ -122,6 +119,7 @@ public class IbftControllerTest {
when(nextBlock.getNumber()).thenReturn(2L);
when(ibftFinalState.isLocalNodeValidator()).thenReturn(true);
when(messageTracker.hasSeenMessage(any())).thenReturn(false);
}
@Test
@ -432,6 +430,24 @@ public class IbftControllerTest {
verifyHasFutureMessages(new IbftReceivedMessageEvent(roundChangeMessage), expectedFutureMsgs);
}
@Test
public void duplicatedMessagesAreNotProcessed() {
when(messageTracker.hasSeenMessage(proposalMessageData)).thenReturn(true);
setupProposal(roundIdentifier, validator);
verifyNotHandledAndNoFutureMsgs(new IbftReceivedMessageEvent(proposalMessage));
verify(messageTracker, never()).addSeenMessage(proposalMessageData);
}
@Test
public void uniqueMessagesAreAddedAsSeen() {
when(messageTracker.hasSeenMessage(proposalMessageData)).thenReturn(false);
setupProposal(roundIdentifier, validator);
ibftController.start();
ibftController.handleMessageEvent(new IbftReceivedMessageEvent(proposalMessage));
verify(messageTracker).addSeenMessage(proposalMessageData);
}
private void verifyNotHandledAndNoFutureMsgs(final IbftReceivedMessageEvent msg) {
ibftController.start();
ibftController.handleMessageEvent(msg);

@ -251,7 +251,8 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
minedBlockObservers,
messageValidatorFactory),
messageValidatorFactory),
gossiper);
gossiper,
ibftConfig.getDuplicateMessageLimit());
ibftController.start();
final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController);

Loading…
Cancel
Save