some more logic cleanup

pull/3405/head
Rongjian Lan 4 years ago
parent 8906679ad7
commit 49d7985f14
  1. 7
      api/service/blockproposal/service.go
  2. 15
      consensus/consensus.go
  3. 6
      consensus/consensus_service.go
  4. 2
      consensus/consensus_test.go
  5. 20
      consensus/consensus_v2.go
  6. 5
      consensus/leader.go
  7. 49
      consensus/validator.go
  8. 2
      consensus/view_change.go
  9. 2
      internal/utils/singleton.go
  10. 2
      node/node.go
  11. 12
      node/node_newblock.go

@ -3,6 +3,7 @@ package blockproposal
import ( import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
) )
@ -10,14 +11,14 @@ import (
type Service struct { type Service struct {
stopChan chan struct{} stopChan chan struct{}
stoppedChan chan struct{} stoppedChan chan struct{}
readySignal chan struct{} readySignal chan consensus.ProposalType
commitSigsChan chan []byte commitSigsChan chan []byte
messageChan chan *msg_pb.Message messageChan chan *msg_pb.Message
waitForConsensusReady func(readySignal chan struct{}, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})
} }
// New returns a block proposal service. // New returns a block proposal service.
func New(readySignal chan struct{}, commitSigsChan chan []byte, waitForConsensusReady func(readySignal chan struct{}, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})) *Service { func New(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})) *Service {
return &Service{readySignal: readySignal, commitSigsChan: commitSigsChan, waitForConsensusReady: waitForConsensusReady} return &Service{readySignal: readySignal, commitSigsChan: commitSigsChan, waitForConsensusReady: waitForConsensusReady}
} }

@ -27,6 +27,15 @@ const (
var errLeaderPriKeyNotFound = errors.New("getting leader private key from consensus public keys failed") var errLeaderPriKeyNotFound = errors.New("getting leader private key from consensus public keys failed")
// ProposalType is to indicate the type of signal for new block proposal
type ProposalType byte
// Constant of the top level Message Type exchanged among nodes
const (
SyncProposal ProposalType = iota
AsyncProposal
)
// BlockVerifierFunc is a function used to verify the block // BlockVerifierFunc is a function used to verify the block
type BlockVerifierFunc func(*types.Block) error type BlockVerifierFunc func(*types.Block) error
@ -79,8 +88,8 @@ type Consensus struct {
mutex sync.Mutex mutex sync.Mutex
// ViewChange struct // ViewChange struct
vc *viewChange vc *viewChange
// Signal channel for starting a new consensus process // Signal channel for proposing a new block and start new consensus
ReadySignal chan struct{} ReadySignal chan ProposalType
// Channel to send full commit signatures to finish new block proposal // Channel to send full commit signatures to finish new block proposal
CommitSigChannel chan []byte CommitSigChannel chan []byte
// The post-consensus processing func passed from Node object // The post-consensus processing func passed from Node object
@ -208,7 +217,7 @@ func New(
consensus.syncNotReadyChan = make(chan struct{}) consensus.syncNotReadyChan = make(chan struct{})
consensus.SlashChan = make(chan slash.Record) consensus.SlashChan = make(chan slash.Record)
consensus.commitFinishChan = make(chan uint64) consensus.commitFinishChan = make(chan uint64)
consensus.ReadySignal = make(chan struct{}) consensus.ReadySignal = make(chan ProposalType)
consensus.CommitSigChannel = make(chan []byte) consensus.CommitSigChannel = make(chan []byte)
// channel for receiving newly generated VDF // channel for receiving newly generated VDF
consensus.RndChannel = make(chan [vdfAndSeedSize]byte) consensus.RndChannel = make(chan [vdfAndSeedSize]byte)

@ -451,7 +451,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.getLogger().Info(). consensus.getLogger().Info().
Str("myKey", myPubKeys.SerializeToHexStr()). Str("myKey", myPubKeys.SerializeToHexStr()).
Msg("[UpdateConsensusInformation] I am the New Leader") Msg("[UpdateConsensusInformation] I am the New Leader")
consensus.ReadySignal <- struct{}{} consensus.ReadySignal <- SyncProposal
}() }()
} }
return Normal return Normal
@ -573,9 +573,9 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
continue continue
} }
if _, err := consensus.Decider.SubmitVote( if _, err := consensus.Decider.AddNewVote(
quorum.Commit, quorum.Commit,
[]bls.SerializedPublicKey{key.Pub.Bytes}, []*bls_cosi.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(commitPayload), key.Pri.SignHash(commitPayload),
common.BytesToHash(consensus.blockHash[:]), common.BytesToHash(consensus.blockHash[:]),
block.NumberU64(), block.NumberU64(),

@ -72,7 +72,7 @@ func TestConsensusInitialization(t *testing.T) {
assert.IsType(t, make(chan uint64), consensus.commitFinishChan) assert.IsType(t, make(chan uint64), consensus.commitFinishChan)
assert.NotNil(t, consensus.commitFinishChan) assert.NotNil(t, consensus.commitFinishChan)
assert.IsType(t, make(chan struct{}), consensus.ReadySignal) assert.IsType(t, make(chan ProposalType), consensus.ReadySignal)
assert.NotNil(t, consensus.ReadySignal) assert.NotNil(t, consensus.ReadySignal)
assert.IsType(t, make(chan [vdfAndSeedSize]byte), consensus.RndChannel) assert.IsType(t, make(chan [vdfAndSeedSize]byte), consensus.RndChannel)

@ -123,6 +123,9 @@ func (consensus *Consensus) finalCommit() {
msgToSend, FBFTMsg := msgToSend, FBFTMsg :=
network.Bytes, network.Bytes,
network.FBFTMsg network.FBFTMsg
consensus.getLogger().Warn().
Str("bitmap", hex.EncodeToString(FBFTMsg.Payload[:])).
Msg("[finalCommit] BITMAP")
commitSigAndBitmap := FBFTMsg.Payload // this may not needed commitSigAndBitmap := FBFTMsg.Payload // this may not needed
consensus.FBFTLog.AddMessage(FBFTMsg) consensus.FBFTLog.AddMessage(FBFTMsg)
// find correct block content // find correct block content
@ -151,7 +154,11 @@ func (consensus *Consensus) finalCommit() {
// have the full commit signatures for new block // have the full commit signatures for new block
// For now, the leader don't need to send immediately as the committed sig will be // For now, the leader don't need to send immediately as the committed sig will be
// included in the next block and sent in next prepared message. // included in the next block and sent in next prepared message.
sendImmediately := true
sendImmediately := false
if !consensus.IsLeader() {
sendImmediately = true
}
if err := consensus.msgSender.SendWithRetry( if err := consensus.msgSender.SendWithRetry(
block.NumberU64(), block.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{ msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
@ -246,7 +253,7 @@ func (consensus *Consensus) Start(
<-startChannel <-startChannel
toStart <- struct{}{} toStart <- struct{}{}
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal")
consensus.ReadySignal <- struct{}{} consensus.ReadySignal <- SyncProposal
}() }()
} }
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started") consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started")
@ -502,6 +509,9 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
network.Bytes, network.Bytes,
network.FBFTMsg network.FBFTMsg
consensus.FBFTLog.AddMessage(FBFTMsg) consensus.FBFTLog.AddMessage(FBFTMsg)
consensus.getLogger().Warn().
Str("bitmap", hex.EncodeToString(FBFTMsg.Payload[:])).
Msg("[finalCommit] BITMAP")
blk.SetCurrentCommitSig(FBFTMsg.Payload) blk.SetCurrentCommitSig(FBFTMsg.Payload)
if err := consensus.OnConsensusDone(blk); err != nil { if err := consensus.OnConsensusDone(blk); err != nil {
@ -509,6 +519,8 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
return err return err
} }
// If I am still the leader
//if consensus.IsLeader() {
// if leader success finalize the block, send committed message to validators // if leader success finalize the block, send committed message to validators
if err := consensus.msgSender.SendWithRetry( if err := consensus.msgSender.SendWithRetry(
blk.NumberU64(), blk.NumberU64(),
@ -528,7 +540,9 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] sending block proposal signal") consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] sending block proposal signal")
// TODO: make sure preCommit happens before finalCommit // TODO: make sure preCommit happens before finalCommit
consensus.ReadySignal <- struct{}{} consensus.ReadySignal <- AsyncProposal
//}
consensus.getLogger().Warn().Msg("[preCommitAndPropose] FULLY FINISHED")
return nil return nil
} }

@ -202,8 +202,8 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
func (consensus *Consensus) onCommit(msg *msg_pb.Message) { func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
utils.Logger().Info().Msgf("ViewChanging %d %d", consensus.GetCurBlockViewID(), consensus.GetViewChangingID()) utils.Logger().Info().Msgf("ViewChanging %d %d", consensus.GetCurBlockViewID(), consensus.GetViewChangingID())
if consensus.GetCurBlockViewID() == 10 { if consensus.GetCurBlockViewID()%7 == 0 {
//return return
} }
recvMsg, err := consensus.ParseFBFTMessage(msg) recvMsg, err := consensus.ParseFBFTMessage(msg)
if err != nil { if err != nil {
@ -307,6 +307,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
logger.Info().Msg("[OnCommit] 2/3 Enough commits received") logger.Info().Msg("[OnCommit] 2/3 Enough commits received")
go func() { go func() {
// TODO: make it a channel
consensus.preCommitAndPropose(blockObj) consensus.preCommitAndPropose(blockObj)
}() }()

@ -153,8 +153,16 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
if !consensus.onPreparedSanityChecks(&blockObj, recvMsg) { if !consensus.onPreparedSanityChecks(&blockObj, recvMsg) {
return return
} }
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnPrepared] Received OnPrepared message11111111")
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnPrepared] Received OnPrepared message222222")
if consensus.BlockVerifier == nil { if consensus.BlockVerifier == nil {
consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring") consensus.getLogger().Debug().Msg("[onPrepared] consensus received message before init. Ignoring")
@ -164,9 +172,21 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
consensus.getLogger().Error().Err(err).Msg("[OnPrepared] Block verification failed") consensus.getLogger().Error().Err(err).Msg("[OnPrepared] Block verification failed")
return return
} }
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnPrepared] Received OnPrepared message3333")
consensus.FBFTLog.MarkBlockVerified(&blockObj) consensus.FBFTLog.MarkBlockVerified(&blockObj)
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnPrepared] Received OnPrepared message44444")
consensus.FBFTLog.AddBlock(&blockObj) consensus.FBFTLog.AddBlock(&blockObj)
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnPrepared] Received OnPrepared message555555")
// add block field // add block field
blockPayload := make([]byte, len(recvMsg.Block)) blockPayload := make([]byte, len(recvMsg.Block))
copy(blockPayload[:], recvMsg.Block[:]) copy(blockPayload[:], recvMsg.Block[:])
@ -247,6 +267,10 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
consensus.spinUpStateSync() consensus.spinUpStateSync()
} }
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnCommitted] Received committed message11111111")
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
if err != nil { if err != nil {
consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed") consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed")
@ -257,6 +281,13 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
return return
} }
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnCommitted] Received committed message222222")
// Must have the corresponding block to verify committed message. // Must have the corresponding block to verify committed message.
blockObj := consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash) blockObj := consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash)
if blockObj == nil { if blockObj == nil {
@ -267,6 +298,10 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
Msg("[OnCommitted] Failed finding a matching block for committed message") Msg("[OnCommitted] Failed finding a matching block for committed message")
return return
} }
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnCommitted] Received committed message333333")
commitPayload := signature.ConstructCommitPayload(consensus.Blockchain, commitPayload := signature.ConstructCommitPayload(consensus.Blockchain,
blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64()) blockObj.Epoch(), blockObj.Hash(), blockObj.NumberU64(), blockObj.Header().ViewID().Uint64())
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) { if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
@ -276,11 +311,21 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
return return
} }
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnCommitted] Received committed message444444")
consensus.FBFTLog.AddMessage(recvMsg) consensus.FBFTLog.AddMessage(recvMsg)
consensus.mutex.Lock() consensus.getLogger().Info().
defer consensus.mutex.Unlock() Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnCommitted] Received committed message555555")
consensus.getLogger().Info().
Uint64("MsgBlockNum", recvMsg.BlockNum).
Uint64("MsgViewID", recvMsg.ViewID).
Msg("[OnCommitted] Received committed message666666")
consensus.aggregatedCommitSig = aggSig consensus.aggregatedCommitSig = aggSig
consensus.commitBitmap = mask consensus.commitBitmap = mask

@ -399,7 +399,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
} }
go func() { go func() {
consensus.ReadySignal <- struct{}{} consensus.ReadySignal <- SyncProposal
}() }()
return return
} }

@ -49,7 +49,7 @@ func SetLogVerbosity(verbosity log.Lvl) {
if glogger != nil { if glogger != nil {
glogger.Verbosity(logVerbosity) glogger.Verbosity(logVerbosity)
} }
updateZeroLogLevel(int(verbosity)) updateZeroLogLevel(int(4))
} }
// AddLogFile creates a StreamHandler that outputs JSON logs // AddLogFile creates a StreamHandler that outputs JSON logs

@ -846,7 +846,7 @@ func (node *Node) Start() error {
} }
for e := range errChan { for e := range errChan {
utils.SampledLogger().Info(). utils.Logger().Info().
Interface("item", e.payload). Interface("item", e.payload).
Msgf("[p2p]: issue while handling incoming p2p message: %v", e.err) Msgf("[p2p]: issue while handling incoming p2p message: %v", e.err)
} }

@ -6,6 +6,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/bls"
staking "github.com/harmony-one/harmony/staking/types" staking "github.com/harmony-one/harmony/staking/types"
@ -25,7 +27,7 @@ const (
// WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus. // WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus.
// only leader will receive the ready signal // only leader will receive the ready signal
func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) { func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) {
go func() { go func() {
// Setup stoppedChan // Setup stoppedChan
defer close(stoppedChan) defer close(stoppedChan)
@ -41,7 +43,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, commitSigsC
utils.Logger().Debug(). utils.Logger().Debug().
Msg("Consensus new block proposal: STOPPED!") Msg("Consensus new block proposal: STOPPED!")
return return
case <-readySignal: case proposalType := <-readySignal:
for node.Consensus != nil && node.Consensus.IsLeader() { for node.Consensus != nil && node.Consensus.IsLeader() {
time.Sleep(SleepPeriod) time.Sleep(SleepPeriod)
utils.Logger().Info(). utils.Logger().Info().
@ -52,12 +54,16 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, commitSigsC
newCommitSigsChan := make(chan []byte) newCommitSigsChan := make(chan []byte)
go func() { go func() {
waitTime := 0 * time.Second
if proposalType == consensus.AsyncProposal {
waitTime = 4 * time.Second
}
select { select {
case commitSigs := <-commitSigsChan: case commitSigs := <-commitSigsChan:
if len(commitSigs) > bls.BLSSignatureSizeInBytes { if len(commitSigs) > bls.BLSSignatureSizeInBytes {
newCommitSigsChan <- commitSigs newCommitSigsChan <- commitSigs
} }
case <-time.After(4 * time.Second): case <-time.After(waitTime):
sigs, err := node.Consensus.BlockCommitSigs(node.Blockchain().CurrentBlock().NumberU64()) sigs, err := node.Consensus.BlockCommitSigs(node.Blockchain().CurrentBlock().NumberU64())
if err != nil { if err != nil {

Loading…
Cancel
Save