diff --git a/consensus/consensus.go b/consensus/consensus.go index ab9bb550c..067f64428 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -47,7 +47,7 @@ type Consensus struct { // Leader leader p2p.Peer // Public keys of the committee including leader and validators - publicKeys []kyber.Point + PublicKeys []kyber.Point // private/public keys of current node priKey kyber.Scalar @@ -137,7 +137,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * if err != nil { panic("Failed to create final mask") } - consensus.publicKeys = allPublicKeys + consensus.PublicKeys = allPublicKeys consensus.bitmap = mask consensus.finalBitmap = finalMask @@ -208,8 +208,8 @@ func (consensus *Consensus) ResetState() { consensus.responses = &map[uint16]kyber.Scalar{} consensus.finalResponses = &map[uint16]kyber.Scalar{} - mask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey) - finalMask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey) + mask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.PublicKeys, consensus.leader.PubKey) + finalMask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.PublicKeys, consensus.leader.PubKey) consensus.bitmap = mask consensus.finalBitmap = finalMask consensus.bitmap.SetMask([]byte{}) @@ -236,6 +236,7 @@ func (consensus *Consensus) String() string { // and add the public keys func (consensus *Consensus) AddPeers(peers []p2p.Peer) int { count := 0 + for _, peer := range peers { _, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer)) if !ok { @@ -243,41 +244,28 @@ func (consensus *Consensus) AddPeers(peers []p2p.Peer) int { peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId()) } consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) - consensus.publicKeys = append(consensus.publicKeys, peer.PubKey) - count++ - } - } - if count > 0 { - // regenerate bitmaps - mask, err := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey) - if err != nil { - panic("Failed to create mask") - } - finalMask, err := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey) - if err != nil { - panic("Failed to create final mask") + consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey) } - consensus.bitmap = mask - consensus.finalBitmap = finalMask + count++ } return count } -// RemovePeers will remove the peers from the validator list and publicKeys +// RemovePeers will remove the peers from the validator list and PublicKeys // It will be called when leader/node lost connection to peers func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int { // TODO (lc) we need to have a corresponding RemovePeers function return 0 } -// DebugPrintPublicKeys print all the publicKeys in string format in Consensus +// DebugPrintPublicKeys print all the PublicKeys in string format in Consensus func (consensus *Consensus) DebugPrintPublicKeys() { - for _, k := range consensus.publicKeys { + for _, k := range consensus.PublicKeys { str := fmt.Sprintf("%s", k) consensus.Log.Debug("pk:", "string", str) } - consensus.Log.Debug("PublicKeys:", "#", len(consensus.publicKeys)) + consensus.Log.Debug("PublicKeys:", "#", len(consensus.PublicKeys)) } // DebugPrintValidators print all validator ip/port/key in string format in Consensus diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 9064afb14..5c009f079 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -29,13 +29,13 @@ var ( // WaitForNewBlock waits for the next new block to run consensus on func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) { consensus.Log.Debug("Waiting for block", "consensus", consensus) + backoff := p2p.NewExpBackoff(500*time.Millisecond, 30*time.Second, 2.0) for { // keep waiting for new blocks newBlock := <-blockChannel if !consensus.HasEnoughValidators() { - consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.publicKeys)) - time.Sleep(500 * time.Millisecond) - continue + consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys)) + backoff.Sleep() } // TODO: think about potential race condition @@ -43,6 +43,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime) for consensus.state == Finished { // time.Sleep(500 * time.Millisecond) + consensus.ResetState() consensus.startConsensus(&newBlock) break } @@ -204,7 +205,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta // proceed only when the message is not received before _, ok = (*commitments)[validatorID] shouldProcess := !ok - if len((*commitments)) >= ((len(consensus.publicKeys)*2)/3 + 1) { + if len((*commitments)) >= ((len(consensus.PublicKeys)*2)/3 + 1) { shouldProcess = false } if shouldProcess { @@ -221,7 +222,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta return } - if len((*commitments)) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state < targetState { + if len((*commitments)) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state < targetState { consensus.Log.Debug("Enough commitments received with signatures", "num", len(*commitments), "state", consensus.state) // Broadcast challenge @@ -341,7 +342,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S _, ok = (*responses)[validatorID] shouldProcess = shouldProcess && !ok - if len((*responses)) >= ((len(consensus.publicKeys)*2)/3 + 1) { + if len((*responses)) >= ((len(consensus.PublicKeys)*2)/3 + 1) { shouldProcess = false } @@ -366,8 +367,8 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S return } - if len(*responses) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state != targetState { - if len(*responses) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state != targetState { + if len(*responses) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state != targetState { + if len(*responses) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state != targetState { consensus.Log.Debug("Enough responses received with signatures", "num", len(*responses), "state", consensus.state) // Aggregate responses responseScalars := []kyber.Scalar{} @@ -489,7 +490,7 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) { "key": consensus.pubKey.String(), "tps": tps, "txCount": numOfTxs, - "nodeCount": len(consensus.publicKeys) + 1, + "nodeCount": len(consensus.PublicKeys) + 1, "latestBlockHash": hex.EncodeToString(consensus.blockHash[:]), "latestTxHashes": txHashes, "blockLatency": int(timeElapsed / time.Millisecond), @@ -498,7 +499,7 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) { } func (consensus *Consensus) HasEnoughValidators() bool { - if len(consensus.publicKeys) < consensus.MinPeers { + if len(consensus.PublicKeys) < consensus.MinPeers { return false } return true diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 9b9020dcb..fa25af968 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -338,7 +338,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) { } // Verify collective signature - err := crypto.Verify(crypto.Ed25519Curve, consensus.publicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.publicKeys)/3)+1)) + err := crypto.Verify(crypto.Ed25519Curve, consensus.PublicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.PublicKeys)/3)+1)) if err != nil { consensus.Log.Warn("Failed to verify the collective sig message", "consensusID", consensusID, "err", err) return diff --git a/node/node.go b/node/node.go index 224eb53c8..fd256a319 100644 --- a/node/node.go +++ b/node/node.go @@ -316,8 +316,7 @@ func (node *Node) AddPeers(peers []p2p.Peer) int { } if count > 0 { - c := node.Consensus.AddPeers(peers) - node.log.Info("Node.AddPeers", "#", c) + node.Consensus.AddPeers(peers) } return count } diff --git a/node/node_handler.go b/node/node_handler.go index e5a447b8b..164ceec14 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -5,16 +5,17 @@ import ( "bytes" "encoding/gob" "fmt" - "github.com/ethereum/go-ethereum/rlp" - "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/crypto/pki" "net" "os" "strconv" "time" + "github.com/dedis/kyber" + "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/blockchain" + "github.com/harmony-one/harmony/core/types" hmy_crypto "github.com/harmony-one/harmony/crypto" + "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/proto" "github.com/harmony-one/harmony/proto/client" @@ -586,7 +587,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int { // Send a Pong message back peers := node.Consensus.GetValidatorPeers() - pong := proto_node.NewPongMessage(peers) + pong := proto_node.NewPongMessage(peers, node.Consensus.PublicKeys) buffer := pong.ConstructPongMessage() for _, p := range peers { @@ -620,7 +621,26 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { continue } peers = append(peers, *peer) + + } + + count := node.AddPeers(peers) + + // Reset Validator PublicKeys every time we receive PONG message from Leader + // The PublicKeys has to be idential across the shard on every node + // TODO (lc): we need to handle RemovePeer situation + node.Consensus.PublicKeys = make([]kyber.Point, 0) + + // Create the the PubKey from the []byte sent from leader + for _, k := range pong.PubKeys { + key := hmy_crypto.Ed25519Curve.Point() + err = key.UnmarshalBinary(k[:]) + if err != nil { + node.log.Error("UnmarshalBinary Failed PubKeys", "error", err) + continue + } + node.Consensus.PublicKeys = append(node.Consensus.PublicKeys, key) } - return node.AddPeers(peers) + return count } diff --git a/node/node_test.go b/node/node_test.go index e2e2a0663..175a0b1b8 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -130,7 +130,7 @@ func sendPongMessage(leader p2p.Peer) { PubKey: pubKey2, } - pong1 := proto_node.NewPongMessage([]p2p.Peer{p1, p2}) + pong1 := proto_node.NewPongMessage([]p2p.Peer{p1, p2}, nil) buf1 := pong1.ConstructPongMessage() fmt.Println("waiting for 10 seconds ...") diff --git a/proto/node/pingpong.go b/proto/node/pingpong.go index ab2d8f4ad..18e71cf57 100644 --- a/proto/node/pingpong.go +++ b/proto/node/pingpong.go @@ -16,6 +16,7 @@ import ( "fmt" "log" + "github.com/dedis/kyber" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/proto" ) @@ -38,6 +39,7 @@ type PingMessageType struct { type PongMessageType struct { Version uint16 // version of the protocol Peers []nodeInfo + PubKeys [][]byte // list of publickKeys, has to be identical among all validators/leaders } func (p PingMessageType) String() string { @@ -45,10 +47,7 @@ func (p PingMessageType) String() string { } func (p PongMessageType) String() string { - str := fmt.Sprintf("pong:%v=>length:%v\n", p.Version, len(p.Peers)) - for _, p := range p.Peers { - str = fmt.Sprintf("%v%v:%v:%v/%v\n", str, p.IP, p.Port, p.ValidatorID, p.PubKey) - } + str := fmt.Sprintf("pong:%v=>length:%v, keys:%v\n", p.Version, len(p.Peers), len(p.PubKeys)) return str } @@ -63,15 +62,16 @@ func NewPingMessage(peer p2p.Peer) *PingMessageType { ping.Node.PubKey, err = peer.PubKey.MarshalBinary() if err != nil { - fmt.Printf("Error Marshall PubKey: %v", err) + fmt.Printf("Error Marshal PubKey: %v", err) return nil } return ping } -func NewPongMessage(peers []p2p.Peer) *PongMessageType { +func NewPongMessage(peers []p2p.Peer, pubKeys []kyber.Point) *PongMessageType { pong := new(PongMessageType) + pong.PubKeys = make([][]byte, 0) pong.Version = PROTOCOL_VERSION pong.Peers = make([]nodeInfo, 0) @@ -84,12 +84,22 @@ func NewPongMessage(peers []p2p.Peer) *PongMessageType { n.ValidatorID = p.ValidatorID n.PubKey, err = p.PubKey.MarshalBinary() if err != nil { - fmt.Printf("Error Marshall PubKey: %v", err) + fmt.Printf("Error Marshal PubKey: %v", err) continue } pong.Peers = append(pong.Peers, n) } + for _, p := range pubKeys { + key, err := p.MarshalBinary() + if err != nil { + fmt.Printf("Error Marshal PublicKeys: %v", err) + continue + } + + pong.PubKeys = append(pong.PubKeys, key) + } + return pong } @@ -111,6 +121,8 @@ func GetPingMessage(payload []byte) (*PingMessageType, error) { // Deserialize Pong Message func GetPongMessage(payload []byte) (*PongMessageType, error) { pong := new(PongMessageType) + pong.Peers = make([]nodeInfo, 0) + pong.PubKeys = make([][]byte, 0) r := bytes.NewBuffer(payload) decoder := gob.NewDecoder(r) diff --git a/proto/node/pingpong_test.go b/proto/node/pingpong_test.go index c44ce844a..718f5d96b 100644 --- a/proto/node/pingpong_test.go +++ b/proto/node/pingpong_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/dedis/kyber" "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/p2p" @@ -43,6 +44,8 @@ var ( } e2 = "pong:1=>length:2" + pubKeys = []kyber.Point{pubKey1, pubKey2} + buf1 []byte buf2 []byte ) @@ -57,7 +60,7 @@ func TestString(test *testing.T) { fmt.Println(r1) } - pong1 := NewPongMessage(p2) + pong1 := NewPongMessage(p2, pubKeys) r2 := fmt.Sprintf("%v", *pong1) if !strings.HasPrefix(r2, e2) { @@ -72,7 +75,7 @@ func TestSerialize(test *testing.T) { buf1 = ping1.ConstructPingMessage() fmt.Printf("buf ping: %v\n", buf1) - pong1 := NewPongMessage(p2) + pong1 := NewPongMessage(p2, pubKeys) buf2 = pong1.ConstructPongMessage() fmt.Printf("buf pong: %v\n", buf2) }