From bcdb431483a4674b146be7c952d34e1af67b7cba Mon Sep 17 00:00:00 2001 From: tmohay <37158202+rain-on@users.noreply.github.com> Date: Wed, 19 Dec 2018 15:40:07 +1100 Subject: [PATCH] Ibft Height Manager (#418) The IbftHeightManager is responsible for all things 'meta-round' related, such things as: * Handling RoundTimeout and starting a new round * Handling NewRound messages and starting a new round * Ensuring RoundChange messages are sent at the correct time with appropriate content. * Collating RoundChange messages and starting a new round using the best prepared certificate in the collection. Signed-off-by: Adrian Sutton --- .../statemachine/IbftBlockHeightManager.java | 227 ++++++++++- .../IbftBlockHeightManagerFactory.java | 35 +- .../ibft/statemachine/IbftFinalState.java | 10 +- .../ibft/statemachine/IbftRoundFactory.java | 1 - .../ibft/statemachine/RoundChangeManager.java | 10 +- .../IbftBlockHeightManagerTest.java | 368 ++++++++++++++++++ .../statemachine/RoundChangeManagerTest.java | 4 +- 7 files changed, 633 insertions(+), 22 deletions(-) create mode 100644 consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java index 556a7baf9b..8a0343272a 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java @@ -12,35 +12,242 @@ */ package tech.pegasys.pantheon.consensus.ibft.statemachine; +import static tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManager.MessageAge.CURRENT_ROUND; +import static tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManager.MessageAge.FUTURE_ROUND; +import static tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManager.MessageAge.PRIOR_ROUND; + +import tech.pegasys.pantheon.consensus.ibft.BlockTimer; import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; +import tech.pegasys.pantheon.consensus.ibft.RoundTimer; import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.Payload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparePayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparedCertificate; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.ProposalPayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangeCertificate; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangePayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; +import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidatorFactory; +import tech.pegasys.pantheon.consensus.ibft.validation.NewRoundMessageValidator; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; + +import java.time.Clock; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; -/** This no-op version will be replaced with an implementation in another PR */ +/** + * Responsible for starting/clearing Consensus rounds at a given block height. One of these is + * created when a new block is imported to the chain. It immediately then creates a Round-0 object, + * and sends a Proposal message. If the round times out prior to importing a block, this class is + * responsible for creating a RoundChange message and transmitting it. + */ public class IbftBlockHeightManager { - public void handleProposalMessage(final SignedData proposalMsg) {} + protected enum MessageAge { + PRIOR_ROUND, + CURRENT_ROUND, + FUTURE_ROUND + } + + private static final Logger LOG = LogManager.getLogger(); - public void handlePrepareMessage(final SignedData prepareMsg) {} + private final IbftRoundFactory roundFactory; + private final RoundChangeManager roundChangeManager; + private final BlockHeader parentHeader; + private final RoundTimer roundTimer; + private final BlockTimer blockTimer; + private final IbftMessageTransmitter transmitter; + private final MessageFactory messageFactory; + private final Map futureRoundStateBuffer = Maps.newHashMap(); + private final NewRoundMessageValidator newRoundMessageValidator; + private final Clock clock; + private final Function roundStateCreator; + private final IbftFinalState finalState; - public void handleCommitMessage(final SignedData commitMsg) {} + private Optional latestPreparedCertificate = Optional.empty(); - public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIndentifier) {} + private IbftRound currentRound; - public void handleRoundChangeMessage(final SignedData roundChangeMsg) {} + public IbftBlockHeightManager( + final BlockHeader parentHeader, + final IbftFinalState finalState, + final RoundChangeManager roundChangeManager, + final IbftRoundFactory ibftRoundFactory, + final Clock clock, + final MessageValidatorFactory messageValidatorFactory) { + this.parentHeader = parentHeader; + this.roundFactory = ibftRoundFactory; + this.roundTimer = finalState.getRoundTimer(); + this.blockTimer = finalState.getBlockTimer(); + this.transmitter = finalState.getTransmitter(); + this.messageFactory = finalState.getMessageFactory(); + this.clock = clock; + this.roundChangeManager = roundChangeManager; + this.finalState = finalState; - public void handleNewRoundMessage(final SignedData newRoundMsg) {} + newRoundMessageValidator = messageValidatorFactory.createNewRoundValidator(parentHeader); - public void start() {} + roundStateCreator = + (roundIdentifier) -> + new RoundState( + roundIdentifier, + finalState.getQuorumSize(), + messageValidatorFactory.createMessageValidator(roundIdentifier, parentHeader)); + } + + public void start() { + startNewRound(0); + if (finalState.isLocalNodeProposerForRound(currentRound.getRoundIdentifier())) { + blockTimer.startTimer(currentRound.getRoundIdentifier(), parentHeader); + } + } + + public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIdentifier) { + if (roundIdentifier.equals(currentRound.getRoundIdentifier())) { + currentRound.createAndSendProposalMessage(clock.millis() / 1000); + } else { + LOG.info( + "Block timer expired for a round ({}) other than current ({})", + roundIdentifier, + currentRound.getRoundIdentifier()); + } + } + + public void roundExpired(final RoundExpiry expire) { + if (!expire.getView().equals(currentRound.getRoundIdentifier())) { + LOG.debug("Ignoring Round timer expired which does not match current round."); + return; + } + + LOG.info("Round has expired, creating PreparedCertificate and notifying peers."); + final Optional preparedCertificate = + currentRound.createPrepareCertificate(); + + if (preparedCertificate.isPresent()) { + latestPreparedCertificate = preparedCertificate; + } + + startNewRound(currentRound.getRoundIdentifier().getRoundNumber() + 1); + + final SignedData localRoundChange = + messageFactory.createSignedRoundChangePayload( + currentRound.getRoundIdentifier(), latestPreparedCertificate); + transmitter.multicastRoundChange(currentRound.getRoundIdentifier(), latestPreparedCertificate); + + // Its possible the locally created RoundChange triggers the transmission of a NewRound + // message - so it must be handled accordingly. + handleRoundChangeMessage(localRoundChange); + } + + public void handleProposalMessage(final SignedData msg) { + LOG.info("Received a Proposal message."); + actionOrBufferMessage(msg, currentRound::handleProposalMessage, RoundState::setProposedBlock); + } + + public void handlePrepareMessage(final SignedData msg) { + LOG.info("Received a prepare message."); + actionOrBufferMessage(msg, currentRound::handlePrepareMessage, RoundState::addPrepareMessage); + } + + public void handleCommitMessage(final SignedData msg) { + LOG.info("Received a commit message."); + actionOrBufferMessage(msg, currentRound::handleCommitMessage, RoundState::addCommitMessage); + } + + private void actionOrBufferMessage( + final SignedData msg, + final Consumer> inRoundHandler, + final BiConsumer> buffer) { + final Payload payload = msg.getPayload(); + final MessageAge messageAge = determineAgeOfPayload(payload); + if (messageAge == CURRENT_ROUND) { + inRoundHandler.accept(msg); + } else if (messageAge == FUTURE_ROUND) { + final ConsensusRoundIdentifier msgRoundId = payload.getRoundIdentifier(); + final RoundState roundstate = + futureRoundStateBuffer.computeIfAbsent( + msgRoundId.getRoundNumber(), k -> roundStateCreator.apply(msgRoundId)); + buffer.accept(roundstate, msg); + } + } + + public void handleRoundChangeMessage(final SignedData msg) { + final Optional result = + roundChangeManager.appendRoundChangeMessage(msg); + final MessageAge messageAge = determineAgeOfPayload(msg.getPayload()); + if (messageAge == PRIOR_ROUND) { + LOG.info("Received RoundChange Message for a prior round."); + return; + } + ConsensusRoundIdentifier targetRound = msg.getPayload().getRoundIdentifier(); + LOG.info("Received a RoundChange message for {}", targetRound.toString()); + + if (result.isPresent()) { + if (messageAge == FUTURE_ROUND) { + startNewRound(targetRound.getRoundNumber()); + } + + if (finalState.isLocalNodeProposerForRound(targetRound)) { + currentRound.startRoundWith(result.get(), clock.millis() / 1000); + } + } + } + + private void startNewRound(final int roundNumber) { + LOG.info("Starting new round {}", roundNumber); + if (futureRoundStateBuffer.containsKey(roundNumber)) { + currentRound = + roundFactory.createNewRoundWithState( + parentHeader, futureRoundStateBuffer.get(roundNumber)); + futureRoundStateBuffer.keySet().removeIf(k -> k <= roundNumber); + } else { + currentRound = roundFactory.createNewRound(parentHeader, roundNumber); + } + // discard roundChange messages from the current and previous rounds + roundChangeManager.discardRoundsPriorTo(currentRound.getRoundIdentifier()); + roundTimer.startTimer(currentRound.getRoundIdentifier()); + } + + public void handleNewRoundMessage(final SignedData msg) { + final NewRoundPayload payload = msg.getPayload(); + final MessageAge messageAge = determineAgeOfPayload(payload); + + if (messageAge == PRIOR_ROUND) { + LOG.info("Received NewRound Message for a prior round."); + return; + } + LOG.info("Received NewRound Message for {}", payload.getRoundIdentifier().toString()); + + if (newRoundMessageValidator.validateNewRoundMessage(msg)) { + if (messageAge == FUTURE_ROUND) { + startNewRound(payload.getRoundIdentifier().getRoundNumber()); + } + currentRound.handleProposalMessage(payload.getProposalPayload()); + } + } public long getChainHeight() { - return 0; + return currentRound.getRoundIdentifier().getSequenceNumber(); } - public void roundExpired(final RoundExpiry expired) {} + private MessageAge determineAgeOfPayload(final Payload payload) { + int messageRoundNumber = payload.getRoundIdentifier().getRoundNumber(); + int currentRoundNumber = currentRound.getRoundIdentifier().getRoundNumber(); + if (messageRoundNumber > currentRoundNumber) { + return FUTURE_ROUND; + } else if (messageRoundNumber == currentRoundNumber) { + return CURRENT_ROUND; + } + return PRIOR_ROUND; + } } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerFactory.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerFactory.java index b2fddd8e74..1b51e27b9c 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerFactory.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerFactory.java @@ -12,12 +12,43 @@ */ package tech.pegasys.pantheon.consensus.ibft.statemachine; +import tech.pegasys.pantheon.consensus.ibft.IbftContext; +import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidatorFactory; +import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.core.BlockHeader; -/** This no-op version will be replaced with an implementation in another PR */ +import java.time.Clock; + public class IbftBlockHeightManagerFactory { + private final IbftRoundFactory roundFactory; + private final IbftFinalState finalState; + private final ProtocolContext protocolContext; + private final MessageValidatorFactory messageValidatorFactory; + + public IbftBlockHeightManagerFactory( + final IbftFinalState finalState, + final IbftRoundFactory roundFactory, + final MessageValidatorFactory messageValidatorFactory, + final ProtocolContext protocolContext) { + this.roundFactory = roundFactory; + this.finalState = finalState; + this.protocolContext = protocolContext; + this.messageValidatorFactory = messageValidatorFactory; + } + public IbftBlockHeightManager create(final BlockHeader parentHeader) { - return new IbftBlockHeightManager(); + long nextChainHeight = parentHeader.getNumber() + 1; + return new IbftBlockHeightManager( + parentHeader, + finalState, + new RoundChangeManager( + nextChainHeight, + finalState.getValidators(), + (roundIdentifier) -> + messageValidatorFactory.createMessageValidator(roundIdentifier, parentHeader)), + roundFactory, + Clock.systemUTC(), + messageValidatorFactory); } } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftFinalState.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftFinalState.java index f509366e58..e41c2db91b 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftFinalState.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftFinalState.java @@ -27,6 +27,7 @@ import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; import tech.pegasys.pantheon.ethereum.core.Address; import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; +import java.time.Clock; import java.util.Collection; /** This is the full data set, or context, required for many of the aspects of the IBFT workflow. */ @@ -42,6 +43,7 @@ public class IbftFinalState { private final MessageFactory messageFactory; private final BlockHeaderValidator ibftContextBlockHeaderValidator; private final IbftMessageTransmitter messageTransmitter; + private final Clock clock; public IbftFinalState( final ValidatorProvider validatorProvider, @@ -53,7 +55,8 @@ public class IbftFinalState { final BlockTimer blockTimer, final IbftBlockCreatorFactory blockCreatorFactory, final MessageFactory messageFactory, - final BlockHeaderValidator ibftContextBlockHeaderValidator) { + final BlockHeaderValidator ibftContextBlockHeaderValidator, + final Clock clock) { this.validatorProvider = validatorProvider; this.nodeKeys = nodeKeys; this.localAddress = localAddress; @@ -65,6 +68,7 @@ public class IbftFinalState { this.messageFactory = messageFactory; this.ibftContextBlockHeaderValidator = ibftContextBlockHeaderValidator; this.messageTransmitter = new IbftMessageTransmitter(messageFactory, peers); + this.clock = clock; } public int getQuorumSize() { @@ -122,4 +126,8 @@ public class IbftFinalState { public IbftMessageTransmitter getTransmitter() { return messageTransmitter; } + + public Clock getClock() { + return clock; + } } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundFactory.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundFactory.java index 8d31f7ceca..ed8fda0549 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundFactory.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundFactory.java @@ -46,7 +46,6 @@ public class IbftRoundFactory { long nextBlockHeight = parentHeader.getNumber() + 1; final ConsensusRoundIdentifier roundIdentifier = new ConsensusRoundIdentifier(nextBlockHeight, round); - final IbftBlockCreator blockCreator = blockCreatorFactory.create(parentHeader, round); final RoundState roundState = new RoundState( diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManager.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManager.java index a8f7530024..4d0adc2b4c 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManager.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManager.java @@ -137,14 +137,12 @@ public class RoundChangeManager { * * @param completedRoundIdentifier round identifier that has been identified as superseded */ - public void discardCompletedRound(final ConsensusRoundIdentifier completedRoundIdentifier) { - roundChangeCache - .entrySet() - .removeIf(entry -> isAnEarlierOrEqualRound(entry.getKey(), completedRoundIdentifier)); + public void discardRoundsPriorTo(final ConsensusRoundIdentifier completedRoundIdentifier) { + roundChangeCache.keySet().removeIf(k -> isAnEarlierRound(k, completedRoundIdentifier)); } - private boolean isAnEarlierOrEqualRound( + private boolean isAnEarlierRound( final ConsensusRoundIdentifier left, final ConsensusRoundIdentifier right) { - return left.getRoundNumber() <= right.getRoundNumber(); + return left.getRoundNumber() < right.getRoundNumber(); } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java new file mode 100644 index 0000000000..8554420927 --- /dev/null +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java @@ -0,0 +1,368 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft.statemachine; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static tech.pegasys.pantheon.consensus.ibft.TestHelpers.createFrom; + +import tech.pegasys.pantheon.consensus.common.VoteTally; +import tech.pegasys.pantheon.consensus.ibft.BlockTimer; +import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; +import tech.pegasys.pantheon.consensus.ibft.IbftContext; +import tech.pegasys.pantheon.consensus.ibft.IbftExtraData; +import tech.pegasys.pantheon.consensus.ibft.RoundTimer; +import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreator; +import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparePayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparedCertificate; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangeCertificate; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangePayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; +import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidator; +import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidatorFactory; +import tech.pegasys.pantheon.consensus.ibft.validation.NewRoundMessageValidator; +import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; +import tech.pegasys.pantheon.crypto.SECP256K1.Signature; +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.Block; +import tech.pegasys.pantheon.ethereum.core.BlockBody; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; +import tech.pegasys.pantheon.ethereum.core.BlockImporter; +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.core.Util; +import tech.pegasys.pantheon.util.Subscribers; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.math.BigInteger; +import java.time.Clock; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import com.google.common.collect.Lists; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class IbftBlockHeightManagerTest { + + private KeyPair localNodeKeys = KeyPair.generate(); + private final MessageFactory messageFactory = new MessageFactory(localNodeKeys); + private final BlockHeaderTestFixture headerTestFixture = new BlockHeaderTestFixture(); + + @Mock private IbftFinalState finalState; + @Mock private IbftMessageTransmitter messageTransmitter; + @Mock private RoundChangeManager roundChangeManager; + @Mock private IbftRoundFactory roundFactory; + @Mock private Clock clock; + @Mock private MessageValidatorFactory messageValidatorFactory; + @Mock private IbftBlockCreator blockCreator; + @Mock private BlockImporter blockImporter; + @Mock private BlockTimer blockTimer; + @Mock private RoundTimer roundTimer; + @Mock private NewRoundMessageValidator newRoundMessageValidator; + + @Captor private ArgumentCaptor> preparedCaptor; + + private final List validatorKeys = Lists.newArrayList(); + private List
validators = Lists.newArrayList(); + private List validatorMessageFactory = Lists.newArrayList(); + + private ProtocolContext protocolContext; + private ConsensusRoundIdentifier roundIdentifier = new ConsensusRoundIdentifier(1, 0); + private Block createdBlock; + + private void buildCreatedBlock() { + + IbftExtraData extraData = + new IbftExtraData( + BytesValue.wrap(new byte[32]), emptyList(), Optional.empty(), 0, validators); + + headerTestFixture.extraData(extraData.encode()); + final BlockHeader header = headerTestFixture.buildHeader(); + createdBlock = new Block(header, new BlockBody(emptyList(), emptyList())); + } + + @Before + public void setup() { + for (int i = 0; i < 3; i++) { + final KeyPair key = KeyPair.generate(); + validatorKeys.add(key); + validators.add(Util.publicKeyToAddress(key.getPublicKey())); + validatorMessageFactory.add(new MessageFactory(key)); + } + + buildCreatedBlock(); + + final MessageValidator messageValidator = mock(MessageValidator.class); + when(messageValidator.addSignedProposalPayload(any())).thenReturn(true); + when(messageValidator.validateCommmitMessage(any())).thenReturn(true); + when(messageValidator.validatePrepareMessage(any())).thenReturn(true); + when(finalState.getTransmitter()).thenReturn(messageTransmitter); + when(finalState.getBlockTimer()).thenReturn(blockTimer); + when(finalState.getRoundTimer()).thenReturn(roundTimer); + when(finalState.getQuorumSize()).thenReturn(3); + when(finalState.getMessageFactory()).thenReturn(messageFactory); + when(blockCreator.createBlock(anyLong())).thenReturn(createdBlock); + when(newRoundMessageValidator.validateNewRoundMessage(any())).thenReturn(true); + when(messageValidatorFactory.createNewRoundValidator(any())) + .thenReturn(newRoundMessageValidator); + when(messageValidatorFactory.createMessageValidator(any(), any())).thenReturn(messageValidator); + + protocolContext = + new ProtocolContext<>(null, null, new IbftContext(new VoteTally(validators), null)); + + // Ensure the created IbftRound has the valid ConsensusRoundIdentifier; + when(roundFactory.createNewRound(any(), anyInt())) + .thenAnswer( + invocation -> { + final int round = (int) invocation.getArgument(1); + final ConsensusRoundIdentifier roundId = new ConsensusRoundIdentifier(1, round); + final RoundState createdRoundState = + new RoundState(roundId, finalState.getQuorumSize(), messageValidator); + return new IbftRound( + createdRoundState, + blockCreator, + protocolContext, + blockImporter, + new Subscribers<>(), + localNodeKeys, + messageFactory, + messageTransmitter); + }); + + when(roundFactory.createNewRoundWithState(any(), any())) + .thenAnswer( + invocation -> { + final RoundState providedRoundState = invocation.getArgument(1); + return new IbftRound( + providedRoundState, + blockCreator, + protocolContext, + blockImporter, + new Subscribers<>(), + localNodeKeys, + messageFactory, + messageTransmitter); + }); + } + + @Test + public void startsABlockTimerOnStartIfLocalNodeIsTheProoserForRound() { + when(finalState.isLocalNodeProposerForRound(any())).thenReturn(true); + + final IbftBlockHeightManager manager = + new IbftBlockHeightManager( + headerTestFixture.buildHeader(), + finalState, + roundChangeManager, + roundFactory, + clock, + messageValidatorFactory); + manager.start(); + + verify(blockTimer, times(1)).startTimer(any(), any()); + } + + @Test + public void onBlockTimerExpiryProposalMessageIsTransmitted() { + final IbftBlockHeightManager manager = + new IbftBlockHeightManager( + headerTestFixture.buildHeader(), + finalState, + roundChangeManager, + roundFactory, + clock, + messageValidatorFactory); + manager.start(); + + manager.handleBlockTimerExpiry(roundIdentifier); + verify(messageTransmitter, times(1)).multicastProposal(eq(roundIdentifier), any()); + verify(messageTransmitter, never()).multicastPrepare(any(), any()); + verify(messageTransmitter, never()).multicastPrepare(any(), any()); + } + + @Test + public void onRoundChangeReceptionRoundChangeManagerIsInvokedAndNewRoundStarted() { + final ConsensusRoundIdentifier futureRoundIdentifier = createFrom(roundIdentifier, 0, +2); + final SignedData roundChangePayload = + messageFactory.createSignedRoundChangePayload(futureRoundIdentifier, Optional.empty()); + when(roundChangeManager.appendRoundChangeMessage(any())) + .thenReturn(Optional.of(new RoundChangeCertificate(singletonList(roundChangePayload)))); + when(finalState.isLocalNodeProposerForRound(any())).thenReturn(false); + + final IbftBlockHeightManager manager = + new IbftBlockHeightManager( + headerTestFixture.buildHeader(), + finalState, + roundChangeManager, + roundFactory, + clock, + messageValidatorFactory); + manager.start(); + verify(roundFactory).createNewRound(any(), eq(0)); + + manager.handleRoundChangeMessage(roundChangePayload); + + verify(roundChangeManager, times(1)).appendRoundChangeMessage(roundChangePayload); + verify(roundFactory, times(1)) + .createNewRound(any(), eq(futureRoundIdentifier.getRoundNumber())); + } + + @Test + public void onRoundTimerExpiryANewRoundIsCreatedWithAnIncrementedRoundNumber() { + final IbftBlockHeightManager manager = + new IbftBlockHeightManager( + headerTestFixture.buildHeader(), + finalState, + roundChangeManager, + roundFactory, + clock, + messageValidatorFactory); + manager.start(); + verify(roundFactory).createNewRound(any(), eq(0)); + + manager.roundExpired(new RoundExpiry(roundIdentifier)); + verify(roundFactory).createNewRound(any(), eq(1)); + } + + @Test + public void whenSufficientRoundChangesAreReceivedANewRoundMessageIsTransmitted() { + final ConsensusRoundIdentifier futureRoundIdentifier = createFrom(roundIdentifier, 0, +2); + final SignedData roundChangePayload = + messageFactory.createSignedRoundChangePayload(futureRoundIdentifier, Optional.empty()); + final RoundChangeCertificate roundChangCert = + new RoundChangeCertificate(singletonList(roundChangePayload)); + + when(roundChangeManager.appendRoundChangeMessage(any())) + .thenReturn(Optional.of(roundChangCert)); + when(finalState.isLocalNodeProposerForRound(any())).thenReturn(true); + + final IbftBlockHeightManager manager = + new IbftBlockHeightManager( + headerTestFixture.buildHeader(), + finalState, + roundChangeManager, + roundFactory, + clock, + messageValidatorFactory); + manager.start(); + + manager.handleRoundChangeMessage(roundChangePayload); + + verify(messageTransmitter, times(1)) + .multicastNewRound(eq(futureRoundIdentifier), eq(roundChangCert), any()); + } + + @Test + public void messagesForFutureRoundsAreBufferedAndUsedToPreloadNewRoundWhenItIsStarted() { + final ConsensusRoundIdentifier futureRoundIdentifier = createFrom(roundIdentifier, 0, +2); + + final IbftBlockHeightManager manager = + new IbftBlockHeightManager( + headerTestFixture.buildHeader(), + finalState, + roundChangeManager, + roundFactory, + clock, + messageValidatorFactory); + manager.start(); + + final SignedData preparePayload = + validatorMessageFactory + .get(0) + .createSignedPreparePayload(futureRoundIdentifier, Hash.fromHexStringLenient("0")); + final SignedData commitPayload = + validatorMessageFactory + .get(1) + .createSignedCommitPayload( + futureRoundIdentifier, + Hash.fromHexStringLenient("0"), + Signature.create(BigInteger.ONE, BigInteger.ONE, (byte) 1)); + + manager.handlePrepareMessage(preparePayload); + manager.handleCommitMessage(commitPayload); + + // Force a new round to be started at new round number. + final SignedData newRound = + messageFactory.createSignedNewRoundPayload( + futureRoundIdentifier, + new RoundChangeCertificate(Collections.emptyList()), + messageFactory.createSignedProposalPayload(futureRoundIdentifier, createdBlock)); + + manager.handleNewRoundMessage(newRound); + + // Final state sets the Quorum Size to 3, so should send a Prepare and also a commit + verify(messageTransmitter, times(1)).multicastPrepare(eq(futureRoundIdentifier), any()); + verify(messageTransmitter, times(1)).multicastPrepare(eq(futureRoundIdentifier), any()); + } + + @Test + public void preparedCertificateIncludedInRoundChangeMessageOnRoundTimeoutExpired() { + final IbftBlockHeightManager manager = + new IbftBlockHeightManager( + headerTestFixture.buildHeader(), + finalState, + roundChangeManager, + roundFactory, + clock, + messageValidatorFactory); + manager.start(); + manager.handleBlockTimerExpiry(roundIdentifier); // Trigger a Proposal creation. + + final SignedData preparePayload = + validatorMessageFactory + .get(0) + .createSignedPreparePayload(roundIdentifier, Hash.fromHexStringLenient("0")); + final SignedData secondPreparePayload = + validatorMessageFactory + .get(1) + .createSignedPreparePayload(roundIdentifier, Hash.fromHexStringLenient("0")); + manager.handlePrepareMessage(preparePayload); + manager.handlePrepareMessage(secondPreparePayload); + + manager.roundExpired(new RoundExpiry(roundIdentifier)); + + final ConsensusRoundIdentifier nextRound = createFrom(roundIdentifier, 0, +1); + + verify(messageTransmitter, times(1)) + .multicastRoundChange(eq(nextRound), preparedCaptor.capture()); + final Optional preparedCert = preparedCaptor.getValue(); + + assertThat(preparedCert).isNotEmpty(); + + assertThat(preparedCert.get().getPreparePayloads()) + .containsOnly(preparePayload, secondPreparePayload); + } +} diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManagerTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManagerTest.java index a69752137f..c80f3d04ce 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManagerTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManagerTest.java @@ -158,7 +158,7 @@ public class RoundChangeManagerTest { } @Test - public void discardsPreviousRounds() { + public void discardsRoundPreviousToThatRequested() { SignedData roundChangeDataProposer = makeRoundChangeMessage(proposerKey, ri1); SignedData roundChangeDataValidator1 = @@ -171,7 +171,7 @@ public class RoundChangeManagerTest { .isEqualTo(Optional.empty()); assertThat(manager.appendRoundChangeMessage(roundChangeDataValidator2)) .isEqualTo(Optional.empty()); - manager.discardCompletedRound(ri1); + manager.discardRoundsPriorTo(ri2); assertThat(manager.roundChangeCache.get(ri1)).isNull(); assertThat(manager.roundChangeCache.get(ri2).receivedMessages.size()).isEqualTo(1); assertThat(manager.roundChangeCache.get(ri3).receivedMessages.size()).isEqualTo(1);