fix validatorID not populated to all validator issue

now unique set of validatorID are sync'ed to all validators

Signed-off-by: Leo Chen <leo@harmony.one>
pull/95/head
Leo Chen 6 years ago
parent 0978a9ade0
commit d8f0d504cc
  1. 28
      consensus/consensus.go
  2. 6
      node/node.go
  3. 22
      node/node_handler.go
  4. 2
      p2p/peer.go
  5. 8
      proto/node/pingpong_test.go

@ -239,7 +239,9 @@ func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
for _, peer := range peers { for _, peer := range peers {
_, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer)) _, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer))
if !ok { if !ok {
if peer.ValidatorID == -1 {
peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId()) peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId())
}
consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer)
consensus.publicKeys = append(consensus.publicKeys, peer.PubKey) consensus.publicKeys = append(consensus.publicKeys, peer.PubKey)
count++ count++
@ -267,3 +269,29 @@ func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
// TODO (lc) we need to have a corresponding RemovePeers function // TODO (lc) we need to have a corresponding RemovePeers function
return 0 return 0
} }
// DebugPrintPublicKeys print all the publicKeys in string format in Consensus
func (consensus *Consensus) DebugPrintPublicKeys() {
for _, k := range consensus.publicKeys {
str := fmt.Sprintf("%s", k)
consensus.Log.Debug("pk:", "string", str)
}
consensus.Log.Debug("PublicKeys:", "#", len(consensus.publicKeys))
}
// DebugPrintValidators print all validator ip/port/key in string format in Consensus
func (consensus *Consensus) DebugPrintValidators() {
count := 0
consensus.validators.Range(func(k, v interface{}) bool {
if p, ok := v.(p2p.Peer); ok {
str2 := fmt.Sprintf("%s", p.PubKey)
consensus.Log.Debug("validator:", "IP", p.Ip, "Port", p.Port, "VID", p.ValidatorID, "Key", str2)
count++
return true
} else {
return false
}
})
consensus.Log.Debug("Validators", "#", count)
}

@ -27,8 +27,6 @@ import (
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
proto_identity "github.com/harmony-one/harmony/proto/identity" proto_identity "github.com/harmony-one/harmony/proto/identity"
proto_node "github.com/harmony-one/harmony/proto/node" proto_node "github.com/harmony-one/harmony/proto/node"
"github.com/jinzhu/copier"
) )
type NodeState byte type NodeState byte
@ -262,9 +260,7 @@ func (node *Node) AddPeers(peers []p2p.Peer) int {
key := fmt.Sprintf("%v", p.PubKey) key := fmt.Sprintf("%v", p.PubKey)
_, ok := node.Neighbors.Load(key) _, ok := node.Neighbors.Load(key)
if !ok { if !ok {
np := new(p2p.Peer) node.Neighbors.Store(key, p)
copier.Copy(np, &p)
node.Neighbors.Store(key, *np)
count++ count++
} }
} }

@ -317,7 +317,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
select { select {
case <-readySignal: case <-readySignal:
time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up. time.Sleep(100 * time.Millisecond) // Delay a bit so validator is catched up.
case <-time.After(100 * time.Second): case <-time.After(200 * time.Second):
retry = true retry = true
node.Consensus.ResetState() node.Consensus.ResetState()
timeoutCount++ timeoutCount++
@ -524,11 +524,12 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
node.log.Error("Can't get Ping Message") node.log.Error("Can't get Ping Message")
return -1 return -1
} }
node.log.Info("Ping", "Msg", ping) // node.log.Info("Ping", "Msg", ping)
peer := new(p2p.Peer) peer := new(p2p.Peer)
peer.Ip = ping.Node.IP peer.Ip = ping.Node.IP
peer.Port = ping.Node.Port peer.Port = ping.Node.Port
peer.ValidatorID = ping.Node.ValidatorID
peer.PubKey = hmy_crypto.Ed25519Curve.Point() peer.PubKey = hmy_crypto.Ed25519Curve.Point()
err = peer.PubKey.UnmarshalBinary(ping.Node.PubKey[:]) err = peer.PubKey.UnmarshalBinary(ping.Node.PubKey[:])
@ -538,20 +539,10 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
} }
// Add to Node's peer list // Add to Node's peer list
count := node.AddPeers([]p2p.Peer{*peer}) node.AddPeers([]p2p.Peer{*peer})
// Send a Pong message back // Send a Pong message back
peers := make([]p2p.Peer, 0) peers := node.Consensus.GetValidatorPeers()
count = 0
node.Neighbors.Range(func(k, v interface{}) bool {
if p, ok := v.(p2p.Peer); ok {
peers = append(peers, p)
count++
return true
} else {
return false
}
})
pong := proto_node.NewPongMessage(peers) pong := proto_node.NewPongMessage(peers)
buffer := pong.ConstructPongMessage() buffer := pong.ConstructPongMessage()
@ -559,7 +550,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
p2p.SendMessage(p, buffer) p2p.SendMessage(p, buffer)
} }
return count return len(peers)
} }
func (node *Node) pongMessageHandler(msgPayload []byte) int { func (node *Node) pongMessageHandler(msgPayload []byte) int {
@ -577,6 +568,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
peer := new(p2p.Peer) peer := new(p2p.Peer)
peer.Ip = p.IP peer.Ip = p.IP
peer.Port = p.Port peer.Port = p.Port
peer.ValidatorID = p.ValidatorID
peer.PubKey = hmy_crypto.Ed25519Curve.Point() peer.PubKey = hmy_crypto.Ed25519Curve.Point()
err = peer.PubKey.UnmarshalBinary(p.PubKey[:]) err = peer.PubKey.UnmarshalBinary(p.PubKey[:])

@ -19,7 +19,7 @@ type Peer struct {
Port string // Port number of the peer Port string // Port number of the peer
PubKey kyber.Point // Public key of the peer PubKey kyber.Point // Public key of the peer
Ready bool // Ready is true if the peer is ready to join consensus. Ready bool // Ready is true if the peer is ready to join consensus.
ValidatorID int ValidatorID int // -1 is the default value, means not assigned any validator ID in the shard
// TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available. // TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available.
} }

@ -17,10 +17,10 @@ var (
p1 = p2p.Peer{ p1 = p2p.Peer{
Ip: "127.0.0.1", Ip: "127.0.0.1",
Port: "9999", Port: "9999",
ValidatorID: 8888, ValidatorID: -1,
PubKey: pubKey1, PubKey: pubKey1,
} }
e1 = "ping:1=>127.0.0.1:9999:8888/[90 217 28 68 64 211 160 232 61 244 159 244 160 36 61 161 237 242 236 45 147 118 237 88 234 122 198 188 157 116 90 228]" e1 = "ping:1=>127.0.0.1:9999:-1/[90 217 28 68 64 211 160 232 61 244 159 244 160 36 61 161 237 242 236 45 147 118 237 88 234 122 198 188 157 116 90 228]"
priKey2 = crypto.Ed25519Curve.Scalar().SetInt64(int64(999)) priKey2 = crypto.Ed25519Curve.Scalar().SetInt64(int64(999))
pubKey2 = pki.GetPublicKeyFromScalar(priKey2) pubKey2 = pki.GetPublicKeyFromScalar(priKey2)
@ -31,14 +31,14 @@ var (
Port: "8888", Port: "8888",
PubKey: pubKey1, PubKey: pubKey1,
Ready: true, Ready: true,
ValidatorID: 1, ValidatorID: -1,
}, },
{ {
Ip: "127.0.0.1", Ip: "127.0.0.1",
Port: "9999", Port: "9999",
PubKey: pubKey2, PubKey: pubKey2,
Ready: false, Ready: false,
ValidatorID: 2, ValidatorID: -2,
}, },
} }
e2 = "pong:1=>length:2" e2 = "pong:1=>length:2"

Loading…
Cancel
Save