clean up code

pull/3405/head
Rongjian Lan 4 years ago
parent ae8da18230
commit 3ba4620431
  1. 2
      consensus/consensus.go
  2. 1
      consensus/consensus_msg_sender.go
  3. 2
      consensus/consensus_service.go
  4. 21
      consensus/consensus_v2.go
  5. 8
      consensus/leader.go
  6. 4
      consensus/validator.go
  7. 4
      internal/utils/singleton.go
  8. 2
      node/node.go

@ -30,7 +30,7 @@ var errLeaderPriKeyNotFound = errors.New("getting leader private key from consen
// ProposalType is to indicate the type of signal for new block proposal // ProposalType is to indicate the type of signal for new block proposal
type ProposalType byte type ProposalType byte
// Constant of the top level Message Type exchanged among nodes // Constant of the type of new block proposal
const ( const (
SyncProposal ProposalType = iota SyncProposal ProposalType = iota
AsyncProposal AsyncProposal

@ -115,7 +115,6 @@ func (sender *MessageSender) StopRetry(msgType msg_pb.MessageType) {
if ok { if ok {
msgRetry := data.(*MessageRetry) msgRetry := data.(*MessageRetry)
atomic.StoreUint32(&msgRetry.isActive, 0) atomic.StoreUint32(&msgRetry.isActive, 0)
utils.Logger().Info().Str("type", msgType.String()).Uint32("isActive", msgRetry.isActive).Msg("STOPPING RETRY")
} }
} }

@ -311,7 +311,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
} }
} }
consensus.BlockPeriod = 4 * time.Second consensus.BlockPeriod = 5 * time.Second
// Enable aggregate sig at epoch 1000 for mainnet, at epoch 53000 for testnet, and always for other nets. // Enable aggregate sig at epoch 1000 for mainnet, at epoch 53000 for testnet, and always for other nets.
if (consensus.Blockchain.Config().ChainID == params.MainnetChainID && curEpoch.Cmp(big.NewInt(1000)) > 0) || if (consensus.Blockchain.Config().ChainID == params.MainnetChainID && curEpoch.Cmp(big.NewInt(1000)) > 0) ||

@ -123,7 +123,7 @@ func (consensus *Consensus) finalCommit() {
msgToSend, FBFTMsg := msgToSend, FBFTMsg :=
network.Bytes, network.Bytes,
network.FBFTMsg network.FBFTMsg
commitSigAndBitmap := FBFTMsg.Payload // this may not needed commitSigAndBitmap := FBFTMsg.Payload
consensus.FBFTLog.AddMessage(FBFTMsg) consensus.FBFTLog.AddMessage(FBFTMsg)
// find correct block content // find correct block content
curBlockHash := consensus.blockHash curBlockHash := consensus.blockHash
@ -135,8 +135,8 @@ func (consensus *Consensus) finalCommit() {
return return
} }
block.SetCurrentCommitSig(commitSigAndBitmap)
consensus.commitBlock(block, FBFTMsg) consensus.commitBlock(block, FBFTMsg)
consensus.Blockchain.WriteCommitSig(block.NumberU64(), commitSigAndBitmap)
if consensus.blockNum-beforeCatchupNum != 1 { if consensus.blockNum-beforeCatchupNum != 1 {
consensus.getLogger().Warn(). consensus.getLogger().Warn().
@ -145,13 +145,13 @@ func (consensus *Consensus) finalCommit() {
return return
} }
// if leader success finalize the block, send committed message to validators // if leader successfully finalizes the block, send committed message to validators
// TODO: once leader rotation is implemented, leader who is about to be switched out // TODO: once leader rotation is implemented, leader who is about to be switched out
// needs to send the committed message immediately so the next leader can // needs to send the committed message immediately so the next leader can
// have the full commit signatures for new block // have the full commit signatures for new block
// For now, the leader don't need to send immediately as the committed sig will be // For now, the leader don't need to send immediately as the committed sig will be
// included in the next block and sent in next prepared message. // included in the next block and sent in next prepared message. Unless the node
// won't be the leader anymore or it's the last block of the epoch (no pipelining).
sendImmediately := false sendImmediately := false
if !consensus.IsLeader() || block.IsLastBlockInEpoch() { if !consensus.IsLeader() || block.IsLastBlockInEpoch() {
sendImmediately = true sendImmediately = true
@ -495,9 +495,9 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
} }
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock()
network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey}) network, err := consensus.construct(msg_pb.MessageType_COMMITTED, nil, []*bls.PrivateKeyWrapper{leaderPriKey})
consensus.mutex.Unlock()
if err != nil { if err != nil {
return errors.Wrap(err, "[preCommitAndPropose] Unable to construct Committed message") return errors.Wrap(err, "[preCommitAndPropose] Unable to construct Committed message")
} }
@ -513,9 +513,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
return err return err
} }
// If I am still the leader // If it's not the epoch block, do pipelining and send committed message to validators now at 67% committed.
//if consensus.IsLeader() {
// if leader success finalize the block, send committed message to validators
if !blk.IsLastBlockInEpoch() { if !blk.IsLastBlockInEpoch() {
if err := consensus.msgSender.SendWithRetry( if err := consensus.msgSender.SendWithRetry(
blk.NumberU64(), blk.NumberU64(),
@ -535,10 +533,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
// Send signal to Node to propose the new block for consensus // Send signal to Node to propose the new block for consensus
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] sending block proposal signal") consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] sending block proposal signal")
// TODO: make sure preCommit happens before finalCommit
consensus.ReadySignal <- AsyncProposal consensus.ReadySignal <- AsyncProposal
//}
consensus.getLogger().Warn().Msg("[preCommitAndPropose] FULLY FINISHED")
return nil return nil
} }

@ -3,8 +3,10 @@ package consensus
import ( import (
"time" "time"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/consensus/signature" "github.com/harmony-one/harmony/consensus/signature"
@ -199,6 +201,10 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
} }
func (consensus *Consensus) onCommit(msg *msg_pb.Message) { func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
utils.Logger().Info().Msgf("ViewChanging %d %d", consensus.GetCurBlockViewID(), consensus.GetViewChangingID())
if consensus.GetCurBlockViewID()%7 == 0 {
return
}
recvMsg, err := consensus.ParseFBFTMessage(msg) recvMsg, err := consensus.ParseFBFTMessage(msg)
if err != nil { if err != nil {
consensus.getLogger().Debug().Err(err).Msg("[OnCommit] Parse pbft message failed") consensus.getLogger().Debug().Err(err).Msg("[OnCommit] Parse pbft message failed")

@ -239,10 +239,6 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
Msg("Wrong BlockNum Received, ignoring!") Msg("Wrong BlockNum Received, ignoring!")
return return
} }
if recvMsg.BlockNum > consensus.blockNum+1 {
consensus.getLogger().Info().Msg("[OnCommitted] low consensus block number. Spin up state sync")
consensus.spinUpStateSync()
}
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil { if err != nil {

@ -45,11 +45,11 @@ func SetLogContext(_port, _ip string) {
// SetLogVerbosity specifies the verbosity of global logger // SetLogVerbosity specifies the verbosity of global logger
func SetLogVerbosity(verbosity log.Lvl) { func SetLogVerbosity(verbosity log.Lvl) {
logVerbosity = 4 logVerbosity = verbosity
if glogger != nil { if glogger != nil {
glogger.Verbosity(logVerbosity) glogger.Verbosity(logVerbosity)
} }
updateZeroLogLevel(int(4)) updateZeroLogLevel(int(verbosity))
} }
// AddLogFile creates a StreamHandler that outputs JSON logs // AddLogFile creates a StreamHandler that outputs JSON logs

@ -864,7 +864,7 @@ func (node *Node) Start() error {
} }
for e := range errChan { for e := range errChan {
utils.Logger().Info(). utils.SampledLogger().Info().
Interface("item", e.payload). Interface("item", e.payload).
Msgf("[p2p]: issue while handling incoming p2p message: %v", e.err) Msgf("[p2p]: issue while handling incoming p2p message: %v", e.err)
} }

Loading…
Cancel
Save