stabalize consensus process with pipelining

pull/3405/head
Rongjian Lan 4 years ago
parent 0819464f2b
commit 8906679ad7
  1. 2
      consensus/consensus_service.go
  2. 21
      consensus/consensus_v2.go
  3. 3
      consensus/quorum/one-node-staked-vote.go
  4. 17
      consensus/validator.go
  5. 2
      internal/utils/singleton.go
  6. 1
      node/node_cross_shard.go

@ -311,7 +311,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
}
}
consensus.BlockPeriod = 3 * time.Second
consensus.BlockPeriod = 4 * time.Second
// Enable aggregate sig at epoch 1000 for mainnet, at epoch 53000 for testnet, and always for other nets.
if (consensus.Blockchain.Config().ChainID == params.MainnetChainID && curEpoch.Cmp(big.NewInt(1000)) > 0) ||

@ -7,6 +7,8 @@ import (
"sync/atomic"
"time"
"github.com/harmony-one/harmony/internal/utils"
"github.com/rs/zerolog"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
@ -134,6 +136,7 @@ func (consensus *Consensus) finalCommit() {
}
consensus.commitBlock(block, FBFTMsg)
consensus.Blockchain.WriteCommitSig(block.NumberU64(), commitSigAndBitmap)
if consensus.blockNum-beforeCatchupNum != 1 {
consensus.getLogger().Warn().
@ -148,7 +151,7 @@ func (consensus *Consensus) finalCommit() {
// have the full commit signatures for new block
// For now, the leader don't need to send immediately as the committed sig will be
// included in the next block and sent in next prepared message.
sendImmediately := false
sendImmediately := true
if err := consensus.msgSender.SendWithRetry(
block.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
@ -189,12 +192,17 @@ func (consensus *Consensus) finalCommit() {
// Sleep to wait for the full block time
consensus.getLogger().Info().Msg("[finalCommit] Waiting for Block Time")
<-time.After(time.Until(consensus.NextBlockDue))
// Send commit sig/bitmap to finish the new block proposal
consensus.CommitSigChannel <- commitSigAndBitmap
// Update time due for next block
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod)
// Send commit sig/bitmap to finish the new block proposal
go func() {
select {
case consensus.CommitSigChannel <- commitSigAndBitmap:
case <-time.After(6 * time.Second):
utils.Logger().Error().Err(err).Msg("[finalCommit] channel not received after 6s for commitSigAndBitmap")
}
}()
}
// BlockCommitSigs returns the byte array of aggregated
@ -517,6 +525,9 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
}
// Send signal to Node to propose the new block for consensus
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] sending block proposal signal")
// TODO: make sure preCommit happens before finalCommit
consensus.ReadySignal <- struct{}{}
return nil
}

@ -188,7 +188,8 @@ func (v *stakedVoteWeight) QuorumThreshold() numeric.Dec {
// IsAllSigsCollected ..
func (v *stakedVoteWeight) IsAllSigsCollected() bool {
return v.SignersCount(Commit) == v.ParticipantsCount()
utils.Logger().Info().Msgf("ALL SIGS %s", v.voteTally.Commit.tally)
return v.voteTally.Commit.tally.Equal(numeric.NewDec(1))
}
func (v *stakedVoteWeight) SetVoters(

@ -114,7 +114,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
Msg("Wrong BlockNum Received, ignoring!")
return
}
if recvMsg.BlockNum > consensus.blockNum {
if recvMsg.BlockNum > consensus.blockNum+1 {
consensus.getLogger().Warn().Msgf("[OnPrepared] low consensus block number. Spin sync")
consensus.spinUpStateSync()
}
@ -156,12 +156,6 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
// tryCatchup is also run in onCommitted(), so need to lock with commitMutex.
if consensus.current.Mode() != Normal {
// don't sign the block that is not verified
consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!")
return
}
if consensus.BlockVerifier == nil {
consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring")
return
@ -217,6 +211,13 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
copy(consensus.blockHash[:], blockHash[:])
}
// tryCatchup is also run in onCommitted(), so need to lock with commitMutex.
if consensus.current.Mode() != Normal {
// don't sign the block that is not verified
consensus.getLogger().Info().Msg("[OnPrepared] Not in normal mode, Exiting!!")
return
}
consensus.sendCommitMessages(&blockObj)
consensus.switchPhase("onPrepared", FBFTCommit)
}
@ -241,7 +242,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
Msg("Wrong BlockNum Received, ignoring!")
return
}
if recvMsg.BlockNum > consensus.blockNum {
if recvMsg.BlockNum > consensus.blockNum+1 {
consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync")
consensus.spinUpStateSync()
}

@ -45,7 +45,7 @@ func SetLogContext(_port, _ip string) {
// SetLogVerbosity specifies the verbosity of global logger
func SetLogVerbosity(verbosity log.Lvl) {
logVerbosity = verbosity
logVerbosity = 4
if glogger != nil {
glogger.Verbosity(logVerbosity)
}

@ -20,6 +20,7 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block) {
//#### Read payload data from committed msg
if len(commitSigAndBitmap) <= 96 {
utils.Logger().Debug().Int("commitSigAndBitmapLen", len(commitSigAndBitmap)).Msg("[BroadcastCXReceipts] commitSigAndBitmap Not Enough Length")
return
}
commitSig := make([]byte, 96)
commitBitmap := make([]byte, len(commitSigAndBitmap)-96)

Loading…
Cancel
Save