diff --git a/api/service/blockproposal/service.go b/api/service/blockproposal/service.go index 3ec4b398e..cfb4efa04 100644 --- a/api/service/blockproposal/service.go +++ b/api/service/blockproposal/service.go @@ -3,6 +3,7 @@ package blockproposal import ( "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/internal/utils" ) @@ -10,14 +11,14 @@ import ( type Service struct { stopChan chan struct{} stoppedChan chan struct{} - readySignal chan struct{} + readySignal chan consensus.ProposalType commitSigsChan chan []byte messageChan chan *msg_pb.Message - waitForConsensusReady func(readySignal chan struct{}, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) + waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) } // New returns a block proposal service. -func New(readySignal chan struct{}, commitSigsChan chan []byte, waitForConsensusReady func(readySignal chan struct{}, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})) *Service { +func New(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})) *Service { return &Service{readySignal: readySignal, commitSigsChan: commitSigsChan, waitForConsensusReady: waitForConsensusReady} } diff --git a/consensus/consensus.go b/consensus/consensus.go index b626bd324..21f544080 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -27,6 +27,15 @@ const ( var errLeaderPriKeyNotFound = errors.New("getting leader private key from consensus public keys failed") +// ProposalType is to indicate the type of signal for new block proposal +type ProposalType byte + +// Constant of the top level Message Type exchanged among nodes +const ( + SyncProposal ProposalType = iota + AsyncProposal +) + // BlockVerifierFunc is a function used to verify the block type BlockVerifierFunc func(*types.Block) error @@ -79,8 +88,8 @@ type Consensus struct { mutex sync.Mutex // ViewChange struct vc *viewChange - // Signal channel for starting a new consensus process - ReadySignal chan struct{} + // Signal channel for proposing a new block and start new consensus + ReadySignal chan ProposalType // Channel to send full commit signatures to finish new block proposal CommitSigChannel chan []byte // The post-consensus processing func passed from Node object @@ -208,7 +217,7 @@ func New( consensus.syncNotReadyChan = make(chan struct{}) consensus.SlashChan = make(chan slash.Record) consensus.commitFinishChan = make(chan uint64) - consensus.ReadySignal = make(chan struct{}) + consensus.ReadySignal = make(chan ProposalType) consensus.CommitSigChannel = make(chan []byte) // channel for receiving newly generated VDF consensus.RndChannel = make(chan [vdfAndSeedSize]byte) diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index e9968bc74..7b6fc7e01 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -451,7 +451,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode { consensus.getLogger().Info(). Str("myKey", myPubKeys.SerializeToHexStr()). Msg("[UpdateConsensusInformation] I am the New Leader") - consensus.ReadySignal <- struct{}{} + consensus.ReadySignal <- SyncProposal }() } return Normal @@ -573,9 +573,9 @@ func (consensus *Consensus) selfCommit(payload []byte) error { continue } - if _, err := consensus.Decider.SubmitVote( + if _, err := consensus.Decider.AddNewVote( quorum.Commit, - []bls.SerializedPublicKey{key.Pub.Bytes}, + []*bls_cosi.PublicKeyWrapper{key.Pub}, key.Pri.SignHash(commitPayload), common.BytesToHash(consensus.blockHash[:]), block.NumberU64(), diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 2cada40e4..43de0d5d8 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -72,7 +72,7 @@ func TestConsensusInitialization(t *testing.T) { assert.IsType(t, make(chan uint64), consensus.commitFinishChan) assert.NotNil(t, consensus.commitFinishChan) - assert.IsType(t, make(chan struct{}), consensus.ReadySignal) + assert.IsType(t, make(chan ProposalType), consensus.ReadySignal) assert.NotNil(t, consensus.ReadySignal) assert.IsType(t, make(chan [vdfAndSeedSize]byte), consensus.RndChannel) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index d53291cfd..f97cfb663 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -123,6 +123,9 @@ func (consensus *Consensus) finalCommit() { msgToSend, FBFTMsg := network.Bytes, network.FBFTMsg + consensus.getLogger().Warn(). + Str("bitmap", hex.EncodeToString(FBFTMsg.Payload[:])). + Msg("[finalCommit] BITMAP") commitSigAndBitmap := FBFTMsg.Payload // this may not needed consensus.FBFTLog.AddMessage(FBFTMsg) // find correct block content @@ -151,7 +154,11 @@ func (consensus *Consensus) finalCommit() { // have the full commit signatures for new block // For now, the leader don't need to send immediately as the committed sig will be // included in the next block and sent in next prepared message. - sendImmediately := true + + sendImmediately := false + if !consensus.IsLeader() { + sendImmediately = true + } if err := consensus.msgSender.SendWithRetry( block.NumberU64(), msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ @@ -246,7 +253,7 @@ func (consensus *Consensus) Start( <-startChannel toStart <- struct{}{} consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") - consensus.ReadySignal <- struct{}{} + consensus.ReadySignal <- SyncProposal }() } consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started") @@ -502,6 +509,9 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { network.Bytes, network.FBFTMsg consensus.FBFTLog.AddMessage(FBFTMsg) + consensus.getLogger().Warn(). + Str("bitmap", hex.EncodeToString(FBFTMsg.Payload[:])). + Msg("[finalCommit] BITMAP") blk.SetCurrentCommitSig(FBFTMsg.Payload) if err := consensus.OnConsensusDone(blk); err != nil { @@ -509,6 +519,8 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { return err } + // If I am still the leader + //if consensus.IsLeader() { // if leader success finalize the block, send committed message to validators if err := consensus.msgSender.SendWithRetry( blk.NumberU64(), @@ -528,7 +540,9 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] sending block proposal signal") // TODO: make sure preCommit happens before finalCommit - consensus.ReadySignal <- struct{}{} + consensus.ReadySignal <- AsyncProposal + //} + consensus.getLogger().Warn().Msg("[preCommitAndPropose] FULLY FINISHED") return nil } diff --git a/consensus/leader.go b/consensus/leader.go index fa9da37c5..e5b614c8b 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -202,8 +202,8 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { func (consensus *Consensus) onCommit(msg *msg_pb.Message) { utils.Logger().Info().Msgf("ViewChanging %d %d", consensus.GetCurBlockViewID(), consensus.GetViewChangingID()) - if consensus.GetCurBlockViewID() == 10 { - //return + if consensus.GetCurBlockViewID()%7 == 0 { + return } recvMsg, err := consensus.ParseFBFTMessage(msg) if err != nil { @@ -307,6 +307,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { logger.Info().Msg("[OnCommit] 2/3 Enough commits received") go func() { + // TODO: make it a channel consensus.preCommitAndPropose(blockObj) }() diff --git a/consensus/validator.go b/consensus/validator.go index d9d48bba2..dd805a313 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -153,8 +153,16 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { if !consensus.onPreparedSanityChecks(&blockObj, recvMsg) { return } + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnPrepared] Received OnPrepared message11111111") consensus.mutex.Lock() defer consensus.mutex.Unlock() + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnPrepared] Received OnPrepared message222222") if consensus.BlockVerifier == nil { consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring") @@ -164,9 +172,21 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { consensus.getLogger().Error().Err(err).Msg("[OnPrepared] Block verification failed") return } + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnPrepared] Received OnPrepared message3333") consensus.FBFTLog.MarkBlockVerified(&blockObj) + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnPrepared] Received OnPrepared message44444") consensus.FBFTLog.AddBlock(&blockObj) + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnPrepared] Received OnPrepared message555555") // add block field blockPayload := make([]byte, len(recvMsg.Block)) copy(blockPayload[:], recvMsg.Block[:]) @@ -247,6 +267,10 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.spinUpStateSync() } + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnCommitted] Received committed message11111111") aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) if err != nil { consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed") @@ -257,6 +281,13 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { return } + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnCommitted] Received committed message222222") // Must have the corresponding block to verify committed message. blockObj := consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash) if blockObj == nil { @@ -267,6 +298,10 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { Msg("[OnCommitted] Failed finding a matching block for committed message") return } + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnCommitted] Received committed message333333") commitPayload := signature.ConstructCommitPayload(consensus.Blockchain, blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64()) if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) { @@ -276,11 +311,21 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { return } + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnCommitted] Received committed message444444") consensus.FBFTLog.AddMessage(recvMsg) - consensus.mutex.Lock() - defer consensus.mutex.Unlock() + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnCommitted] Received committed message555555") + consensus.getLogger().Info(). + Uint64("MsgBlockNum", recvMsg.BlockNum). + Uint64("MsgViewID", recvMsg.ViewID). + Msg("[OnCommitted] Received committed message666666") consensus.aggregatedCommitSig = aggSig consensus.commitBitmap = mask diff --git a/consensus/view_change.go b/consensus/view_change.go index 63c8abfde..13f21916d 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -399,7 +399,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { } go func() { - consensus.ReadySignal <- struct{}{} + consensus.ReadySignal <- SyncProposal }() return } diff --git a/internal/utils/singleton.go b/internal/utils/singleton.go index e7773ae2e..ea5ba8f9b 100644 --- a/internal/utils/singleton.go +++ b/internal/utils/singleton.go @@ -49,7 +49,7 @@ func SetLogVerbosity(verbosity log.Lvl) { if glogger != nil { glogger.Verbosity(logVerbosity) } - updateZeroLogLevel(int(verbosity)) + updateZeroLogLevel(int(4)) } // AddLogFile creates a StreamHandler that outputs JSON logs diff --git a/node/node.go b/node/node.go index 2111fa0ee..2574e80d5 100644 --- a/node/node.go +++ b/node/node.go @@ -846,7 +846,7 @@ func (node *Node) Start() error { } for e := range errChan { - utils.SampledLogger().Info(). + utils.Logger().Info(). Interface("item", e.payload). Msgf("[p2p]: issue while handling incoming p2p message: %v", e.err) } diff --git a/node/node_newblock.go b/node/node_newblock.go index c9de062ea..7e61a207c 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "github.com/harmony-one/harmony/consensus" + "github.com/harmony-one/harmony/crypto/bls" staking "github.com/harmony-one/harmony/staking/types" @@ -25,7 +27,7 @@ const ( // WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus. // only leader will receive the ready signal -func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) { +func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) { go func() { // Setup stoppedChan defer close(stoppedChan) @@ -41,7 +43,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, commitSigsC utils.Logger().Debug(). Msg("Consensus new block proposal: STOPPED!") return - case <-readySignal: + case proposalType := <-readySignal: for node.Consensus != nil && node.Consensus.IsLeader() { time.Sleep(SleepPeriod) utils.Logger().Info(). @@ -52,12 +54,16 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, commitSigsC newCommitSigsChan := make(chan []byte) go func() { + waitTime := 0 * time.Second + if proposalType == consensus.AsyncProposal { + waitTime = 4 * time.Second + } select { case commitSigs := <-commitSigsChan: if len(commitSigs) > bls.BLSSignatureSizeInBytes { newCommitSigsChan <- commitSigs } - case <-time.After(4 * time.Second): + case <-time.After(waitTime): sigs, err := node.Consensus.BlockCommitSigs(node.Blockchain().CurrentBlock().NumberU64()) if err != nil {