Merge pull request #103 from harmony-one/lc4pr

[HAR-5] populate the public keys to all nodes
pull/104/head
Leo Chen 6 years ago committed by GitHub
commit 73e4bdf2d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      consensus/consensus.go
  2. 21
      consensus/consensus_leader.go
  3. 2
      consensus/consensus_validator.go
  4. 3
      node/node.go
  5. 30
      node/node_handler.go
  6. 2
      node/node_test.go
  7. 26
      proto/node/pingpong.go
  8. 7
      proto/node/pingpong_test.go

@ -47,7 +47,7 @@ type Consensus struct {
// Leader
leader p2p.Peer
// Public keys of the committee including leader and validators
publicKeys []kyber.Point
PublicKeys []kyber.Point
// private/public keys of current node
priKey kyber.Scalar
@ -137,7 +137,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) *
if err != nil {
panic("Failed to create final mask")
}
consensus.publicKeys = allPublicKeys
consensus.PublicKeys = allPublicKeys
consensus.bitmap = mask
consensus.finalBitmap = finalMask
@ -208,8 +208,8 @@ func (consensus *Consensus) ResetState() {
consensus.responses = &map[uint16]kyber.Scalar{}
consensus.finalResponses = &map[uint16]kyber.Scalar{}
mask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey)
finalMask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey)
mask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.PublicKeys, consensus.leader.PubKey)
finalMask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.PublicKeys, consensus.leader.PubKey)
consensus.bitmap = mask
consensus.finalBitmap = finalMask
consensus.bitmap.SetMask([]byte{})
@ -236,6 +236,7 @@ func (consensus *Consensus) String() string {
// and add the public keys
func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
count := 0
for _, peer := range peers {
_, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer))
if !ok {
@ -243,41 +244,28 @@ func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId())
}
consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer)
consensus.publicKeys = append(consensus.publicKeys, peer.PubKey)
count++
}
}
if count > 0 {
// regenerate bitmaps
mask, err := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey)
if err != nil {
panic("Failed to create mask")
consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey)
}
finalMask, err := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey)
if err != nil {
panic("Failed to create final mask")
}
consensus.bitmap = mask
consensus.finalBitmap = finalMask
count++
}
return count
}
// RemovePeers will remove the peers from the validator list and publicKeys
// RemovePeers will remove the peers from the validator list and PublicKeys
// It will be called when leader/node lost connection to peers
func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
// TODO (lc) we need to have a corresponding RemovePeers function
return 0
}
// DebugPrintPublicKeys print all the publicKeys in string format in Consensus
// DebugPrintPublicKeys print all the PublicKeys in string format in Consensus
func (consensus *Consensus) DebugPrintPublicKeys() {
for _, k := range consensus.publicKeys {
for _, k := range consensus.PublicKeys {
str := fmt.Sprintf("%s", k)
consensus.Log.Debug("pk:", "string", str)
}
consensus.Log.Debug("PublicKeys:", "#", len(consensus.publicKeys))
consensus.Log.Debug("PublicKeys:", "#", len(consensus.PublicKeys))
}
// DebugPrintValidators print all validator ip/port/key in string format in Consensus

@ -29,13 +29,13 @@ var (
// WaitForNewBlock waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) {
consensus.Log.Debug("Waiting for block", "consensus", consensus)
backoff := p2p.NewExpBackoff(500*time.Millisecond, 30*time.Second, 2.0)
for { // keep waiting for new blocks
newBlock := <-blockChannel
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.publicKeys))
time.Sleep(500 * time.Millisecond)
continue
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
backoff.Sleep()
}
// TODO: think about potential race condition
@ -43,6 +43,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime)
for consensus.state == Finished {
// time.Sleep(500 * time.Millisecond)
consensus.ResetState()
consensus.startConsensus(&newBlock)
break
}
@ -204,7 +205,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta
// proceed only when the message is not received before
_, ok = (*commitments)[validatorID]
shouldProcess := !ok
if len((*commitments)) >= ((len(consensus.publicKeys)*2)/3 + 1) {
if len((*commitments)) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
shouldProcess = false
}
if shouldProcess {
@ -221,7 +222,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta
return
}
if len((*commitments)) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state < targetState {
if len((*commitments)) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state < targetState {
consensus.Log.Debug("Enough commitments received with signatures", "num", len(*commitments), "state", consensus.state)
// Broadcast challenge
@ -341,7 +342,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S
_, ok = (*responses)[validatorID]
shouldProcess = shouldProcess && !ok
if len((*responses)) >= ((len(consensus.publicKeys)*2)/3 + 1) {
if len((*responses)) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
shouldProcess = false
}
@ -366,8 +367,8 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S
return
}
if len(*responses) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state != targetState {
if len(*responses) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state != targetState {
if len(*responses) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state != targetState {
if len(*responses) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state != targetState {
consensus.Log.Debug("Enough responses received with signatures", "num", len(*responses), "state", consensus.state)
// Aggregate responses
responseScalars := []kyber.Scalar{}
@ -489,7 +490,7 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) {
"key": consensus.pubKey.String(),
"tps": tps,
"txCount": numOfTxs,
"nodeCount": len(consensus.publicKeys) + 1,
"nodeCount": len(consensus.PublicKeys) + 1,
"latestBlockHash": hex.EncodeToString(consensus.blockHash[:]),
"latestTxHashes": txHashes,
"blockLatency": int(timeElapsed / time.Millisecond),
@ -498,7 +499,7 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) {
}
func (consensus *Consensus) HasEnoughValidators() bool {
if len(consensus.publicKeys) < consensus.MinPeers {
if len(consensus.PublicKeys) < consensus.MinPeers {
return false
}
return true

@ -338,7 +338,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) {
}
// Verify collective signature
err := crypto.Verify(crypto.Ed25519Curve, consensus.publicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.publicKeys)/3)+1))
err := crypto.Verify(crypto.Ed25519Curve, consensus.PublicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.PublicKeys)/3)+1))
if err != nil {
consensus.Log.Warn("Failed to verify the collective sig message", "consensusID", consensusID, "err", err)
return

@ -316,8 +316,7 @@ func (node *Node) AddPeers(peers []p2p.Peer) int {
}
if count > 0 {
c := node.Consensus.AddPeers(peers)
node.log.Info("Node.AddPeers", "#", c)
node.Consensus.AddPeers(peers)
}
return count
}

@ -5,16 +5,17 @@ import (
"bytes"
"encoding/gob"
"fmt"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/pki"
"net"
"os"
"strconv"
"time"
"github.com/dedis/kyber"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/blockchain"
"github.com/harmony-one/harmony/core/types"
hmy_crypto "github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/proto"
"github.com/harmony-one/harmony/proto/client"
@ -586,7 +587,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
// Send a Pong message back
peers := node.Consensus.GetValidatorPeers()
pong := proto_node.NewPongMessage(peers)
pong := proto_node.NewPongMessage(peers, node.Consensus.PublicKeys)
buffer := pong.ConstructPongMessage()
for _, p := range peers {
@ -620,7 +621,26 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
continue
}
peers = append(peers, *peer)
}
count := node.AddPeers(peers)
// Reset Validator PublicKeys every time we receive PONG message from Leader
// The PublicKeys has to be idential across the shard on every node
// TODO (lc): we need to handle RemovePeer situation
node.Consensus.PublicKeys = make([]kyber.Point, 0)
// Create the the PubKey from the []byte sent from leader
for _, k := range pong.PubKeys {
key := hmy_crypto.Ed25519Curve.Point()
err = key.UnmarshalBinary(k[:])
if err != nil {
node.log.Error("UnmarshalBinary Failed PubKeys", "error", err)
continue
}
node.Consensus.PublicKeys = append(node.Consensus.PublicKeys, key)
}
return node.AddPeers(peers)
return count
}

@ -130,7 +130,7 @@ func sendPongMessage(leader p2p.Peer) {
PubKey: pubKey2,
}
pong1 := proto_node.NewPongMessage([]p2p.Peer{p1, p2})
pong1 := proto_node.NewPongMessage([]p2p.Peer{p1, p2}, nil)
buf1 := pong1.ConstructPongMessage()
fmt.Println("waiting for 10 seconds ...")

@ -16,6 +16,7 @@ import (
"fmt"
"log"
"github.com/dedis/kyber"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/proto"
)
@ -38,6 +39,7 @@ type PingMessageType struct {
type PongMessageType struct {
Version uint16 // version of the protocol
Peers []nodeInfo
PubKeys [][]byte // list of publickKeys, has to be identical among all validators/leaders
}
func (p PingMessageType) String() string {
@ -45,10 +47,7 @@ func (p PingMessageType) String() string {
}
func (p PongMessageType) String() string {
str := fmt.Sprintf("pong:%v=>length:%v\n", p.Version, len(p.Peers))
for _, p := range p.Peers {
str = fmt.Sprintf("%v%v:%v:%v/%v\n", str, p.IP, p.Port, p.ValidatorID, p.PubKey)
}
str := fmt.Sprintf("pong:%v=>length:%v, keys:%v\n", p.Version, len(p.Peers), len(p.PubKeys))
return str
}
@ -63,15 +62,16 @@ func NewPingMessage(peer p2p.Peer) *PingMessageType {
ping.Node.PubKey, err = peer.PubKey.MarshalBinary()
if err != nil {
fmt.Printf("Error Marshall PubKey: %v", err)
fmt.Printf("Error Marshal PubKey: %v", err)
return nil
}
return ping
}
func NewPongMessage(peers []p2p.Peer) *PongMessageType {
func NewPongMessage(peers []p2p.Peer, pubKeys []kyber.Point) *PongMessageType {
pong := new(PongMessageType)
pong.PubKeys = make([][]byte, 0)
pong.Version = PROTOCOL_VERSION
pong.Peers = make([]nodeInfo, 0)
@ -84,12 +84,22 @@ func NewPongMessage(peers []p2p.Peer) *PongMessageType {
n.ValidatorID = p.ValidatorID
n.PubKey, err = p.PubKey.MarshalBinary()
if err != nil {
fmt.Printf("Error Marshall PubKey: %v", err)
fmt.Printf("Error Marshal PubKey: %v", err)
continue
}
pong.Peers = append(pong.Peers, n)
}
for _, p := range pubKeys {
key, err := p.MarshalBinary()
if err != nil {
fmt.Printf("Error Marshal PublicKeys: %v", err)
continue
}
pong.PubKeys = append(pong.PubKeys, key)
}
return pong
}
@ -111,6 +121,8 @@ func GetPingMessage(payload []byte) (*PingMessageType, error) {
// Deserialize Pong Message
func GetPongMessage(payload []byte) (*PongMessageType, error) {
pong := new(PongMessageType)
pong.Peers = make([]nodeInfo, 0)
pong.PubKeys = make([][]byte, 0)
r := bytes.NewBuffer(payload)
decoder := gob.NewDecoder(r)

@ -5,6 +5,7 @@ import (
"strings"
"testing"
"github.com/dedis/kyber"
"github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/p2p"
@ -43,6 +44,8 @@ var (
}
e2 = "pong:1=>length:2"
pubKeys = []kyber.Point{pubKey1, pubKey2}
buf1 []byte
buf2 []byte
)
@ -57,7 +60,7 @@ func TestString(test *testing.T) {
fmt.Println(r1)
}
pong1 := NewPongMessage(p2)
pong1 := NewPongMessage(p2, pubKeys)
r2 := fmt.Sprintf("%v", *pong1)
if !strings.HasPrefix(r2, e2) {
@ -72,7 +75,7 @@ func TestSerialize(test *testing.T) {
buf1 = ping1.ConstructPingMessage()
fmt.Printf("buf ping: %v\n", buf1)
pong1 := NewPongMessage(p2)
pong1 := NewPongMessage(p2, pubKeys)
buf2 = pong1.ConstructPongMessage()
fmt.Printf("buf pong: %v\n", buf2)
}

Loading…
Cancel
Save