do not sent committed at 67%

pull/3405/head
Rongjian Lan 4 years ago
parent 0af08d2c12
commit 4aef0f4132
  1. 2
      cmd/harmony/main.go
  2. 4
      consensus/consensus.go
  3. 7
      consensus/consensus_msg_sender.go
  4. 82
      consensus/consensus_v2.go
  5. 2
      consensus/engine/consensus_engine.go
  6. 13
      consensus/leader.go
  7. 1
      consensus/threshold.go
  8. 14
      consensus/validator.go
  9. 2
      consensus/view_change.go
  10. 10
      node/node_handler.go
  11. 3
      node/node_newblock.go

@ -653,7 +653,7 @@ func setupConsensusAndNode(hc harmonyConfig, nodeConfig *nodeconfig.ConfigType)
// Assign closure functions to the consensus object // Assign closure functions to the consensus object
currentConsensus.SetBlockVerifier(currentNode.VerifyNewBlock) currentConsensus.SetBlockVerifier(currentNode.VerifyNewBlock)
currentConsensus.OnConsensusDone = currentNode.PostConsensusProcessing currentConsensus.PostConsensusJob = currentNode.PostConsensusProcessing
// update consensus information based on the blockchain // update consensus information based on the blockchain
currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation()) currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation())
currentConsensus.NextBlockDue = time.Now() currentConsensus.NextBlockDue = time.Now()

@ -92,9 +92,9 @@ type Consensus struct {
ReadySignal chan ProposalType ReadySignal chan ProposalType
// Channel to send full commit signatures to finish new block proposal // Channel to send full commit signatures to finish new block proposal
CommitSigChannel chan []byte CommitSigChannel chan []byte
// The post-consensus processing func passed from Node object // The post-consensus job func passed from Node object
// Called when consensus on a new block is done // Called when consensus on a new block is done
OnConsensusDone func(*types.Block) error PostConsensusJob func(*types.Block) error
// The verifier func passed from Node object // The verifier func passed from Node object
BlockVerifier BlockVerifierFunc BlockVerifier BlockVerifierFunc
// verified block to state sync broadcast // verified block to state sync broadcast

@ -56,7 +56,7 @@ func (sender *MessageSender) Reset(blockNum uint64) {
} }
// SendWithRetry sends message with retry logic. // SendWithRetry sends message with retry logic.
func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte, immediate bool) error { func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) error {
if sender.retryTimes != 0 { if sender.retryTimes != 0 {
msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0} msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0}
atomic.StoreUint32(&msgRetry.isActive, 1) atomic.StoreUint32(&msgRetry.isActive, 1)
@ -65,10 +65,7 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa
sender.Retry(&msgRetry) sender.Retry(&msgRetry)
}() }()
} }
if immediate { return sender.host.SendMessageToGroups(groups, p2pMsg)
return sender.host.SendMessageToGroups(groups, p2pMsg)
}
return nil
} }
// SendWithoutRetry sends message without retry logic. // SendWithoutRetry sends message without retry logic.

@ -135,6 +135,9 @@ func (consensus *Consensus) finalCommit() {
return return
} }
consensus.getLogger().Info().Hex("new", commitSigAndBitmap).Msg("[finalCommit] Overriding commit signatures!!")
consensus.Blockchain.WriteCommitSig(block.NumberU64(), commitSigAndBitmap)
block.SetCurrentCommitSig(commitSigAndBitmap) block.SetCurrentCommitSig(commitSigAndBitmap)
err = consensus.commitBlock(block, FBFTMsg) err = consensus.commitBlock(block, FBFTMsg)
@ -146,22 +149,12 @@ func (consensus *Consensus) finalCommit() {
} }
// if leader successfully finalizes the block, send committed message to validators // if leader successfully finalizes the block, send committed message to validators
// TODO: once leader rotation is implemented, leader who is about to be switched out
// needs to send the committed message immediately so the next leader can
// have the full commit signatures for new block
// For now, the leader don't need to send immediately as the committed sig will be
// included in the next block and sent in next prepared message. Unless the node
// won't be the leader anymore or it's the last block of the epoch (no pipelining).
sendImmediately := false
if !consensus.IsLeader() || block.IsLastBlockInEpoch() {
sendImmediately = true
}
if err := consensus.msgSender.SendWithRetry( if err := consensus.msgSender.SendWithRetry(
block.NumberU64(), block.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
}, },
p2p.ConstructMessage(msgToSend), sendImmediately); err != nil { p2p.ConstructMessage(msgToSend)); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[finalCommit] Cannot send committed message") consensus.getLogger().Warn().Err(err).Msg("[finalCommit] Cannot send committed message")
} else { } else {
consensus.getLogger().Info(). consensus.getLogger().Info().
@ -170,10 +163,6 @@ func (consensus *Consensus) finalCommit() {
Msg("[finalCommit] Sent Committed Message") Msg("[finalCommit] Sent Committed Message")
} }
if consensus.IsLeader() && block.IsLastBlockInEpoch() {
consensus.ReadySignal <- AsyncProposal
}
// Dump new block into level db // Dump new block into level db
// In current code, we add signatures in block in tryCatchup, the block dump to explorer does not contains signatures // In current code, we add signatures in block in tryCatchup, the block dump to explorer does not contains signatures
// but since explorer doesn't need signatures, it should be fine // but since explorer doesn't need signatures, it should be fine
@ -203,14 +192,17 @@ func (consensus *Consensus) finalCommit() {
// Update time due for next block // Update time due for next block
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod)
// Send commit sig/bitmap to finish the new block proposal // If still the leader, send commit sig/bitmap to finish the new block proposal,
go func() { // else, the block proposal will timeout by itself.
select { if consensus.IsLeader() {
case consensus.CommitSigChannel <- commitSigAndBitmap: go func() {
case <-time.After(6 * time.Second): select {
utils.Logger().Error().Err(err).Msg("[finalCommit] channel not received after 6s for commitSigAndBitmap") case consensus.CommitSigChannel <- commitSigAndBitmap:
} case <-time.After(6 * time.Second):
}() utils.Logger().Error().Err(err).Msg("[finalCommit] channel not received after 6s for commitSigAndBitmap")
}
}()
}
} }
// BlockCommitSigs returns the byte array of aggregated // BlockCommitSigs returns the byte array of aggregated
@ -493,46 +485,19 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
return errors.New("block to pre-commit is nil") return errors.New("block to pre-commit is nil")
} }
leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey()
if err != nil {
return err
}
consensus.mutex.Lock() consensus.mutex.Lock()
network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey}) bareMinimumCommit := consensus.constructQuorumSigAndBitmap(quorum.Commit)
consensus.mutex.Unlock() consensus.mutex.Unlock()
if err != nil { blk.SetCurrentCommitSig(bareMinimumCommit)
return errors.Wrap(err, "[preCommitAndPropose] Unable to construct Committed message")
}
msgToSend, FBFTMsg :=
network.Bytes,
network.FBFTMsg
consensus.FBFTLog.AddMessage(FBFTMsg)
blk.SetCurrentCommitSig(FBFTMsg.Payload) if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, true); err != nil {
if err := consensus.OnConsensusDone(blk); err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain") consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return err return err
} }
if err := consensus.msgSender.SendWithRetry(
blk.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend), true); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message")
} else {
consensus.getLogger().Info().
Str("blockHash", blk.Hash().Hex()).
Uint64("blockNum", consensus.blockNum).
Msg("[preCommitAndPropose] Sent Committed Message")
}
// Send signal to Node to propose the new block for consensus // Send signal to Node to propose the new block for consensus
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] sending block proposal signal") consensus.getLogger().Info().Msg("[preCommitAndPropose] sending block proposal signal")
consensus.ReadySignal <- AsyncProposal consensus.ReadySignal <- AsyncProposal
return nil return nil
@ -584,7 +549,8 @@ func (consensus *Consensus) tryCatchup() error {
func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error { func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error {
if consensus.Blockchain.CurrentBlock().NumberU64() < blk.NumberU64() { if consensus.Blockchain.CurrentBlock().NumberU64() < blk.NumberU64() {
if err := consensus.OnConsensusDone(blk); err != nil { if _, err := consensus.Blockchain.InsertChain([]*types.Block{blk}, true); err != nil {
consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain")
return err return err
} }
} }
@ -594,10 +560,16 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
return errIncorrectSender return errIncorrectSender
} }
consensus.PostConsensusJob(blk)
consensus.SetupForNewConsensus(blk, committedMsg) consensus.SetupForNewConsensus(blk, committedMsg)
consensus.FinishFinalityCount()
utils.Logger().Info().Uint64("blockNum", blk.NumberU64()).
Str("hash", blk.Header().Hash().Hex()).
Msg("Added New Block to Blockchain!!!")
return nil return nil
} }
// SetupForNewConsensus sets the state for new consensus
func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
atomic.AddUint64(&consensus.blockNum, 1) atomic.AddUint64(&consensus.blockNum, 1)
consensus.SetCurBlockViewID(committedMsg.ViewID + 1) consensus.SetCurBlockViewID(committedMsg.ViewID + 1)

@ -15,7 +15,7 @@ import (
staking "github.com/harmony-one/harmony/staking/types" staking "github.com/harmony-one/harmony/staking/types"
) )
// Blockchain defines a collection of methods needed to access the local // ChainReader defines a collection of methods needed to access the local
// blockchain during header and/or uncle verification. // blockchain during header and/or uncle verification.
// Note this reader interface is still in process of being integrated with the BFT consensus. // Note this reader interface is still in process of being integrated with the BFT consensus.
type ChainReader interface { type ChainReader interface {

@ -80,7 +80,7 @@ func (consensus *Consensus) announce(block *types.Block) {
if err := consensus.msgSender.SendWithRetry( if err := consensus.msgSender.SendWithRetry(
consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{ consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
}, p2p.ConstructMessage(msgToSend), true); err != nil { }, p2p.ConstructMessage(msgToSend)); err != nil {
consensus.getLogger().Warn(). consensus.getLogger().Warn().
Str("groupID", string(nodeconfig.NewGroupIDByShardID( Str("groupID", string(nodeconfig.NewGroupIDByShardID(
nodeconfig.ShardID(consensus.ShardID), nodeconfig.ShardID(consensus.ShardID),
@ -300,13 +300,10 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
if !quorumWasMet && quorumIsMet { if !quorumWasMet && quorumIsMet {
logger.Info().Msg("[OnCommit] 2/3 Enough commits received") logger.Info().Msg("[OnCommit] 2/3 Enough commits received")
// If it's not the epoch block, do pipelining and send committed message to validators now at 67% committed. go func() {
if !blockObj.IsLastBlockInEpoch() { // TODO: make it synchronized with commitFinishChan
go func() { consensus.preCommitAndPropose(blockObj)
// TODO: make it synchronized with commitFinishChan }()
consensus.preCommitAndPropose(blockObj)
}()
}
consensus.getLogger().Info().Msg("[OnCommit] Starting Grace Period") consensus.getLogger().Info().Msg("[OnCommit] Starting Grace Period")
go func(viewID uint64) { go func(viewID uint64) {

@ -74,7 +74,6 @@ func (consensus *Consensus) didReachPrepareQuorum() error {
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
}, },
p2p.ConstructMessage(msgToSend), p2p.ConstructMessage(msgToSend),
true,
); err != nil { ); err != nil {
consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message") consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message")
} else { } else {

@ -284,20 +284,6 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.aggregatedCommitSig = aggSig consensus.aggregatedCommitSig = aggSig
consensus.commitBitmap = mask consensus.commitBitmap = mask
// If we already have a committed signature received before, check whether the new one
// has more signatures and if yes, override the old data.
// Otherwise, simply write the commit signature in db.
commitSigBitmap, err := consensus.Blockchain.ReadCommitSig(blockObj.NumberU64())
if err == nil && len(commitSigBitmap) == len(recvMsg.Payload) {
new := mask.CountEnabled()
mask.SetMask(commitSigBitmap[bls.BLSSignatureSizeInBytes:])
cur := mask.CountEnabled()
if new > cur {
consensus.getLogger().Info().Hex("old", commitSigBitmap).Hex("new", recvMsg.Payload).Msg("[OnCommitted] Overriding commit signatures!!")
consensus.Blockchain.WriteCommitSig(blockObj.NumberU64(), recvMsg.Payload)
}
}
consensus.tryCatchup() consensus.tryCatchup()
if recvMsg.BlockNum > consensus.blockNum { if recvMsg.BlockNum > consensus.blockNum {
consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC") consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC")

@ -265,7 +265,6 @@ func (consensus *Consensus) startViewChange() {
[]nodeconfig.GroupID{ []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))},
p2p.ConstructMessage(msgToSend), p2p.ConstructMessage(msgToSend),
true,
); err != nil { ); err != nil {
consensus.getLogger().Err(err). consensus.getLogger().Err(err).
Msg("[startViewChange] could not send out the ViewChange message") Msg("[startViewChange] could not send out the ViewChange message")
@ -295,7 +294,6 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri
[]nodeconfig.GroupID{ []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))},
p2p.ConstructMessage(msgToSend), p2p.ConstructMessage(msgToSend),
true,
); err != nil { ); err != nil {
return errors.New("failed to send out the NewView message") return errors.New("failed to send out the NewView message")
} }

@ -353,16 +353,6 @@ func (node *Node) numSignaturesIncludedInBlock(block *types.Block) uint32 {
// 2. [leader] send new block to the client // 2. [leader] send new block to the client
// 3. [leader] send cross shard tx receipts to destination shard // 3. [leader] send cross shard tx receipts to destination shard
func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
if _, err := node.Blockchain().InsertChain([]*types.Block{newBlock}, true); err != nil {
return err
}
utils.Logger().Info().
Uint64("blockNum", newBlock.NumberU64()).
Str("hash", newBlock.Header().Hash().Hex()).
Msg("Added New Block to Blockchain!!!")
node.Consensus.FinishFinalityCount()
if node.Consensus.IsLeader() { if node.Consensus.IsLeader() {
if node.NodeConfig.ShardID == shard.BeaconChainShardID { if node.NodeConfig.ShardID == shard.BeaconChainShardID {
node.BroadcastNewBlock(newBlock) node.BroadcastNewBlock(newBlock)

@ -44,7 +44,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
Msg("Consensus new block proposal: STOPPED!") Msg("Consensus new block proposal: STOPPED!")
return return
case proposalType := <-readySignal: case proposalType := <-readySignal:
for node.Consensus != nil && node.Consensus.IsLeader() { if node.Consensus != nil && node.Consensus.IsLeader() {
time.Sleep(SleepPeriod) time.Sleep(SleepPeriod)
utils.Logger().Info(). utils.Logger().Info().
Uint64("blockNum", node.Blockchain().CurrentBlock().NumberU64()+1). Uint64("blockNum", node.Blockchain().CurrentBlock().NumberU64()+1).
@ -88,7 +88,6 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
// Send the new block to Consensus so it can be confirmed. // Send the new block to Consensus so it can be confirmed.
node.BlockChannel <- newBlock node.BlockChannel <- newBlock
break
} else { } else {
utils.Logger().Err(err).Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") utils.Logger().Err(err).Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!")
} }

Loading…
Cancel
Save