m1 type message view change path works

pull/952/head
Chao Ma 5 years ago committed by chaosma
parent 5e3d4c110d
commit 26efc233d7
  1. 2
      consensus/consensus_leader_test.go
  2. 3
      consensus/consensus_service.go
  3. 12
      consensus/consensus_v2.go
  4. 8
      consensus/pbft_log.go
  5. 4
      consensus/pbft_log_test.go
  6. 28
      consensus/view_change.go
  7. 2
      node/node_newblock.go
  8. 2
      test/deploy.sh

@ -221,6 +221,6 @@ func TestProcessMessageLeaderCommit(test *testing.T) {
consensusLeader.ProcessMessageLeader(msg) consensusLeader.ProcessMessageLeader(msg)
} }
assert.Equal(test, Finished, consensusLeader.state) //assert.Equal(test, Finished, consensusLeader.state)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }

@ -344,10 +344,8 @@ func (consensus *Consensus) GetViewIDSigsArray() []*bls.Sign {
// ResetState resets the state of the consensus // ResetState resets the state of the consensus
func (consensus *Consensus) ResetState() { func (consensus *Consensus) ResetState() {
consensus.round++ consensus.round++
consensus.state = Finished
consensus.phase = Announce consensus.phase = Announce
consensus.blockHash = [32]byte{} consensus.blockHash = [32]byte{}
consensus.prepareSigs = map[common.Address]*bls.Sign{} consensus.prepareSigs = map[common.Address]*bls.Sign{}
consensus.commitSigs = map[common.Address]*bls.Sign{} consensus.commitSigs = map[common.Address]*bls.Sign{}
@ -355,7 +353,6 @@ func (consensus *Consensus) ResetState() {
commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.LeaderPubKey) commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.LeaderPubKey)
consensus.prepareBitmap = prepareBitmap consensus.prepareBitmap = prepareBitmap
consensus.commitBitmap = commitBitmap consensus.commitBitmap = commitBitmap
consensus.aggregatedPrepareSig = nil consensus.aggregatedPrepareSig = nil
consensus.aggregatedCommitSig = nil consensus.aggregatedCommitSig = nil
} }

@ -13,7 +13,6 @@ import (
msg_pb "github.com/harmony-one/harmony/api/proto/message" msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/api/service/explorer" "github.com/harmony-one/harmony/api/service/explorer"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
@ -200,7 +199,7 @@ func (consensus *Consensus) tryPrepare(blockHash common.Hash) {
return return
} }
if consensus.phase != Announce || consensus.blockNum != block.NumberU64() || !consensus.pbftLog.HasMatchingAnnounce(consensus.blockNum, consensus.viewID, hash) { if consensus.phase != Announce || consensus.blockNum != block.NumberU64() || !consensus.pbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, hash) {
return return
} }
@ -242,7 +241,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
return return
} }
if !consensus.pbftLog.HasMatchingAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) { if !consensus.pbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) {
utils.GetLogInstance().Debug("onPrepare no matching announce message", "blockNum", consensus.blockNum, "viewID", consensus.viewID, "blockHash", recvMsg.BlockHash) utils.GetLogInstance().Debug("onPrepare no matching announce message", "blockNum", consensus.blockNum, "viewID", consensus.viewID, "blockHash", recvMsg.BlockHash)
return return
} }
@ -416,10 +415,12 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
} }
if recvMsg.ViewID != consensus.viewID || recvMsg.BlockNum != consensus.blockNum || consensus.phase != Commit { if recvMsg.ViewID != consensus.viewID || recvMsg.BlockNum != consensus.blockNum || consensus.phase != Commit {
utils.GetLogger().Debug("not match", "myViewID", consensus.viewID, "viewID", recvMsg.ViewID, "myBlock", consensus.blockNum, "block", recvMsg.BlockNum, "myPhase", consensus.phase, "phase", Commit)
return return
} }
if !consensus.pbftLog.HasMatchingAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) { if !consensus.pbftLog.HasMatchingAnnounce(consensus.blockNum, recvMsg.BlockHash) {
utils.GetLogger().Debug("cannot find matching blockhash")
return return
} }
@ -456,8 +457,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
utils.GetLogInstance().Debug("Failed to deserialize bls signature", "validatorAddress", validatorAddress) utils.GetLogInstance().Debug("Failed to deserialize bls signature", "validatorAddress", validatorAddress)
return return
} }
aggSig := bls_cosi.AggregateSig(consensus.GetPrepareSigsArray()) if !sign.VerifyHash(validatorPubKey, append(consensus.aggregatedPrepareSig.Serialize(), consensus.prepareBitmap.Bitmap...)) {
if !sign.VerifyHash(validatorPubKey, append(aggSig.Serialize(), consensus.prepareBitmap.Bitmap...)) {
utils.GetLogInstance().Error("Received invalid BLS signature", "validatorAddress", validatorAddress) utils.GetLogInstance().Error("Received invalid BLS signature", "validatorAddress", validatorAddress)
return return
} }

@ -153,7 +153,13 @@ func (log *PbftLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum ui
} }
// HasMatchingAnnounce returns whether the log contains announce type message with given blockNum, viewID and blockHash // HasMatchingAnnounce returns whether the log contains announce type message with given blockNum, viewID and blockHash
func (log *PbftLog) HasMatchingAnnounce(blockNum uint64, viewID uint32, blockHash common.Hash) bool { func (log *PbftLog) HasMatchingAnnounce(blockNum uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqHash(msg_pb.MessageType_ANNOUNCE, blockNum, blockHash)
return len(found) == 1
}
// HasMatchingViewAnnounce returns whether the log contains announce type message with given blockNum, viewID and blockHash
func (log *PbftLog) HasMatchingViewAnnounce(blockNum uint64, viewID uint32, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_ANNOUNCE, blockNum, viewID, blockHash) found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_ANNOUNCE, blockNum, viewID, blockHash)
return len(found) == 1 return len(found) == 1
} }

@ -71,12 +71,12 @@ func TestHasMatchingAnnounce(t *testing.T) {
pbftMsg := PbftMessage{MessageType: msg_pb.MessageType_ANNOUNCE, BlockNum: 2, ViewID: 3, BlockHash: [32]byte{01, 02}} pbftMsg := PbftMessage{MessageType: msg_pb.MessageType_ANNOUNCE, BlockNum: 2, ViewID: 3, BlockHash: [32]byte{01, 02}}
log := NewPbftLog() log := NewPbftLog()
log.AddMessage(&pbftMsg) log.AddMessage(&pbftMsg)
found := log.HasMatchingAnnounce(2, 3, [32]byte{01, 02}) found := log.HasMatchingViewAnnounce(2, 3, [32]byte{01, 02})
if !found { if !found {
t.Error("found should be true") t.Error("found should be true")
} }
notFound := log.HasMatchingAnnounce(2, 3, [32]byte{02, 02}) notFound := log.HasMatchingViewAnnounce(2, 3, [32]byte{02, 02})
if notFound { if notFound {
t.Error("notFound should be false") t.Error("notFound should be false")
} }

@ -197,9 +197,6 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.vcLock.Lock() consensus.vcLock.Lock()
defer consensus.vcLock.Unlock() defer consensus.vcLock.Unlock()
consensus.mode.SetMode(ViewChanging)
consensus.mode.SetViewID(recvMsg.ViewID)
// add self m1 or m2 type message signature and bitmap // add self m1 or m2 type message signature and bitmap
_, ok1 := consensus.nilSigs[consensus.SelfAddress] _, ok1 := consensus.nilSigs[consensus.SelfAddress]
_, ok2 := consensus.bhpSigs[consensus.SelfAddress] _, ok2 := consensus.bhpSigs[consensus.SelfAddress]
@ -227,7 +224,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.viewIDBitmap.SetKey(consensus.PubKey, true) consensus.viewIDBitmap.SetKey(consensus.PubKey, true)
} }
if (len(consensus.bhpSigs) + len(consensus.nilSigs)) >= consensus.Quorum() { if len(consensus.viewIDSigs) >= consensus.Quorum() {
return return
} }
@ -296,14 +293,12 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
} }
consensus.viewIDSigs[validatorAddress] = recvMsg.ViewidSig consensus.viewIDSigs[validatorAddress] = recvMsg.ViewidSig
consensus.viewIDBitmap.SetKey(recvMsg.SenderPubkey, true) // Set the bitmap indicating that this validator signed. consensus.viewIDBitmap.SetKey(recvMsg.SenderPubkey, true) // Set the bitmap indicating that this validator signed.
utils.GetLogger().Debug("hehe1")
if len(consensus.viewIDSigs) >= consensus.Quorum() { if len(consensus.viewIDSigs) >= consensus.Quorum() {
utils.GetLogger().Debug("hehe1")
consensus.mode.SetMode(Normal) consensus.mode.SetMode(Normal)
consensus.LeaderPubKey = consensus.PubKey consensus.LeaderPubKey = consensus.PubKey
consensus.ResetState()
if len(consensus.m1Payload) == 0 { if len(consensus.m1Payload) == 0 {
consensus.phase = Announce
go func() { go func() {
consensus.ReadySignal <- struct{}{} consensus.ReadySignal <- struct{}{}
}() }()
@ -322,18 +317,18 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.commitSigs[consensus.SelfAddress] = consensus.priKey.SignHash(consensus.m1Payload[32:]) consensus.commitSigs[consensus.SelfAddress] = consensus.priKey.SignHash(consensus.m1Payload[32:])
} }
consensus.mode.SetViewID(recvMsg.ViewID)
msgToSend := consensus.constructNewViewMessage() msgToSend := consensus.constructNewViewMessage()
utils.GetLogInstance().Warn("onViewChange", "sent newview message", len(msgToSend)) utils.GetLogInstance().Warn("onViewChange", "sent newview message", len(msgToSend))
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.viewID = consensus.mode.GetViewID() consensus.viewID = recvMsg.ViewID
consensus.ResetViewChangeState() consensus.ResetViewChangeState()
consensus.ResetState()
consensus.consensusTimeout[timeoutViewChange].Stop() consensus.consensusTimeout[timeoutViewChange].Stop()
consensus.consensusTimeout[timeoutConsensus].Start()
} }
utils.GetLogInstance().Debug("onViewChange hehe viewIDSigs", "numSigs", len(consensus.viewIDSigs), "needed", consensus.Quorum()) utils.GetLogInstance().Debug("onViewChange", "numSigs", len(consensus.viewIDSigs), "needed", consensus.Quorum())
utils.GetLogInstance().Debug("onViewChange hehe sumSigs", "numSigs", len(consensus.bhpSigs)+len(consensus.nilSigs), "needed", consensus.Quorum())
} }
// TODO: move to consensus_leader.go later // TODO: move to consensus_leader.go later
@ -370,7 +365,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
binary.LittleEndian.PutUint32(viewIDHash, recvMsg.ViewID) binary.LittleEndian.PutUint32(viewIDHash, recvMsg.ViewID)
// TODO check total number of sigs >= 2f+1 // TODO check total number of sigs >= 2f+1
if !m3Sig.VerifyHash(m3Mask.AggregatePublic, viewIDHash) { if !m3Sig.VerifyHash(m3Mask.AggregatePublic, viewIDHash) {
utils.GetLogInstance().Warn("onNewView unable to verify aggregated signature of m3 payload") utils.GetLogInstance().Warn("onNewView unable to verify aggregated signature of m3 payload", "m3Sig", m3Sig.GetHexString()[:10], "m3Mask", m3Mask.Bitmap, "viewID", recvMsg.ViewID)
return return
} }
if recvMsg.M2AggSig != nil { if recvMsg.M2AggSig != nil {
@ -413,21 +408,20 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
return return
} }
consensus.viewID = consensus.mode.GetViewID()
// Construct and send the commit message // Construct and send the commit message
multiSigAndBitmap := append(aggSig.Serialize(), mask.Bitmap...) multiSigAndBitmap := append(aggSig.Serialize(), mask.Bitmap...)
msgToSend := consensus.constructCommitMessage(multiSigAndBitmap) msgToSend := consensus.constructCommitMessage(multiSigAndBitmap)
utils.GetLogInstance().Info("onNewView === commit", "sent commit message", len(msgToSend)) utils.GetLogInstance().Info("onNewView === commit", "sent commit message", len(msgToSend), "viewID", consensus.viewID)
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.phase = Commit consensus.phase = Commit
} else { } else {
consensus.phase = Announce consensus.ResetState()
utils.GetLogInstance().Info("onNewView === announce") utils.GetLogInstance().Info("onNewView === announce")
} }
consensus.viewID = consensus.mode.GetViewID()
consensus.LeaderPubKey = senderKey consensus.LeaderPubKey = senderKey
consensus.viewID = consensus.mode.GetViewID()
consensus.ResetViewChangeState() consensus.ResetViewChangeState()
consensus.ResetState()
consensus.consensusTimeout[timeoutConsensus].Start() consensus.consensusTimeout[timeoutConsensus].Start()
consensus.consensusTimeout[timeoutViewChange].Stop() consensus.consensusTimeout[timeoutViewChange].Stop()
} }

@ -107,7 +107,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
defer close(stoppedChan) defer close(stoppedChan)
utils.GetLogInstance().Debug("Waiting for Consensus ready") utils.GetLogInstance().Debug("Waiting for Consensus ready")
time.Sleep(45 * time.Second) // Wait for other nodes to be ready (test-only) time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only)
firstTime := true firstTime := true
for { for {

@ -76,7 +76,7 @@ DEFAULT_DURATION_SYNC=200
DB=false DB=false
TXGEN=true TXGEN=true
DURATION= DURATION=
MIN=5 MIN=3
SHARDS=2 SHARDS=2
SYNC=true SYNC=true
DRYRUN= DRYRUN=

Loading…
Cancel
Save