Merge pull request #126 from harmony-one/HAR-78_BeaconChain
HAR-78_Candidate_Node_Connects_BeaconChainpull/128/head
commit
40e13172c0
@ -0,0 +1,23 @@ |
||||
package beaconchain |
||||
|
||||
import "testing" |
||||
|
||||
func TestNewNode(t *testing.T) { |
||||
var ip, port string |
||||
ip = "127.0.0.1" |
||||
port = "8080" |
||||
numshards := 2 |
||||
bc := New(numshards, ip, port) |
||||
|
||||
if bc.PubKey == nil { |
||||
t.Error("beacon chain public key not initialized") |
||||
} |
||||
|
||||
if bc.NumberOfNodesAdded != 0 { |
||||
t.Error("beacon chain number of nodes starting with is not zero! (should be zero)") |
||||
} |
||||
|
||||
if bc.NumberOfShards != numshards { |
||||
t.Error("beacon chain number of shards not initialized to given number of desired shards") |
||||
} |
||||
} |
@ -0,0 +1,191 @@ |
||||
package newnode |
||||
|
||||
import ( |
||||
"fmt" |
||||
"net" |
||||
"os" |
||||
"strconv" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/dedis/kyber" |
||||
"github.com/harmony-one/harmony/crypto" |
||||
"github.com/harmony-one/harmony/log" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/proto/bcconn" |
||||
proto_identity "github.com/harmony-one/harmony/proto/identity" |
||||
"github.com/harmony-one/harmony/utils" |
||||
) |
||||
|
||||
// Service is the server for listening.
|
||||
type Service struct { |
||||
ch chan bool |
||||
waitGroup *sync.WaitGroup |
||||
} |
||||
|
||||
//NewNode is ther struct for a candidate node
|
||||
type NewNode struct { |
||||
Role string |
||||
ShardID int |
||||
ValidatorID int // Validator ID in its shard.
|
||||
leader p2p.Peer |
||||
isLeader bool |
||||
Self p2p.Peer |
||||
peers []p2p.Peer |
||||
PubK kyber.Point |
||||
priK kyber.Scalar |
||||
log log.Logger |
||||
SetInfo bool |
||||
Service *Service |
||||
} |
||||
|
||||
// New candidatenode initialization
|
||||
func New(ip string, port string) *NewNode { |
||||
priKey, pubKey := utils.GenKey(ip, port) |
||||
var node NewNode |
||||
node.PubK = pubKey |
||||
node.priK = priKey |
||||
node.Self = p2p.Peer{IP: ip, Port: port, PubKey: pubKey, ValidatorID: -1} |
||||
node.log = log.New() |
||||
node.SetInfo = false |
||||
return &node |
||||
} |
||||
|
||||
type registerResponseRandomNumber struct { |
||||
NumberOfShards int |
||||
NumberOfNodesAdded int |
||||
Leaders []*bcconn.NodeInfo |
||||
} |
||||
|
||||
// NewService starts a newservice in the candidate node
|
||||
func (node *NewNode) NewService(ip, port string) *Service { |
||||
laddr, err := net.ResolveTCPAddr("tcp", ip+":"+port) |
||||
if nil != err { |
||||
node.log.Crit("cannot resolve the tcp address of the new node", err) |
||||
} |
||||
listener, err := net.ListenTCP("tcp", laddr) |
||||
if nil != err { |
||||
node.log.Crit("cannot start a listener for new node", err) |
||||
} |
||||
node.log.Debug("listening on", "address", laddr.String()) |
||||
node.Service = &Service{ |
||||
ch: make(chan bool), |
||||
waitGroup: &sync.WaitGroup{}, |
||||
} |
||||
node.Service.waitGroup.Add(1) |
||||
go node.Serve(listener) |
||||
return node.Service |
||||
} |
||||
|
||||
// Serve Accept connections and spawn a goroutine to serve each one. Stop listening
|
||||
// if anything is received on the service's channel.
|
||||
func (node *NewNode) Serve(listener *net.TCPListener) { |
||||
defer node.Service.waitGroup.Done() |
||||
for { |
||||
select { |
||||
case <-node.Service.ch: |
||||
node.log.Debug("stopping listening on", "address", listener.Addr()) |
||||
listener.Close() |
||||
node.log.Debug("stopped listening") |
||||
return |
||||
default: |
||||
} |
||||
listener.SetDeadline(time.Now().Add(1e9)) // This deadline is for 1 second to accept new connections.
|
||||
conn, err := listener.AcceptTCP() |
||||
if nil != err { |
||||
if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { |
||||
continue |
||||
} |
||||
node.log.Error(err.Error()) |
||||
|
||||
} |
||||
node.Service.waitGroup.Add(1) |
||||
go node.NodeHandler(conn) |
||||
} |
||||
} |
||||
|
||||
// Stop the service by closing the service's channel. Block until the service
|
||||
// is really stopped.
|
||||
func (s *Service) Stop() { |
||||
close(s.ch) |
||||
s.waitGroup.Wait() |
||||
} |
||||
|
||||
func (node NewNode) String() string { |
||||
return fmt.Sprintf("idc: %v:%v and node infi %v", node.Self.IP, node.Self.Port, node.SetInfo) |
||||
} |
||||
|
||||
// ConnectBeaconChain connects to beacon chain
|
||||
func (node *NewNode) ConnectBeaconChain(BCPeer p2p.Peer) { |
||||
node.log.Info("connecting to beacon chain now ...") |
||||
pubk, err := node.PubK.MarshalBinary() |
||||
if err != nil { |
||||
node.log.Error("Could not Marshall public key into binary") |
||||
} |
||||
p := p2p.Peer{IP: node.Self.IP, Port: node.Self.Port} |
||||
nodeInfo := &bcconn.NodeInfo{Self: p, PubK: pubk} |
||||
msg := bcconn.SerializeNodeInfo(nodeInfo) |
||||
msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Register, msg) |
||||
gotShardInfo := false |
||||
timeout := time.After(300 * time.Second) |
||||
tick := time.Tick(5 * time.Second) |
||||
checkLoop: |
||||
for { |
||||
select { |
||||
case <-timeout: |
||||
gotShardInfo = false |
||||
break checkLoop |
||||
case <-tick: |
||||
if node.SetInfo { |
||||
gotShardInfo = true |
||||
break checkLoop |
||||
} else { |
||||
p2p.SendMessage(BCPeer, msgToSend) |
||||
} |
||||
} |
||||
} |
||||
if !gotShardInfo { |
||||
node.log.Crit("Could not get sharding info after 5 minutes") |
||||
os.Exit(1) |
||||
} |
||||
} |
||||
|
||||
// ProcessShardInfo
|
||||
func (node *NewNode) processShardInfo(msgPayload []byte) bool { |
||||
leadersInfo := bcconn.DeserializeRandomInfo(msgPayload) |
||||
leaders := leadersInfo.Leaders |
||||
shardNum, isLeader := utils.AllocateShard(leadersInfo.NumberOfNodesAdded, leadersInfo.NumberOfShards) |
||||
leaderNode := leaders[shardNum-1] //0 indexing.
|
||||
leaderPeer := p2p.Peer{IP: leaderNode.Self.IP, Port: leaderNode.Self.Port} |
||||
leaderPeer.PubKey = crypto.Ed25519Curve.Point() |
||||
err := leaderPeer.PubKey.UnmarshalBinary(leaderNode.PubK[:]) |
||||
if err != nil { |
||||
node.log.Info("Could not unmarshall leaders public key from binary to kyber.point") |
||||
} |
||||
node.leader = leaderPeer |
||||
node.isLeader = isLeader |
||||
node.ShardID = shardNum - 1 //0 indexing.
|
||||
node.SetInfo = true |
||||
node.log.Info("Shard information obtained ..") |
||||
return true |
||||
} |
||||
|
||||
// GetShardID gives shardid of node
|
||||
func (node *NewNode) GetShardID() string { |
||||
return strconv.Itoa(node.ShardID) |
||||
} |
||||
|
||||
// GetLeader gives the leader of the node
|
||||
func (node *NewNode) GetLeader() p2p.Peer { |
||||
return node.leader |
||||
} |
||||
|
||||
// GetClientPeer gives the client of the node
|
||||
func (node *NewNode) GetClientPeer() *p2p.Peer { |
||||
return nil |
||||
} |
||||
|
||||
// GetSelfPeer gives the peer part of the node's own struct
|
||||
func (node *NewNode) GetSelfPeer() p2p.Peer { |
||||
return node.Self |
||||
} |
@ -0,0 +1,63 @@ |
||||
package newnode |
||||
|
||||
import ( |
||||
"net" |
||||
|
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/proto" |
||||
proto_identity "github.com/harmony-one/harmony/proto/identity" |
||||
) |
||||
|
||||
// NodeHandler handles a new incoming connection.
|
||||
func (node *NewNode) NodeHandler(conn net.Conn) { |
||||
defer conn.Close() |
||||
defer node.Service.waitGroup.Done() |
||||
content, err := p2p.ReadMessageContent(conn) |
||||
|
||||
if err != nil { |
||||
node.log.Error("Read p2p data failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
msgCategory, err := proto.GetMessageCategory(content) |
||||
if err != nil { |
||||
node.log.Error("Read node type failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
msgType, err := proto.GetMessageType(content) |
||||
if err != nil { |
||||
node.log.Error("Read action type failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
msgPayload, err := proto.GetMessagePayload(content) |
||||
if err != nil { |
||||
node.log.Error("Read message payload failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
identityMsgPayload, err := proto_identity.GetIdentityMessagePayload(msgPayload) |
||||
if err != nil { |
||||
node.log.Error("Read message payload failed") |
||||
return |
||||
} |
||||
switch msgCategory { |
||||
case proto.Identity: |
||||
actionType := proto_identity.IDMessageType(msgType) |
||||
switch actionType { |
||||
case proto_identity.Identity: |
||||
idMsgType, err := proto_identity.GetIdentityMessageType(msgPayload) |
||||
if err != nil { |
||||
node.log.Error("Error finding the identity message type", err) |
||||
} |
||||
switch idMsgType { |
||||
case proto_identity.Acknowledge: |
||||
node.processShardInfo(identityMsgPayload) |
||||
default: |
||||
panic("The identity message type is wrong/missing and newnode does not handle this identity message type") |
||||
} |
||||
default: |
||||
panic("The msgCategory is wrong/missing and newnode does not handle this protocol message type") |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,18 @@ |
||||
package newnode |
||||
|
||||
import "testing" |
||||
|
||||
func TestNewNode(t *testing.T) { |
||||
var ip, port string |
||||
ip = "127.0.0.1" |
||||
port = "8080" |
||||
nnode := New(ip, port) |
||||
|
||||
if nnode.PubK == nil { |
||||
t.Error("new node public key not initialized") |
||||
} |
||||
|
||||
if nnode.SetInfo { |
||||
t.Error("new node setinfo initialized to true! (should be false)") |
||||
} |
||||
} |
@ -0,0 +1,70 @@ |
||||
package bcconn |
||||
|
||||
import ( |
||||
"bytes" |
||||
"encoding/gob" |
||||
|
||||
"github.com/harmony-one/harmony/log" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
) |
||||
|
||||
//NodeInfo struct exists to share information on the node
|
||||
type NodeInfo struct { //TODO: to be merged with Leo's nodeinfo.
|
||||
Self p2p.Peer |
||||
PubK []byte |
||||
} |
||||
|
||||
//ResponseRandomNumber struct for exchanging random information
|
||||
type ResponseRandomNumber struct { |
||||
NumberOfShards int |
||||
NumberOfNodesAdded int |
||||
Leaders []*NodeInfo |
||||
} |
||||
|
||||
// SerializeNodeInfo is for serializing nodeinfo
|
||||
func SerializeNodeInfo(nodeinfo *NodeInfo) []byte { |
||||
var result bytes.Buffer |
||||
encoder := gob.NewEncoder(&result) |
||||
err := encoder.Encode(nodeinfo) |
||||
if err != nil { |
||||
log.Error("Could not serialize node info", err) |
||||
} |
||||
return result.Bytes() |
||||
} |
||||
|
||||
// DeserializeNodeInfo deserializes the nodeinfo
|
||||
func DeserializeNodeInfo(d []byte) *NodeInfo { |
||||
var wn NodeInfo |
||||
r := bytes.NewBuffer(d) |
||||
decoder := gob.NewDecoder(r) |
||||
err := decoder.Decode(&wn) |
||||
if err != nil { |
||||
log.Error("Could not de-serialize node info", err) |
||||
} |
||||
return &wn |
||||
} |
||||
|
||||
// SerializeRandomInfo serializes random number informations
|
||||
func SerializeRandomInfo(response ResponseRandomNumber) []byte { |
||||
//Needs to escape the serialization of unexported fields
|
||||
var result bytes.Buffer |
||||
encoder := gob.NewEncoder(&result) |
||||
err := encoder.Encode(response) |
||||
if err != nil { |
||||
log.Crit("Could not serialize randomn number information", "error", err) |
||||
} |
||||
|
||||
return result.Bytes() |
||||
} |
||||
|
||||
// DeserializeRandomInfo deserializes the random informations
|
||||
func DeserializeRandomInfo(d []byte) ResponseRandomNumber { |
||||
var wn ResponseRandomNumber |
||||
r := bytes.NewBuffer(d) |
||||
decoder := gob.NewDecoder(r) |
||||
err := decoder.Decode(&wn) |
||||
if err != nil { |
||||
log.Crit("Could not de-serialize random number information") |
||||
} |
||||
return wn |
||||
} |
@ -0,0 +1,66 @@ |
||||
package bcconn |
||||
|
||||
import ( |
||||
"fmt" |
||||
"reflect" |
||||
"testing" |
||||
|
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/utils" |
||||
) |
||||
|
||||
func TestSerializeDeserializeNodeInfo(t *testing.T) { |
||||
var ip, port string |
||||
ip = "127.0.0.1" |
||||
port = "8080" |
||||
self := p2p.Peer{IP: ip, Port: port} |
||||
_, pk := utils.GenKey(ip, port) |
||||
pkb, err := pk.MarshalBinary() |
||||
if err != nil { |
||||
fmt.Println("problem marshalling binary from public key") |
||||
} |
||||
nodeInfo := &NodeInfo{Self: self, PubK: pkb} |
||||
serializedNI := SerializeNodeInfo(nodeInfo) |
||||
deserializedNI := DeserializeNodeInfo(serializedNI) |
||||
if !reflect.DeepEqual(nodeInfo, deserializedNI) { |
||||
t.Fatalf("serialized and deserializing nodeinfo does not lead to origina nodeinfo") |
||||
} |
||||
|
||||
} |
||||
|
||||
func TestSerializeDeserializeRandomInfo(t *testing.T) { |
||||
var ip, port string |
||||
|
||||
ip = "127.0.0.1" |
||||
port = "8080" |
||||
self := p2p.Peer{IP: ip, Port: port} |
||||
_, pk := utils.GenKey(ip, port) |
||||
pkb, err := pk.MarshalBinary() |
||||
if err != nil { |
||||
fmt.Println("problem marshalling binary from public key") |
||||
} |
||||
nodeInfo1 := &NodeInfo{Self: self, PubK: pkb} |
||||
|
||||
ip = "127.0.0.1" |
||||
port = "9080" |
||||
self2 := p2p.Peer{IP: ip, Port: port} |
||||
_, pk2 := utils.GenKey(ip, port) |
||||
pkb2, err := pk2.MarshalBinary() |
||||
if err != nil { |
||||
fmt.Println("problem marshalling binary from public key") |
||||
} |
||||
nodeInfo2 := &NodeInfo{Self: self2, PubK: pkb2} |
||||
|
||||
leaders := make([]*NodeInfo, 2) |
||||
leaders[0] = nodeInfo1 |
||||
leaders[1] = nodeInfo2 |
||||
|
||||
rrn := ResponseRandomNumber{NumberOfShards: 5, NumberOfNodesAdded: 10, Leaders: leaders} |
||||
serializedrrn := SerializeRandomInfo(rrn) |
||||
deserializedrrn := DeserializeRandomInfo(serializedrrn) |
||||
fmt.Println(rrn) |
||||
fmt.Println(deserializedrrn) |
||||
if !reflect.DeepEqual(rrn, deserializedrrn) { |
||||
t.Fatalf("serializin g and deserializing random response does not lead to original randominfo") |
||||
} |
||||
} |
@ -0,0 +1,16 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"flag" |
||||
|
||||
"github.com/harmony-one/harmony/beaconchain" |
||||
) |
||||
|
||||
func main() { |
||||
numShards := flag.Int("numShards", 1, "number of shards of identity chain") |
||||
ip := flag.String("ip", "127.0.0.1", "ip on which beaconchain listens") |
||||
port := flag.String("port", "8081", "port on which beaconchain listens") |
||||
flag.Parse() |
||||
bc := beaconchain.New(*numShards, *ip, *port) |
||||
bc.StartServer() |
||||
} |
Loading…
Reference in new issue