Merge branch 'master' of github.com:harmony-one/harmony into rj_branch

pull/105/head
Rongjian Lan 6 years ago
commit 940ffaaa88
  1. 34
      consensus/consensus.go
  2. 21
      consensus/consensus_leader.go
  3. 2
      consensus/consensus_validator.go
  4. 3
      node/node.go
  5. 30
      node/node_handler.go
  6. 2
      node/node_test.go
  7. 26
      proto/node/pingpong.go
  8. 7
      proto/node/pingpong_test.go

@ -47,7 +47,7 @@ type Consensus struct {
// Leader // Leader
leader p2p.Peer leader p2p.Peer
// Public keys of the committee including leader and validators // Public keys of the committee including leader and validators
publicKeys []kyber.Point PublicKeys []kyber.Point
// private/public keys of current node // private/public keys of current node
priKey kyber.Scalar priKey kyber.Scalar
@ -137,7 +137,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) *
if err != nil { if err != nil {
panic("Failed to create final mask") panic("Failed to create final mask")
} }
consensus.publicKeys = allPublicKeys consensus.PublicKeys = allPublicKeys
consensus.bitmap = mask consensus.bitmap = mask
consensus.finalBitmap = finalMask consensus.finalBitmap = finalMask
@ -208,8 +208,8 @@ func (consensus *Consensus) ResetState() {
consensus.responses = &map[uint16]kyber.Scalar{} consensus.responses = &map[uint16]kyber.Scalar{}
consensus.finalResponses = &map[uint16]kyber.Scalar{} consensus.finalResponses = &map[uint16]kyber.Scalar{}
mask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey) mask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.PublicKeys, consensus.leader.PubKey)
finalMask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey) finalMask, _ := crypto.NewMask(crypto.Ed25519Curve, consensus.PublicKeys, consensus.leader.PubKey)
consensus.bitmap = mask consensus.bitmap = mask
consensus.finalBitmap = finalMask consensus.finalBitmap = finalMask
consensus.bitmap.SetMask([]byte{}) consensus.bitmap.SetMask([]byte{})
@ -236,6 +236,7 @@ func (consensus *Consensus) String() string {
// and add the public keys // and add the public keys
func (consensus *Consensus) AddPeers(peers []p2p.Peer) int { func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
count := 0 count := 0
for _, peer := range peers { for _, peer := range peers {
_, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer)) _, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer))
if !ok { if !ok {
@ -243,41 +244,28 @@ func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId()) peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId())
} }
consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer)
consensus.publicKeys = append(consensus.publicKeys, peer.PubKey) consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey)
count++
}
}
if count > 0 {
// regenerate bitmaps
mask, err := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey)
if err != nil {
panic("Failed to create mask")
}
finalMask, err := crypto.NewMask(crypto.Ed25519Curve, consensus.publicKeys, consensus.leader.PubKey)
if err != nil {
panic("Failed to create final mask")
} }
consensus.bitmap = mask count++
consensus.finalBitmap = finalMask
} }
return count return count
} }
// RemovePeers will remove the peers from the validator list and publicKeys // RemovePeers will remove the peers from the validator list and PublicKeys
// It will be called when leader/node lost connection to peers // It will be called when leader/node lost connection to peers
func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int { func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
// TODO (lc) we need to have a corresponding RemovePeers function // TODO (lc) we need to have a corresponding RemovePeers function
return 0 return 0
} }
// DebugPrintPublicKeys print all the publicKeys in string format in Consensus // DebugPrintPublicKeys print all the PublicKeys in string format in Consensus
func (consensus *Consensus) DebugPrintPublicKeys() { func (consensus *Consensus) DebugPrintPublicKeys() {
for _, k := range consensus.publicKeys { for _, k := range consensus.PublicKeys {
str := fmt.Sprintf("%s", k) str := fmt.Sprintf("%s", k)
consensus.Log.Debug("pk:", "string", str) consensus.Log.Debug("pk:", "string", str)
} }
consensus.Log.Debug("PublicKeys:", "#", len(consensus.publicKeys)) consensus.Log.Debug("PublicKeys:", "#", len(consensus.PublicKeys))
} }
// DebugPrintValidators print all validator ip/port/key in string format in Consensus // DebugPrintValidators print all validator ip/port/key in string format in Consensus

@ -29,13 +29,13 @@ var (
// WaitForNewBlock waits for the next new block to run consensus on // WaitForNewBlock waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) { func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) {
consensus.Log.Debug("Waiting for block", "consensus", consensus) consensus.Log.Debug("Waiting for block", "consensus", consensus)
backoff := p2p.NewExpBackoff(500*time.Millisecond, 30*time.Second, 2.0)
for { // keep waiting for new blocks for { // keep waiting for new blocks
newBlock := <-blockChannel newBlock := <-blockChannel
if !consensus.HasEnoughValidators() { if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.publicKeys)) consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(500 * time.Millisecond) backoff.Sleep()
continue
} }
// TODO: think about potential race condition // TODO: think about potential race condition
@ -43,6 +43,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime) consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime)
for consensus.state == Finished { for consensus.state == Finished {
// time.Sleep(500 * time.Millisecond) // time.Sleep(500 * time.Millisecond)
consensus.ResetState()
consensus.startConsensus(&newBlock) consensus.startConsensus(&newBlock)
break break
} }
@ -204,7 +205,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta
// proceed only when the message is not received before // proceed only when the message is not received before
_, ok = (*commitments)[validatorID] _, ok = (*commitments)[validatorID]
shouldProcess := !ok shouldProcess := !ok
if len((*commitments)) >= ((len(consensus.publicKeys)*2)/3 + 1) { if len((*commitments)) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
shouldProcess = false shouldProcess = false
} }
if shouldProcess { if shouldProcess {
@ -221,7 +222,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta
return return
} }
if len((*commitments)) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state < targetState { if len((*commitments)) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state < targetState {
consensus.Log.Debug("Enough commitments received with signatures", "num", len(*commitments), "state", consensus.state) consensus.Log.Debug("Enough commitments received with signatures", "num", len(*commitments), "state", consensus.state)
// Broadcast challenge // Broadcast challenge
@ -341,7 +342,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S
_, ok = (*responses)[validatorID] _, ok = (*responses)[validatorID]
shouldProcess = shouldProcess && !ok shouldProcess = shouldProcess && !ok
if len((*responses)) >= ((len(consensus.publicKeys)*2)/3 + 1) { if len((*responses)) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
shouldProcess = false shouldProcess = false
} }
@ -366,8 +367,8 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S
return return
} }
if len(*responses) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state != targetState { if len(*responses) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state != targetState {
if len(*responses) >= ((len(consensus.publicKeys)*2)/3+1) && consensus.state != targetState { if len(*responses) >= ((len(consensus.PublicKeys)*2)/3+1) && consensus.state != targetState {
consensus.Log.Debug("Enough responses received with signatures", "num", len(*responses), "state", consensus.state) consensus.Log.Debug("Enough responses received with signatures", "num", len(*responses), "state", consensus.state)
// Aggregate responses // Aggregate responses
responseScalars := []kyber.Scalar{} responseScalars := []kyber.Scalar{}
@ -489,7 +490,7 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) {
"key": consensus.pubKey.String(), "key": consensus.pubKey.String(),
"tps": tps, "tps": tps,
"txCount": numOfTxs, "txCount": numOfTxs,
"nodeCount": len(consensus.publicKeys) + 1, "nodeCount": len(consensus.PublicKeys) + 1,
"latestBlockHash": hex.EncodeToString(consensus.blockHash[:]), "latestBlockHash": hex.EncodeToString(consensus.blockHash[:]),
"latestTxHashes": txHashes, "latestTxHashes": txHashes,
"blockLatency": int(timeElapsed / time.Millisecond), "blockLatency": int(timeElapsed / time.Millisecond),
@ -498,7 +499,7 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) {
} }
func (consensus *Consensus) HasEnoughValidators() bool { func (consensus *Consensus) HasEnoughValidators() bool {
if len(consensus.publicKeys) < consensus.MinPeers { if len(consensus.PublicKeys) < consensus.MinPeers {
return false return false
} }
return true return true

@ -338,7 +338,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) {
} }
// Verify collective signature // Verify collective signature
err := crypto.Verify(crypto.Ed25519Curve, consensus.publicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.publicKeys)/3)+1)) err := crypto.Verify(crypto.Ed25519Curve, consensus.PublicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.PublicKeys)/3)+1))
if err != nil { if err != nil {
consensus.Log.Warn("Failed to verify the collective sig message", "consensusID", consensusID, "err", err) consensus.Log.Warn("Failed to verify the collective sig message", "consensusID", consensusID, "err", err)
return return

@ -316,8 +316,7 @@ func (node *Node) AddPeers(peers []p2p.Peer) int {
} }
if count > 0 { if count > 0 {
c := node.Consensus.AddPeers(peers) node.Consensus.AddPeers(peers)
node.log.Info("Node.AddPeers", "#", c)
} }
return count return count
} }

@ -5,16 +5,17 @@ import (
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/pki"
"net" "net"
"os" "os"
"strconv" "strconv"
"time" "time"
"github.com/dedis/kyber"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/blockchain"
"github.com/harmony-one/harmony/core/types"
hmy_crypto "github.com/harmony-one/harmony/crypto" hmy_crypto "github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/proto" "github.com/harmony-one/harmony/proto"
"github.com/harmony-one/harmony/proto/client" "github.com/harmony-one/harmony/proto/client"
@ -586,7 +587,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
// Send a Pong message back // Send a Pong message back
peers := node.Consensus.GetValidatorPeers() peers := node.Consensus.GetValidatorPeers()
pong := proto_node.NewPongMessage(peers) pong := proto_node.NewPongMessage(peers, node.Consensus.PublicKeys)
buffer := pong.ConstructPongMessage() buffer := pong.ConstructPongMessage()
for _, p := range peers { for _, p := range peers {
@ -620,7 +621,26 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
continue continue
} }
peers = append(peers, *peer) peers = append(peers, *peer)
}
count := node.AddPeers(peers)
// Reset Validator PublicKeys every time we receive PONG message from Leader
// The PublicKeys has to be idential across the shard on every node
// TODO (lc): we need to handle RemovePeer situation
node.Consensus.PublicKeys = make([]kyber.Point, 0)
// Create the the PubKey from the []byte sent from leader
for _, k := range pong.PubKeys {
key := hmy_crypto.Ed25519Curve.Point()
err = key.UnmarshalBinary(k[:])
if err != nil {
node.log.Error("UnmarshalBinary Failed PubKeys", "error", err)
continue
}
node.Consensus.PublicKeys = append(node.Consensus.PublicKeys, key)
} }
return node.AddPeers(peers) return count
} }

@ -130,7 +130,7 @@ func sendPongMessage(leader p2p.Peer) {
PubKey: pubKey2, PubKey: pubKey2,
} }
pong1 := proto_node.NewPongMessage([]p2p.Peer{p1, p2}) pong1 := proto_node.NewPongMessage([]p2p.Peer{p1, p2}, nil)
buf1 := pong1.ConstructPongMessage() buf1 := pong1.ConstructPongMessage()
fmt.Println("waiting for 10 seconds ...") fmt.Println("waiting for 10 seconds ...")

@ -16,6 +16,7 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/dedis/kyber"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/proto" "github.com/harmony-one/harmony/proto"
) )
@ -38,6 +39,7 @@ type PingMessageType struct {
type PongMessageType struct { type PongMessageType struct {
Version uint16 // version of the protocol Version uint16 // version of the protocol
Peers []nodeInfo Peers []nodeInfo
PubKeys [][]byte // list of publickKeys, has to be identical among all validators/leaders
} }
func (p PingMessageType) String() string { func (p PingMessageType) String() string {
@ -45,10 +47,7 @@ func (p PingMessageType) String() string {
} }
func (p PongMessageType) String() string { func (p PongMessageType) String() string {
str := fmt.Sprintf("pong:%v=>length:%v\n", p.Version, len(p.Peers)) str := fmt.Sprintf("pong:%v=>length:%v, keys:%v\n", p.Version, len(p.Peers), len(p.PubKeys))
for _, p := range p.Peers {
str = fmt.Sprintf("%v%v:%v:%v/%v\n", str, p.IP, p.Port, p.ValidatorID, p.PubKey)
}
return str return str
} }
@ -63,15 +62,16 @@ func NewPingMessage(peer p2p.Peer) *PingMessageType {
ping.Node.PubKey, err = peer.PubKey.MarshalBinary() ping.Node.PubKey, err = peer.PubKey.MarshalBinary()
if err != nil { if err != nil {
fmt.Printf("Error Marshall PubKey: %v", err) fmt.Printf("Error Marshal PubKey: %v", err)
return nil return nil
} }
return ping return ping
} }
func NewPongMessage(peers []p2p.Peer) *PongMessageType { func NewPongMessage(peers []p2p.Peer, pubKeys []kyber.Point) *PongMessageType {
pong := new(PongMessageType) pong := new(PongMessageType)
pong.PubKeys = make([][]byte, 0)
pong.Version = PROTOCOL_VERSION pong.Version = PROTOCOL_VERSION
pong.Peers = make([]nodeInfo, 0) pong.Peers = make([]nodeInfo, 0)
@ -84,12 +84,22 @@ func NewPongMessage(peers []p2p.Peer) *PongMessageType {
n.ValidatorID = p.ValidatorID n.ValidatorID = p.ValidatorID
n.PubKey, err = p.PubKey.MarshalBinary() n.PubKey, err = p.PubKey.MarshalBinary()
if err != nil { if err != nil {
fmt.Printf("Error Marshall PubKey: %v", err) fmt.Printf("Error Marshal PubKey: %v", err)
continue continue
} }
pong.Peers = append(pong.Peers, n) pong.Peers = append(pong.Peers, n)
} }
for _, p := range pubKeys {
key, err := p.MarshalBinary()
if err != nil {
fmt.Printf("Error Marshal PublicKeys: %v", err)
continue
}
pong.PubKeys = append(pong.PubKeys, key)
}
return pong return pong
} }
@ -111,6 +121,8 @@ func GetPingMessage(payload []byte) (*PingMessageType, error) {
// Deserialize Pong Message // Deserialize Pong Message
func GetPongMessage(payload []byte) (*PongMessageType, error) { func GetPongMessage(payload []byte) (*PongMessageType, error) {
pong := new(PongMessageType) pong := new(PongMessageType)
pong.Peers = make([]nodeInfo, 0)
pong.PubKeys = make([][]byte, 0)
r := bytes.NewBuffer(payload) r := bytes.NewBuffer(payload)
decoder := gob.NewDecoder(r) decoder := gob.NewDecoder(r)

@ -5,6 +5,7 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/dedis/kyber"
"github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -43,6 +44,8 @@ var (
} }
e2 = "pong:1=>length:2" e2 = "pong:1=>length:2"
pubKeys = []kyber.Point{pubKey1, pubKey2}
buf1 []byte buf1 []byte
buf2 []byte buf2 []byte
) )
@ -57,7 +60,7 @@ func TestString(test *testing.T) {
fmt.Println(r1) fmt.Println(r1)
} }
pong1 := NewPongMessage(p2) pong1 := NewPongMessage(p2, pubKeys)
r2 := fmt.Sprintf("%v", *pong1) r2 := fmt.Sprintf("%v", *pong1)
if !strings.HasPrefix(r2, e2) { if !strings.HasPrefix(r2, e2) {
@ -72,7 +75,7 @@ func TestSerialize(test *testing.T) {
buf1 = ping1.ConstructPingMessage() buf1 = ping1.ConstructPingMessage()
fmt.Printf("buf ping: %v\n", buf1) fmt.Printf("buf ping: %v\n", buf1)
pong1 := NewPongMessage(p2) pong1 := NewPongMessage(p2, pubKeys)
buf2 = pong1.ConstructPongMessage() buf2 = pong1.ConstructPongMessage()
fmt.Printf("buf pong: %v\n", buf2) fmt.Printf("buf pong: %v\n", buf2)
} }

Loading…
Cancel
Save