Ibft Height Manager (#418)

The IbftHeightManager is responsible for all things 'meta-round'
related, such things as:
* Handling RoundTimeout and starting a new round
* Handling NewRound messages and starting a new round
* Ensuring RoundChange messages are sent at the correct time with
  appropriate content.
* Collating RoundChange messages and starting a new round using the
  best prepared certificate in the collection.
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
tmohay 6 years ago committed by GitHub
parent 97085afdb9
commit bcdb431483
  1. 227
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java
  2. 35
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerFactory.java
  3. 10
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftFinalState.java
  4. 1
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundFactory.java
  5. 10
      consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManager.java
  6. 368
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java
  7. 4
      consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManagerTest.java

@ -12,35 +12,242 @@
*/ */
package tech.pegasys.pantheon.consensus.ibft.statemachine; package tech.pegasys.pantheon.consensus.ibft.statemachine;
import static tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManager.MessageAge.CURRENT_ROUND;
import static tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManager.MessageAge.FUTURE_ROUND;
import static tech.pegasys.pantheon.consensus.ibft.statemachine.IbftBlockHeightManager.MessageAge.PRIOR_ROUND;
import tech.pegasys.pantheon.consensus.ibft.BlockTimer;
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry; import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.Payload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparePayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparePayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparedCertificate;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.ProposalPayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.ProposalPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangeCertificate;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangePayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangePayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData;
import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidatorFactory;
import tech.pegasys.pantheon.consensus.ibft.validation.NewRoundMessageValidator;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import java.time.Clock;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/** This no-op version will be replaced with an implementation in another PR */ /**
* Responsible for starting/clearing Consensus rounds at a given block height. One of these is
* created when a new block is imported to the chain. It immediately then creates a Round-0 object,
* and sends a Proposal message. If the round times out prior to importing a block, this class is
* responsible for creating a RoundChange message and transmitting it.
*/
public class IbftBlockHeightManager { public class IbftBlockHeightManager {
public void handleProposalMessage(final SignedData<ProposalPayload> proposalMsg) {} protected enum MessageAge {
PRIOR_ROUND,
CURRENT_ROUND,
FUTURE_ROUND
}
private static final Logger LOG = LogManager.getLogger();
public void handlePrepareMessage(final SignedData<PreparePayload> prepareMsg) {} private final IbftRoundFactory roundFactory;
private final RoundChangeManager roundChangeManager;
private final BlockHeader parentHeader;
private final RoundTimer roundTimer;
private final BlockTimer blockTimer;
private final IbftMessageTransmitter transmitter;
private final MessageFactory messageFactory;
private final Map<Integer, RoundState> futureRoundStateBuffer = Maps.newHashMap();
private final NewRoundMessageValidator newRoundMessageValidator;
private final Clock clock;
private final Function<ConsensusRoundIdentifier, RoundState> roundStateCreator;
private final IbftFinalState finalState;
public void handleCommitMessage(final SignedData<CommitPayload> commitMsg) {} private Optional<PreparedCertificate> latestPreparedCertificate = Optional.empty();
public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIndentifier) {} private IbftRound currentRound;
public void handleRoundChangeMessage(final SignedData<RoundChangePayload> roundChangeMsg) {} public IbftBlockHeightManager(
final BlockHeader parentHeader,
final IbftFinalState finalState,
final RoundChangeManager roundChangeManager,
final IbftRoundFactory ibftRoundFactory,
final Clock clock,
final MessageValidatorFactory messageValidatorFactory) {
this.parentHeader = parentHeader;
this.roundFactory = ibftRoundFactory;
this.roundTimer = finalState.getRoundTimer();
this.blockTimer = finalState.getBlockTimer();
this.transmitter = finalState.getTransmitter();
this.messageFactory = finalState.getMessageFactory();
this.clock = clock;
this.roundChangeManager = roundChangeManager;
this.finalState = finalState;
public void handleNewRoundMessage(final SignedData<NewRoundPayload> newRoundMsg) {} newRoundMessageValidator = messageValidatorFactory.createNewRoundValidator(parentHeader);
public void start() {} roundStateCreator =
(roundIdentifier) ->
new RoundState(
roundIdentifier,
finalState.getQuorumSize(),
messageValidatorFactory.createMessageValidator(roundIdentifier, parentHeader));
}
public void start() {
startNewRound(0);
if (finalState.isLocalNodeProposerForRound(currentRound.getRoundIdentifier())) {
blockTimer.startTimer(currentRound.getRoundIdentifier(), parentHeader);
}
}
public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIdentifier) {
if (roundIdentifier.equals(currentRound.getRoundIdentifier())) {
currentRound.createAndSendProposalMessage(clock.millis() / 1000);
} else {
LOG.info(
"Block timer expired for a round ({}) other than current ({})",
roundIdentifier,
currentRound.getRoundIdentifier());
}
}
public void roundExpired(final RoundExpiry expire) {
if (!expire.getView().equals(currentRound.getRoundIdentifier())) {
LOG.debug("Ignoring Round timer expired which does not match current round.");
return;
}
LOG.info("Round has expired, creating PreparedCertificate and notifying peers.");
final Optional<PreparedCertificate> preparedCertificate =
currentRound.createPrepareCertificate();
if (preparedCertificate.isPresent()) {
latestPreparedCertificate = preparedCertificate;
}
startNewRound(currentRound.getRoundIdentifier().getRoundNumber() + 1);
final SignedData<RoundChangePayload> localRoundChange =
messageFactory.createSignedRoundChangePayload(
currentRound.getRoundIdentifier(), latestPreparedCertificate);
transmitter.multicastRoundChange(currentRound.getRoundIdentifier(), latestPreparedCertificate);
// Its possible the locally created RoundChange triggers the transmission of a NewRound
// message - so it must be handled accordingly.
handleRoundChangeMessage(localRoundChange);
}
public void handleProposalMessage(final SignedData<ProposalPayload> msg) {
LOG.info("Received a Proposal message.");
actionOrBufferMessage(msg, currentRound::handleProposalMessage, RoundState::setProposedBlock);
}
public void handlePrepareMessage(final SignedData<PreparePayload> msg) {
LOG.info("Received a prepare message.");
actionOrBufferMessage(msg, currentRound::handlePrepareMessage, RoundState::addPrepareMessage);
}
public void handleCommitMessage(final SignedData<CommitPayload> msg) {
LOG.info("Received a commit message.");
actionOrBufferMessage(msg, currentRound::handleCommitMessage, RoundState::addCommitMessage);
}
private <T extends Payload> void actionOrBufferMessage(
final SignedData<T> msg,
final Consumer<SignedData<T>> inRoundHandler,
final BiConsumer<RoundState, SignedData<T>> buffer) {
final Payload payload = msg.getPayload();
final MessageAge messageAge = determineAgeOfPayload(payload);
if (messageAge == CURRENT_ROUND) {
inRoundHandler.accept(msg);
} else if (messageAge == FUTURE_ROUND) {
final ConsensusRoundIdentifier msgRoundId = payload.getRoundIdentifier();
final RoundState roundstate =
futureRoundStateBuffer.computeIfAbsent(
msgRoundId.getRoundNumber(), k -> roundStateCreator.apply(msgRoundId));
buffer.accept(roundstate, msg);
}
}
public void handleRoundChangeMessage(final SignedData<RoundChangePayload> msg) {
final Optional<RoundChangeCertificate> result =
roundChangeManager.appendRoundChangeMessage(msg);
final MessageAge messageAge = determineAgeOfPayload(msg.getPayload());
if (messageAge == PRIOR_ROUND) {
LOG.info("Received RoundChange Message for a prior round.");
return;
}
ConsensusRoundIdentifier targetRound = msg.getPayload().getRoundIdentifier();
LOG.info("Received a RoundChange message for {}", targetRound.toString());
if (result.isPresent()) {
if (messageAge == FUTURE_ROUND) {
startNewRound(targetRound.getRoundNumber());
}
if (finalState.isLocalNodeProposerForRound(targetRound)) {
currentRound.startRoundWith(result.get(), clock.millis() / 1000);
}
}
}
private void startNewRound(final int roundNumber) {
LOG.info("Starting new round {}", roundNumber);
if (futureRoundStateBuffer.containsKey(roundNumber)) {
currentRound =
roundFactory.createNewRoundWithState(
parentHeader, futureRoundStateBuffer.get(roundNumber));
futureRoundStateBuffer.keySet().removeIf(k -> k <= roundNumber);
} else {
currentRound = roundFactory.createNewRound(parentHeader, roundNumber);
}
// discard roundChange messages from the current and previous rounds
roundChangeManager.discardRoundsPriorTo(currentRound.getRoundIdentifier());
roundTimer.startTimer(currentRound.getRoundIdentifier());
}
public void handleNewRoundMessage(final SignedData<NewRoundPayload> msg) {
final NewRoundPayload payload = msg.getPayload();
final MessageAge messageAge = determineAgeOfPayload(payload);
if (messageAge == PRIOR_ROUND) {
LOG.info("Received NewRound Message for a prior round.");
return;
}
LOG.info("Received NewRound Message for {}", payload.getRoundIdentifier().toString());
if (newRoundMessageValidator.validateNewRoundMessage(msg)) {
if (messageAge == FUTURE_ROUND) {
startNewRound(payload.getRoundIdentifier().getRoundNumber());
}
currentRound.handleProposalMessage(payload.getProposalPayload());
}
}
public long getChainHeight() { public long getChainHeight() {
return 0; return currentRound.getRoundIdentifier().getSequenceNumber();
} }
public void roundExpired(final RoundExpiry expired) {} private MessageAge determineAgeOfPayload(final Payload payload) {
int messageRoundNumber = payload.getRoundIdentifier().getRoundNumber();
int currentRoundNumber = currentRound.getRoundIdentifier().getRoundNumber();
if (messageRoundNumber > currentRoundNumber) {
return FUTURE_ROUND;
} else if (messageRoundNumber == currentRoundNumber) {
return CURRENT_ROUND;
}
return PRIOR_ROUND;
}
} }

@ -12,12 +12,43 @@
*/ */
package tech.pegasys.pantheon.consensus.ibft.statemachine; package tech.pegasys.pantheon.consensus.ibft.statemachine;
import tech.pegasys.pantheon.consensus.ibft.IbftContext;
import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidatorFactory;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeader;
/** This no-op version will be replaced with an implementation in another PR */ import java.time.Clock;
public class IbftBlockHeightManagerFactory { public class IbftBlockHeightManagerFactory {
private final IbftRoundFactory roundFactory;
private final IbftFinalState finalState;
private final ProtocolContext<IbftContext> protocolContext;
private final MessageValidatorFactory messageValidatorFactory;
public IbftBlockHeightManagerFactory(
final IbftFinalState finalState,
final IbftRoundFactory roundFactory,
final MessageValidatorFactory messageValidatorFactory,
final ProtocolContext<IbftContext> protocolContext) {
this.roundFactory = roundFactory;
this.finalState = finalState;
this.protocolContext = protocolContext;
this.messageValidatorFactory = messageValidatorFactory;
}
public IbftBlockHeightManager create(final BlockHeader parentHeader) { public IbftBlockHeightManager create(final BlockHeader parentHeader) {
return new IbftBlockHeightManager(); long nextChainHeight = parentHeader.getNumber() + 1;
return new IbftBlockHeightManager(
parentHeader,
finalState,
new RoundChangeManager(
nextChainHeight,
finalState.getValidators(),
(roundIdentifier) ->
messageValidatorFactory.createMessageValidator(roundIdentifier, parentHeader)),
roundFactory,
Clock.systemUTC(),
messageValidatorFactory);
} }
} }

@ -27,6 +27,7 @@ import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.core.Address; import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator;
import java.time.Clock;
import java.util.Collection; import java.util.Collection;
/** This is the full data set, or context, required for many of the aspects of the IBFT workflow. */ /** This is the full data set, or context, required for many of the aspects of the IBFT workflow. */
@ -42,6 +43,7 @@ public class IbftFinalState {
private final MessageFactory messageFactory; private final MessageFactory messageFactory;
private final BlockHeaderValidator<IbftContext> ibftContextBlockHeaderValidator; private final BlockHeaderValidator<IbftContext> ibftContextBlockHeaderValidator;
private final IbftMessageTransmitter messageTransmitter; private final IbftMessageTransmitter messageTransmitter;
private final Clock clock;
public IbftFinalState( public IbftFinalState(
final ValidatorProvider validatorProvider, final ValidatorProvider validatorProvider,
@ -53,7 +55,8 @@ public class IbftFinalState {
final BlockTimer blockTimer, final BlockTimer blockTimer,
final IbftBlockCreatorFactory blockCreatorFactory, final IbftBlockCreatorFactory blockCreatorFactory,
final MessageFactory messageFactory, final MessageFactory messageFactory,
final BlockHeaderValidator<IbftContext> ibftContextBlockHeaderValidator) { final BlockHeaderValidator<IbftContext> ibftContextBlockHeaderValidator,
final Clock clock) {
this.validatorProvider = validatorProvider; this.validatorProvider = validatorProvider;
this.nodeKeys = nodeKeys; this.nodeKeys = nodeKeys;
this.localAddress = localAddress; this.localAddress = localAddress;
@ -65,6 +68,7 @@ public class IbftFinalState {
this.messageFactory = messageFactory; this.messageFactory = messageFactory;
this.ibftContextBlockHeaderValidator = ibftContextBlockHeaderValidator; this.ibftContextBlockHeaderValidator = ibftContextBlockHeaderValidator;
this.messageTransmitter = new IbftMessageTransmitter(messageFactory, peers); this.messageTransmitter = new IbftMessageTransmitter(messageFactory, peers);
this.clock = clock;
} }
public int getQuorumSize() { public int getQuorumSize() {
@ -122,4 +126,8 @@ public class IbftFinalState {
public IbftMessageTransmitter getTransmitter() { public IbftMessageTransmitter getTransmitter() {
return messageTransmitter; return messageTransmitter;
} }
public Clock getClock() {
return clock;
}
} }

@ -46,7 +46,6 @@ public class IbftRoundFactory {
long nextBlockHeight = parentHeader.getNumber() + 1; long nextBlockHeight = parentHeader.getNumber() + 1;
final ConsensusRoundIdentifier roundIdentifier = final ConsensusRoundIdentifier roundIdentifier =
new ConsensusRoundIdentifier(nextBlockHeight, round); new ConsensusRoundIdentifier(nextBlockHeight, round);
final IbftBlockCreator blockCreator = blockCreatorFactory.create(parentHeader, round);
final RoundState roundState = final RoundState roundState =
new RoundState( new RoundState(

@ -137,14 +137,12 @@ public class RoundChangeManager {
* *
* @param completedRoundIdentifier round identifier that has been identified as superseded * @param completedRoundIdentifier round identifier that has been identified as superseded
*/ */
public void discardCompletedRound(final ConsensusRoundIdentifier completedRoundIdentifier) { public void discardRoundsPriorTo(final ConsensusRoundIdentifier completedRoundIdentifier) {
roundChangeCache roundChangeCache.keySet().removeIf(k -> isAnEarlierRound(k, completedRoundIdentifier));
.entrySet()
.removeIf(entry -> isAnEarlierOrEqualRound(entry.getKey(), completedRoundIdentifier));
} }
private boolean isAnEarlierOrEqualRound( private boolean isAnEarlierRound(
final ConsensusRoundIdentifier left, final ConsensusRoundIdentifier right) { final ConsensusRoundIdentifier left, final ConsensusRoundIdentifier right) {
return left.getRoundNumber() <= right.getRoundNumber(); return left.getRoundNumber() < right.getRoundNumber();
} }
} }

@ -0,0 +1,368 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.statemachine;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.consensus.ibft.TestHelpers.createFrom;
import tech.pegasys.pantheon.consensus.common.VoteTally;
import tech.pegasys.pantheon.consensus.ibft.BlockTimer;
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.IbftContext;
import tech.pegasys.pantheon.consensus.ibft.IbftExtraData;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreator;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparePayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparedCertificate;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangeCertificate;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangePayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData;
import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidator;
import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidatorFactory;
import tech.pegasys.pantheon.consensus.ibft.validation.NewRoundMessageValidator;
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.crypto.SECP256K1.Signature;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockBody;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import java.math.BigInteger;
import java.time.Clock;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class IbftBlockHeightManagerTest {
private KeyPair localNodeKeys = KeyPair.generate();
private final MessageFactory messageFactory = new MessageFactory(localNodeKeys);
private final BlockHeaderTestFixture headerTestFixture = new BlockHeaderTestFixture();
@Mock private IbftFinalState finalState;
@Mock private IbftMessageTransmitter messageTransmitter;
@Mock private RoundChangeManager roundChangeManager;
@Mock private IbftRoundFactory roundFactory;
@Mock private Clock clock;
@Mock private MessageValidatorFactory messageValidatorFactory;
@Mock private IbftBlockCreator blockCreator;
@Mock private BlockImporter<IbftContext> blockImporter;
@Mock private BlockTimer blockTimer;
@Mock private RoundTimer roundTimer;
@Mock private NewRoundMessageValidator newRoundMessageValidator;
@Captor private ArgumentCaptor<Optional<PreparedCertificate>> preparedCaptor;
private final List<KeyPair> validatorKeys = Lists.newArrayList();
private List<Address> validators = Lists.newArrayList();
private List<MessageFactory> validatorMessageFactory = Lists.newArrayList();
private ProtocolContext<IbftContext> protocolContext;
private ConsensusRoundIdentifier roundIdentifier = new ConsensusRoundIdentifier(1, 0);
private Block createdBlock;
private void buildCreatedBlock() {
IbftExtraData extraData =
new IbftExtraData(
BytesValue.wrap(new byte[32]), emptyList(), Optional.empty(), 0, validators);
headerTestFixture.extraData(extraData.encode());
final BlockHeader header = headerTestFixture.buildHeader();
createdBlock = new Block(header, new BlockBody(emptyList(), emptyList()));
}
@Before
public void setup() {
for (int i = 0; i < 3; i++) {
final KeyPair key = KeyPair.generate();
validatorKeys.add(key);
validators.add(Util.publicKeyToAddress(key.getPublicKey()));
validatorMessageFactory.add(new MessageFactory(key));
}
buildCreatedBlock();
final MessageValidator messageValidator = mock(MessageValidator.class);
when(messageValidator.addSignedProposalPayload(any())).thenReturn(true);
when(messageValidator.validateCommmitMessage(any())).thenReturn(true);
when(messageValidator.validatePrepareMessage(any())).thenReturn(true);
when(finalState.getTransmitter()).thenReturn(messageTransmitter);
when(finalState.getBlockTimer()).thenReturn(blockTimer);
when(finalState.getRoundTimer()).thenReturn(roundTimer);
when(finalState.getQuorumSize()).thenReturn(3);
when(finalState.getMessageFactory()).thenReturn(messageFactory);
when(blockCreator.createBlock(anyLong())).thenReturn(createdBlock);
when(newRoundMessageValidator.validateNewRoundMessage(any())).thenReturn(true);
when(messageValidatorFactory.createNewRoundValidator(any()))
.thenReturn(newRoundMessageValidator);
when(messageValidatorFactory.createMessageValidator(any(), any())).thenReturn(messageValidator);
protocolContext =
new ProtocolContext<>(null, null, new IbftContext(new VoteTally(validators), null));
// Ensure the created IbftRound has the valid ConsensusRoundIdentifier;
when(roundFactory.createNewRound(any(), anyInt()))
.thenAnswer(
invocation -> {
final int round = (int) invocation.getArgument(1);
final ConsensusRoundIdentifier roundId = new ConsensusRoundIdentifier(1, round);
final RoundState createdRoundState =
new RoundState(roundId, finalState.getQuorumSize(), messageValidator);
return new IbftRound(
createdRoundState,
blockCreator,
protocolContext,
blockImporter,
new Subscribers<>(),
localNodeKeys,
messageFactory,
messageTransmitter);
});
when(roundFactory.createNewRoundWithState(any(), any()))
.thenAnswer(
invocation -> {
final RoundState providedRoundState = invocation.getArgument(1);
return new IbftRound(
providedRoundState,
blockCreator,
protocolContext,
blockImporter,
new Subscribers<>(),
localNodeKeys,
messageFactory,
messageTransmitter);
});
}
@Test
public void startsABlockTimerOnStartIfLocalNodeIsTheProoserForRound() {
when(finalState.isLocalNodeProposerForRound(any())).thenReturn(true);
final IbftBlockHeightManager manager =
new IbftBlockHeightManager(
headerTestFixture.buildHeader(),
finalState,
roundChangeManager,
roundFactory,
clock,
messageValidatorFactory);
manager.start();
verify(blockTimer, times(1)).startTimer(any(), any());
}
@Test
public void onBlockTimerExpiryProposalMessageIsTransmitted() {
final IbftBlockHeightManager manager =
new IbftBlockHeightManager(
headerTestFixture.buildHeader(),
finalState,
roundChangeManager,
roundFactory,
clock,
messageValidatorFactory);
manager.start();
manager.handleBlockTimerExpiry(roundIdentifier);
verify(messageTransmitter, times(1)).multicastProposal(eq(roundIdentifier), any());
verify(messageTransmitter, never()).multicastPrepare(any(), any());
verify(messageTransmitter, never()).multicastPrepare(any(), any());
}
@Test
public void onRoundChangeReceptionRoundChangeManagerIsInvokedAndNewRoundStarted() {
final ConsensusRoundIdentifier futureRoundIdentifier = createFrom(roundIdentifier, 0, +2);
final SignedData<RoundChangePayload> roundChangePayload =
messageFactory.createSignedRoundChangePayload(futureRoundIdentifier, Optional.empty());
when(roundChangeManager.appendRoundChangeMessage(any()))
.thenReturn(Optional.of(new RoundChangeCertificate(singletonList(roundChangePayload))));
when(finalState.isLocalNodeProposerForRound(any())).thenReturn(false);
final IbftBlockHeightManager manager =
new IbftBlockHeightManager(
headerTestFixture.buildHeader(),
finalState,
roundChangeManager,
roundFactory,
clock,
messageValidatorFactory);
manager.start();
verify(roundFactory).createNewRound(any(), eq(0));
manager.handleRoundChangeMessage(roundChangePayload);
verify(roundChangeManager, times(1)).appendRoundChangeMessage(roundChangePayload);
verify(roundFactory, times(1))
.createNewRound(any(), eq(futureRoundIdentifier.getRoundNumber()));
}
@Test
public void onRoundTimerExpiryANewRoundIsCreatedWithAnIncrementedRoundNumber() {
final IbftBlockHeightManager manager =
new IbftBlockHeightManager(
headerTestFixture.buildHeader(),
finalState,
roundChangeManager,
roundFactory,
clock,
messageValidatorFactory);
manager.start();
verify(roundFactory).createNewRound(any(), eq(0));
manager.roundExpired(new RoundExpiry(roundIdentifier));
verify(roundFactory).createNewRound(any(), eq(1));
}
@Test
public void whenSufficientRoundChangesAreReceivedANewRoundMessageIsTransmitted() {
final ConsensusRoundIdentifier futureRoundIdentifier = createFrom(roundIdentifier, 0, +2);
final SignedData<RoundChangePayload> roundChangePayload =
messageFactory.createSignedRoundChangePayload(futureRoundIdentifier, Optional.empty());
final RoundChangeCertificate roundChangCert =
new RoundChangeCertificate(singletonList(roundChangePayload));
when(roundChangeManager.appendRoundChangeMessage(any()))
.thenReturn(Optional.of(roundChangCert));
when(finalState.isLocalNodeProposerForRound(any())).thenReturn(true);
final IbftBlockHeightManager manager =
new IbftBlockHeightManager(
headerTestFixture.buildHeader(),
finalState,
roundChangeManager,
roundFactory,
clock,
messageValidatorFactory);
manager.start();
manager.handleRoundChangeMessage(roundChangePayload);
verify(messageTransmitter, times(1))
.multicastNewRound(eq(futureRoundIdentifier), eq(roundChangCert), any());
}
@Test
public void messagesForFutureRoundsAreBufferedAndUsedToPreloadNewRoundWhenItIsStarted() {
final ConsensusRoundIdentifier futureRoundIdentifier = createFrom(roundIdentifier, 0, +2);
final IbftBlockHeightManager manager =
new IbftBlockHeightManager(
headerTestFixture.buildHeader(),
finalState,
roundChangeManager,
roundFactory,
clock,
messageValidatorFactory);
manager.start();
final SignedData<PreparePayload> preparePayload =
validatorMessageFactory
.get(0)
.createSignedPreparePayload(futureRoundIdentifier, Hash.fromHexStringLenient("0"));
final SignedData<CommitPayload> commitPayload =
validatorMessageFactory
.get(1)
.createSignedCommitPayload(
futureRoundIdentifier,
Hash.fromHexStringLenient("0"),
Signature.create(BigInteger.ONE, BigInteger.ONE, (byte) 1));
manager.handlePrepareMessage(preparePayload);
manager.handleCommitMessage(commitPayload);
// Force a new round to be started at new round number.
final SignedData<NewRoundPayload> newRound =
messageFactory.createSignedNewRoundPayload(
futureRoundIdentifier,
new RoundChangeCertificate(Collections.emptyList()),
messageFactory.createSignedProposalPayload(futureRoundIdentifier, createdBlock));
manager.handleNewRoundMessage(newRound);
// Final state sets the Quorum Size to 3, so should send a Prepare and also a commit
verify(messageTransmitter, times(1)).multicastPrepare(eq(futureRoundIdentifier), any());
verify(messageTransmitter, times(1)).multicastPrepare(eq(futureRoundIdentifier), any());
}
@Test
public void preparedCertificateIncludedInRoundChangeMessageOnRoundTimeoutExpired() {
final IbftBlockHeightManager manager =
new IbftBlockHeightManager(
headerTestFixture.buildHeader(),
finalState,
roundChangeManager,
roundFactory,
clock,
messageValidatorFactory);
manager.start();
manager.handleBlockTimerExpiry(roundIdentifier); // Trigger a Proposal creation.
final SignedData<PreparePayload> preparePayload =
validatorMessageFactory
.get(0)
.createSignedPreparePayload(roundIdentifier, Hash.fromHexStringLenient("0"));
final SignedData<PreparePayload> secondPreparePayload =
validatorMessageFactory
.get(1)
.createSignedPreparePayload(roundIdentifier, Hash.fromHexStringLenient("0"));
manager.handlePrepareMessage(preparePayload);
manager.handlePrepareMessage(secondPreparePayload);
manager.roundExpired(new RoundExpiry(roundIdentifier));
final ConsensusRoundIdentifier nextRound = createFrom(roundIdentifier, 0, +1);
verify(messageTransmitter, times(1))
.multicastRoundChange(eq(nextRound), preparedCaptor.capture());
final Optional<PreparedCertificate> preparedCert = preparedCaptor.getValue();
assertThat(preparedCert).isNotEmpty();
assertThat(preparedCert.get().getPreparePayloads())
.containsOnly(preparePayload, secondPreparePayload);
}
}

@ -158,7 +158,7 @@ public class RoundChangeManagerTest {
} }
@Test @Test
public void discardsPreviousRounds() { public void discardsRoundPreviousToThatRequested() {
SignedData<RoundChangePayload> roundChangeDataProposer = SignedData<RoundChangePayload> roundChangeDataProposer =
makeRoundChangeMessage(proposerKey, ri1); makeRoundChangeMessage(proposerKey, ri1);
SignedData<RoundChangePayload> roundChangeDataValidator1 = SignedData<RoundChangePayload> roundChangeDataValidator1 =
@ -171,7 +171,7 @@ public class RoundChangeManagerTest {
.isEqualTo(Optional.empty()); .isEqualTo(Optional.empty());
assertThat(manager.appendRoundChangeMessage(roundChangeDataValidator2)) assertThat(manager.appendRoundChangeMessage(roundChangeDataValidator2))
.isEqualTo(Optional.empty()); .isEqualTo(Optional.empty());
manager.discardCompletedRound(ri1); manager.discardRoundsPriorTo(ri2);
assertThat(manager.roundChangeCache.get(ri1)).isNull(); assertThat(manager.roundChangeCache.get(ri1)).isNull();
assertThat(manager.roundChangeCache.get(ri2).receivedMessages.size()).isEqualTo(1); assertThat(manager.roundChangeCache.get(ri2).receivedMessages.size()).isEqualTo(1);
assertThat(manager.roundChangeCache.get(ri3).receivedMessages.size()).isEqualTo(1); assertThat(manager.roundChangeCache.get(ri3).receivedMessages.size()).isEqualTo(1);

Loading…
Cancel
Save