Merge pull request #280 from harmony-one/leochen_fix_peerstore_in_hostv2

fix peerstore in hostv2
pull/286/head
Leo Chen 6 years ago committed by GitHub
commit f7f4e800e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      api/proto/bcconn/bcconn.go
  2. 13
      api/proto/bcconn/bcconn_test.go
  3. 19
      api/proto/node/pingpong.go
  4. 3
      cmd/beaconchain/main.go
  5. 44
      cmd/client/txgen/main.go
  6. 3
      cmd/client/wallet/main.go
  7. 37
      cmd/harmony.go
  8. 10
      consensus/consensus_leader_msg_test.go
  9. 18
      consensus/consensus_leader_test.go
  10. 6
      consensus/consensus_test.go
  11. 4
      consensus/consensus_validator_msg_test.go
  12. 6
      consensus/consensus_validator_test.go
  13. 35
      internal/beaconchain/libs/beaconchain.go
  14. 12
      internal/beaconchain/libs/beaconchain_test.go
  15. 8
      internal/beaconchain/rpc/server.go
  16. 56
      internal/newnode/newnode.go
  17. 41
      internal/newnode/newnode_test.go
  18. 14
      internal/utils/utils.go
  19. 27
      internal/utils/utils_test.go
  20. 1
      node/node.go
  21. 2
      node/node_handler.go
  22. 6
      node/node_handler_test.go
  23. 12
      node/node_test.go
  24. 2
      p2p/helper.go
  25. 3
      p2p/host/host.go
  26. 15
      p2p/host/hostv1/hostv1.go
  27. 69
      p2p/host/hostv2/hostv2.go
  28. 23
      p2p/host/message_test.go
  29. 25
      p2p/host/mock/host_mock.go
  30. 14
      p2p/p2p.go
  31. 46
      p2p/p2pimpl/p2pimpl.go
  32. 9
      test/deploy.sh

@ -5,24 +5,19 @@ import (
"encoding/gob" "encoding/gob"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/p2p"
)
//NodeInfo struct exists to share information on the node "github.com/harmony-one/harmony/api/proto/node"
type NodeInfo struct { //TODO: to be merged with Leo's nodeinfo. )
Self p2p.Peer
PubK []byte
}
//ResponseRandomNumber struct for exchanging random information //ResponseRandomNumber struct for exchanging random information
type ResponseRandomNumber struct { type ResponseRandomNumber struct {
NumberOfShards int NumberOfShards int
NumberOfNodesAdded int NumberOfNodesAdded int
Leaders []*NodeInfo Leaders []*node.Info
} }
// SerializeNodeInfo is for serializing nodeinfo // SerializeNodeInfo is for serializing nodeinfo
func SerializeNodeInfo(nodeinfo *NodeInfo) []byte { func SerializeNodeInfo(nodeinfo *node.Info) []byte {
var result bytes.Buffer var result bytes.Buffer
encoder := gob.NewEncoder(&result) encoder := gob.NewEncoder(&result)
err := encoder.Encode(nodeinfo) err := encoder.Encode(nodeinfo)
@ -33,8 +28,8 @@ func SerializeNodeInfo(nodeinfo *NodeInfo) []byte {
} }
// DeserializeNodeInfo deserializes the nodeinfo // DeserializeNodeInfo deserializes the nodeinfo
func DeserializeNodeInfo(d []byte) *NodeInfo { func DeserializeNodeInfo(d []byte) *node.Info {
var wn NodeInfo var wn node.Info
r := bytes.NewBuffer(d) r := bytes.NewBuffer(d)
decoder := gob.NewDecoder(r) decoder := gob.NewDecoder(r)
err := decoder.Decode(&wn) err := decoder.Decode(&wn)

@ -5,21 +5,20 @@ import (
"reflect" "reflect"
"testing" "testing"
"github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
) )
func TestSerializeDeserializeNodeInfo(t *testing.T) { func TestSerializeDeserializeNodeInfo(t *testing.T) {
var ip, port string var ip, port string
ip = "127.0.0.1" ip = "127.0.0.1"
port = "8080" port = "8080"
self := p2p.Peer{IP: ip, Port: port}
_, pk := utils.GenKey(ip, port) _, pk := utils.GenKey(ip, port)
pkb, err := pk.MarshalBinary() pkb, err := pk.MarshalBinary()
if err != nil { if err != nil {
fmt.Println("problem marshalling binary from public key") fmt.Println("problem marshalling binary from public key")
} }
nodeInfo := &NodeInfo{Self: self, PubK: pkb} nodeInfo := &node.Info{IP: ip, Port: port, PubKey: pkb}
serializedNI := SerializeNodeInfo(nodeInfo) serializedNI := SerializeNodeInfo(nodeInfo)
deserializedNI := DeserializeNodeInfo(serializedNI) deserializedNI := DeserializeNodeInfo(serializedNI)
if !reflect.DeepEqual(nodeInfo, deserializedNI) { if !reflect.DeepEqual(nodeInfo, deserializedNI) {
@ -33,25 +32,23 @@ func TestSerializeDeserializeRandomInfo(t *testing.T) {
ip = "127.0.0.1" ip = "127.0.0.1"
port = "8080" port = "8080"
self := p2p.Peer{IP: ip, Port: port}
_, pk := utils.GenKey(ip, port) _, pk := utils.GenKey(ip, port)
pkb, err := pk.MarshalBinary() pkb, err := pk.MarshalBinary()
if err != nil { if err != nil {
fmt.Println("problem marshalling binary from public key") fmt.Println("problem marshalling binary from public key")
} }
nodeInfo1 := &NodeInfo{Self: self, PubK: pkb} nodeInfo1 := &node.Info{IP: ip, Port: port, PubKey: pkb}
ip = "127.0.0.1" ip = "127.0.0.1"
port = "9080" port = "9080"
self2 := p2p.Peer{IP: ip, Port: port}
_, pk2 := utils.GenKey(ip, port) _, pk2 := utils.GenKey(ip, port)
pkb2, err := pk2.MarshalBinary() pkb2, err := pk2.MarshalBinary()
if err != nil { if err != nil {
fmt.Println("problem marshalling binary from public key") fmt.Println("problem marshalling binary from public key")
} }
nodeInfo2 := &NodeInfo{Self: self2, PubK: pkb2} nodeInfo2 := &node.Info{IP: ip, Port: port, PubKey: pkb2}
leaders := make([]*NodeInfo, 2) leaders := make([]*node.Info, 2)
leaders[0] = nodeInfo1 leaders[0] = nodeInfo1
leaders[1] = nodeInfo2 leaders[1] = nodeInfo2

@ -19,6 +19,8 @@ import (
"github.com/dedis/kyber" "github.com/dedis/kyber"
"github.com/harmony-one/harmony/api/proto" "github.com/harmony-one/harmony/api/proto"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
peer "github.com/libp2p/go-libp2p-peer"
) )
// RoleType defines the role of the node // RoleType defines the role of the node
@ -40,27 +42,28 @@ func (r RoleType) String() string {
return "Unknown" return "Unknown"
} }
// refer to Peer struct in p2p/peer.go // Info refers to Peer struct in p2p/peer.go
// this is basically a simplified version of Peer // this is basically a simplified version of Peer
// for network transportation // for network transportation
type nodeInfo struct { type Info struct {
IP string IP string
Port string Port string
PubKey []byte PubKey []byte
ValidatorID int ValidatorID int
Role RoleType Role RoleType
PeerID peer.ID // Peerstore ID
} }
// PingMessageType defines the data structure of the Ping message // PingMessageType defines the data structure of the Ping message
type PingMessageType struct { type PingMessageType struct {
Version uint16 // version of the protocol Version uint16 // version of the protocol
Node nodeInfo Node Info
} }
// PongMessageType defines the data structure of the Pong message // PongMessageType defines the data structure of the Pong message
type PongMessageType struct { type PongMessageType struct {
Version uint16 // version of the protocol Version uint16 // version of the protocol
Peers []nodeInfo Peers []Info
PubKeys [][]byte // list of publickKeys, has to be identical among all validators/leaders PubKeys [][]byte // list of publickKeys, has to be identical among all validators/leaders
} }
@ -81,6 +84,7 @@ func NewPingMessage(peer p2p.Peer) *PingMessageType {
ping.Version = ProtocolVersion ping.Version = ProtocolVersion
ping.Node.IP = peer.IP ping.Node.IP = peer.IP
ping.Node.Port = peer.Port ping.Node.Port = peer.Port
ping.Node.PeerID = peer.PeerID
ping.Node.ValidatorID = peer.ValidatorID ping.Node.ValidatorID = peer.ValidatorID
ping.Node.PubKey, err = peer.PubKey.MarshalBinary() ping.Node.PubKey, err = peer.PubKey.MarshalBinary()
ping.Node.Role = ValidatorRole ping.Node.Role = ValidatorRole
@ -99,14 +103,15 @@ func NewPongMessage(peers []p2p.Peer, pubKeys []kyber.Point) *PongMessageType {
pong.PubKeys = make([][]byte, 0) pong.PubKeys = make([][]byte, 0)
pong.Version = ProtocolVersion pong.Version = ProtocolVersion
pong.Peers = make([]nodeInfo, 0) pong.Peers = make([]Info, 0)
var err error var err error
for _, p := range peers { for _, p := range peers {
n := nodeInfo{} n := Info{}
n.IP = p.IP n.IP = p.IP
n.Port = p.Port n.Port = p.Port
n.ValidatorID = p.ValidatorID n.ValidatorID = p.ValidatorID
n.PeerID = p.PeerID
n.PubKey, err = p.PubKey.MarshalBinary() n.PubKey, err = p.PubKey.MarshalBinary()
if err != nil { if err != nil {
fmt.Printf("Error Marshal PubKey: %v", err) fmt.Printf("Error Marshal PubKey: %v", err)
@ -146,7 +151,7 @@ func GetPingMessage(payload []byte) (*PingMessageType, error) {
// GetPongMessage deserializes the Pong Message from a list of byte // GetPongMessage deserializes the Pong Message from a list of byte
func GetPongMessage(payload []byte) (*PongMessageType, error) { func GetPongMessage(payload []byte) (*PongMessageType, error) {
pong := new(PongMessageType) pong := new(PongMessageType)
pong.Peers = make([]nodeInfo, 0) pong.Peers = make([]Info, 0)
pong.PubKeys = make([][]byte, 0) pong.PubKeys = make([][]byte, 0)
r := bytes.NewBuffer(payload) r := bytes.NewBuffer(payload)

@ -48,6 +48,9 @@ func main() {
beaconchain.SetSaveFile(*resetFlag) beaconchain.SetSaveFile(*resetFlag)
bc = beaconchain.New(*numShards, *ip, *port) bc = beaconchain.New(*numShards, *ip, *port)
} }
fmt.Printf("Beacon Chain Started: /ip4/%s/tcp/%v/ipfs/%s\n", *ip, *port, bc.GetID().Pretty())
go bc.SupportRPC() go bc.SupportRPC()
bc.StartServer() bc.StartServer()
} }

@ -16,10 +16,12 @@ import (
"github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/newnode" "github.com/harmony-one/harmony/internal/newnode"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl" "github.com/harmony-one/harmony/p2p/p2pimpl"
peerstore "github.com/libp2p/go-libp2p-peerstore"
multiaddr "github.com/multiformats/go-multiaddr"
) )
var ( var (
@ -50,6 +52,8 @@ func main() {
bcIP := flag.String("bc", "127.0.0.1", "IP of the identity chain") bcIP := flag.String("bc", "127.0.0.1", "IP of the identity chain")
bcPort := flag.String("bc_port", "8081", "port of the identity chain") bcPort := flag.String("bc_port", "8081", "port of the identity chain")
bcAddr := flag.String("bc_addr", "", "MultiAddr of the identity chain")
flag.Parse() flag.Parse()
if *versionFlag { if *versionFlag {
@ -59,19 +63,35 @@ func main() {
// Add GOMAXPROCS to achieve max performance. // Add GOMAXPROCS to achieve max performance.
runtime.GOMAXPROCS(1024) runtime.GOMAXPROCS(1024)
var clientPeer *p2p.Peer var bcPeer *p2p.Peer
var shardIDLeaderMap map[uint32]p2p.Peer var shardIDLeaderMap map[uint32]p2p.Peer
if *bcAddr != "" {
// Turn the destination into a multiaddr.
maddr, err := multiaddr.NewMultiaddr(*bcAddr)
if err != nil {
panic(err)
}
// Extract the peer ID from the multiaddr.
info, err := peerstore.InfoFromP2pAddr(maddr)
if err != nil {
panic(err)
}
bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort, Addrs: info.Addrs, PeerID: info.ID}
} else {
bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort}
}
candidateNode := newnode.New(*ip, *port) candidateNode := newnode.New(*ip, *port)
BCPeer := p2p.Peer{IP: *bcIP, Port: *bcPort} candidateNode.AddPeer(bcPeer)
candidateNode.ContactBeaconChain(BCPeer) candidateNode.ContactBeaconChain(*bcPeer)
clientPeer = &p2p.Peer{IP: *ip, Port: *port} selfPeer := candidateNode.GetSelfPeer()
selfPeer.PubKey = candidateNode.PubK
shardIDLeaderMap = candidateNode.Leaders shardIDLeaderMap = candidateNode.Leaders
if clientPeer == nil {
panic("Client Peer is nil!")
}
debugPrintShardIDLeaderMap(shardIDLeaderMap) debugPrintShardIDLeaderMap(shardIDLeaderMap)
// Do cross shard tx if there are more than one shard // Do cross shard tx if there are more than one shard
@ -93,9 +113,10 @@ func main() {
// Nodes containing blockchain data to mirror the shards' data in the network // Nodes containing blockchain data to mirror the shards' data in the network
nodes := []*node.Node{} nodes := []*node.Node{}
_, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) host, err := p2pimpl.NewHost(&selfPeer)
clientPeer.PubKey = pubKey if err != nil {
host := p2pimpl.NewHost(*clientPeer) panic("unable to new host in txgen")
}
for shardID := range shardIDLeaderMap { for shardID := range shardIDLeaderMap {
node := node.New(host, &consensus.Consensus{ShardID: shardID}, nil) node := node.New(host, &consensus.Consensus{ShardID: shardID}, nil)
// Assign many fake addresses so we have enough address to play with at first // Assign many fake addresses so we have enough address to play with at first
@ -142,6 +163,7 @@ func main() {
for _, leader := range shardIDLeaderMap { for _, leader := range shardIDLeaderMap {
log.Debug("Client Join Shard", "leader", leader) log.Debug("Client Join Shard", "leader", leader)
clientNode.GetHost().AddPeer(&leader)
go clientNode.JoinShard(leader) go clientNode.JoinShard(leader)
// wait for 3 seconds for client to send ping message to leader // wait for 3 seconds for client to send ping message to leader
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)

@ -298,7 +298,8 @@ func CreateWalletNode() *node.Node {
} }
// dummy host for wallet // dummy host for wallet
host := p2pimpl.NewHost(p2p.Peer{IP: "127.0.0.1", Port: "6789"}) self := p2p.Peer{IP: "127.0.0.1", Port: "6789"}
host, _ := p2pimpl.NewHost(&self)
walletNode := node.New(host, nil, nil) walletNode := node.New(host, nil, nil)
walletNode.Client = client.NewClient(walletNode.GetHost(), &shardIDLeaderMap) walletNode.Client = client.NewClient(walletNode.GetHost(), &shardIDLeaderMap)
return walletNode return walletNode

@ -18,6 +18,9 @@ import (
"github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl" "github.com/harmony-one/harmony/p2p/p2pimpl"
peerstore "github.com/libp2p/go-libp2p-peerstore"
multiaddr "github.com/multiformats/go-multiaddr"
) )
var ( var (
@ -88,6 +91,7 @@ func main() {
//This IP belongs to jenkins.harmony.one //This IP belongs to jenkins.harmony.one
bcIP := flag.String("bc", "127.0.0.1", "IP of the identity chain") bcIP := flag.String("bc", "127.0.0.1", "IP of the identity chain")
bcPort := flag.String("bc_port", "8081", "port of the identity chain") bcPort := flag.String("bc_port", "8081", "port of the identity chain")
bcAddr := flag.String("bc_addr", "", "MultiAddr of the identity chain")
//Leader needs to have a minimal number of peers to start consensus //Leader needs to have a minimal number of peers to start consensus
minPeers := flag.Int("min_peers", 100, "Minimal number of Peers in shard") minPeers := flag.Int("min_peers", 100, "Minimal number of Peers in shard")
@ -109,10 +113,31 @@ func main() {
var leader p2p.Peer var leader p2p.Peer
var selfPeer p2p.Peer var selfPeer p2p.Peer
var clientPeer *p2p.Peer var clientPeer *p2p.Peer
var BCPeer *p2p.Peer
if *bcAddr != "" {
// Turn the destination into a multiaddr.
maddr, err := multiaddr.NewMultiaddr(*bcAddr)
if err != nil {
panic(err)
}
// Extract the peer ID from the multiaddr.
info, err := peerstore.InfoFromP2pAddr(maddr)
if err != nil {
panic(err)
}
BCPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort, Addrs: info.Addrs, PeerID: info.ID}
} else {
BCPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort}
}
//Use Peer Discovery to get shard/leader/peer/... //Use Peer Discovery to get shard/leader/peer/...
candidateNode := pkg_newnode.New(*ip, *port) candidateNode := pkg_newnode.New(*ip, *port)
BCPeer := p2p.Peer{IP: *bcIP, Port: *bcPort} candidateNode.AddPeer(BCPeer)
candidateNode.ContactBeaconChain(BCPeer) candidateNode.ContactBeaconChain(*BCPeer)
shardID = candidateNode.GetShardID() shardID = candidateNode.GetShardID()
leader = candidateNode.GetLeader() leader = candidateNode.GetLeader()
selfPeer = candidateNode.GetSelfPeer() selfPeer = candidateNode.GetSelfPeer()
@ -142,7 +167,13 @@ func main() {
ldb, _ = InitLDBDatabase(*ip, *port) ldb, _ = InitLDBDatabase(*ip, *port)
} }
host := p2pimpl.NewHost(selfPeer) host, err := p2pimpl.NewHost(&selfPeer)
if err != nil {
panic("unable to new host in harmony")
}
host.AddPeer(&leader)
// Consensus object. // Consensus object.
consensus := consensus.New(host, shardID, peers, leader) consensus := consensus.New(host, shardID, peers, leader)
consensus.MinPeers = *minPeers consensus.MinPeers = *minPeers

@ -14,7 +14,10 @@ import (
func TestConstructAnnounceMessage(test *testing.T) { func TestConstructAnnounceMessage(test *testing.T) {
leader := p2p.Peer{IP: "127.0.0.1", Port: "19999"} leader := p2p.Peer{IP: "127.0.0.1", Port: "19999"}
validator := p2p.Peer{IP: "127.0.0.1", Port: "55555"} validator := p2p.Peer{IP: "127.0.0.1", Port: "55555"}
host := p2pimpl.NewHost(leader) host, err := p2pimpl.NewHost(&leader)
if err != nil {
test.Fatalf("new host failed: %v", err)
}
consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := New(host, "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = [32]byte{} consensus.blockHash = [32]byte{}
msg := consensus.constructAnnounceMessage() msg := consensus.constructAnnounceMessage()
@ -36,7 +39,10 @@ func TestConstructChallengeMessage(test *testing.T) {
validatorPriKey.UnmarshalBinary(priKeyInBytes[:]) validatorPriKey.UnmarshalBinary(priKeyInBytes[:])
validatorPubKey := pki.GetPublicKeyFromScalar(leaderPriKey) validatorPubKey := pki.GetPublicKeyFromScalar(leaderPriKey)
validator := p2p.Peer{IP: "127.0.0.1", Port: "5555", PubKey: validatorPubKey} validator := p2p.Peer{IP: "127.0.0.1", Port: "5555", PubKey: validatorPubKey}
host := p2pimpl.NewHost(leader) host, err := p2pimpl.NewHost(&leader)
if err != nil {
test.Fatalf("new host failed: %v", err)
}
consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := New(host, "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = [32]byte{} consensus.blockHash = [32]byte{}
(*consensus.commitments)[0] = leaderPubKey (*consensus.commitments)[0] = leaderPubKey

@ -38,17 +38,20 @@ func TestProcessMessageLeaderCommit(test *testing.T) {
consensusLeader := New(m, "0", []p2p.Peer{validator1, validator2, validator3}, leader) consensusLeader := New(m, "0", []p2p.Peer{validator1, validator2, validator3}, leader)
consensusLeader.blockHash = [32]byte{} consensusLeader.blockHash = [32]byte{}
consensusValidator1 := New(p2pimpl.NewHost(validator1), "0", []p2p.Peer{validator1, validator2, validator3}, leader) host1, _ := p2pimpl.NewHost(&validator1)
consensusValidator1 := New(host1, "0", []p2p.Peer{validator1, validator2, validator3}, leader)
consensusValidator1.blockHash = [32]byte{} consensusValidator1.blockHash = [32]byte{}
_, msg := consensusValidator1.constructCommitMessage(consensus_proto.MessageType_COMMIT) _, msg := consensusValidator1.constructCommitMessage(consensus_proto.MessageType_COMMIT)
consensusLeader.ProcessMessageLeader(msg[1:]) consensusLeader.ProcessMessageLeader(msg[1:])
consensusValidator2 := New(p2pimpl.NewHost(validator2), "0", []p2p.Peer{validator1, validator2, validator3}, leader) host2, _ := p2pimpl.NewHost(&validator2)
consensusValidator2 := New(host2, "0", []p2p.Peer{validator1, validator2, validator3}, leader)
consensusValidator2.blockHash = [32]byte{} consensusValidator2.blockHash = [32]byte{}
_, msg = consensusValidator2.constructCommitMessage(consensus_proto.MessageType_COMMIT) _, msg = consensusValidator2.constructCommitMessage(consensus_proto.MessageType_COMMIT)
consensusLeader.ProcessMessageLeader(msg[1:]) consensusLeader.ProcessMessageLeader(msg[1:])
consensusValidator3 := New(p2pimpl.NewHost(validator3), "0", []p2p.Peer{validator1, validator2, validator3}, leader) host3, _ := p2pimpl.NewHost(&validator3)
consensusValidator3 := New(host3, "0", []p2p.Peer{validator1, validator2, validator3}, leader)
consensusValidator3.blockHash = [32]byte{} consensusValidator3.blockHash = [32]byte{}
_, msg = consensusValidator3.constructCommitMessage(consensus_proto.MessageType_COMMIT) _, msg = consensusValidator3.constructCommitMessage(consensus_proto.MessageType_COMMIT)
consensusLeader.ProcessMessageLeader(msg[1:]) consensusLeader.ProcessMessageLeader(msg[1:])
@ -81,17 +84,20 @@ func TestProcessMessageLeaderResponse(test *testing.T) {
consensusLeader := New(m, "0", []p2p.Peer{validator1, validator2, validator3}, leader) consensusLeader := New(m, "0", []p2p.Peer{validator1, validator2, validator3}, leader)
consensusLeader.blockHash = [32]byte{} consensusLeader.blockHash = [32]byte{}
consensusValidator1 := New(p2pimpl.NewHost(validator1), "0", []p2p.Peer{validator1, validator2, validator3}, leader) host1, _ := p2pimpl.NewHost(&validator1)
consensusValidator1 := New(host1, "0", []p2p.Peer{validator1, validator2, validator3}, leader)
consensusValidator1.blockHash = [32]byte{} consensusValidator1.blockHash = [32]byte{}
_, msg := consensusValidator1.constructCommitMessage(consensus_proto.MessageType_COMMIT) _, msg := consensusValidator1.constructCommitMessage(consensus_proto.MessageType_COMMIT)
consensusLeader.ProcessMessageLeader(msg[1:]) consensusLeader.ProcessMessageLeader(msg[1:])
consensusValidator2 := New(p2pimpl.NewHost(validator2), "0", []p2p.Peer{validator1, validator2, validator3}, leader) host2, _ := p2pimpl.NewHost(&validator2)
consensusValidator2 := New(host2, "0", []p2p.Peer{validator1, validator2, validator3}, leader)
consensusValidator2.blockHash = [32]byte{} consensusValidator2.blockHash = [32]byte{}
_, msg = consensusValidator2.constructCommitMessage(consensus_proto.MessageType_COMMIT) _, msg = consensusValidator2.constructCommitMessage(consensus_proto.MessageType_COMMIT)
consensusLeader.ProcessMessageLeader(msg[1:]) consensusLeader.ProcessMessageLeader(msg[1:])
consensusValidator3 := New(p2pimpl.NewHost(validator3), "0", []p2p.Peer{validator1, validator2, validator3}, leader) host3, _ := p2pimpl.NewHost(&validator3)
consensusValidator3 := New(host3, "0", []p2p.Peer{validator1, validator2, validator3}, leader)
consensusValidator3.blockHash = [32]byte{} consensusValidator3.blockHash = [32]byte{}
_, msg = consensusValidator3.constructCommitMessage(consensus_proto.MessageType_COMMIT) _, msg = consensusValidator3.constructCommitMessage(consensus_proto.MessageType_COMMIT)
consensusLeader.ProcessMessageLeader(msg[1:]) consensusLeader.ProcessMessageLeader(msg[1:])

@ -11,7 +11,7 @@ import (
func TestNew(test *testing.T) { func TestNew(test *testing.T) {
leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"} leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"}
validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"} validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := New(host, "0", []p2p.Peer{leader, validator}, leader)
if consensus.consensusID != 0 { if consensus.consensusID != 0 {
test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusID) test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusID)
@ -25,7 +25,7 @@ func TestNew(test *testing.T) {
test.Error("Consensus ReadySignal should be initialized") test.Error("Consensus ReadySignal should be initialized")
} }
if consensus.leader != leader { if consensus.leader.IP != leader.IP || consensus.leader.Port != leader.Port {
test.Error("Consensus Leader is set to wrong Peer") test.Error("Consensus Leader is set to wrong Peer")
} }
} }
@ -47,7 +47,7 @@ func TestRemovePeers(t *testing.T) {
peerRemove := []p2p.Peer{p1, p2} peerRemove := []p2p.Peer{p1, p2}
leader := p2p.Peer{IP: "127.0.0.1", Port: "9000", PubKey: pk5} leader := p2p.Peer{IP: "127.0.0.1", Port: "9000", PubKey: pk5}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := New(host, "0", peers, leader) consensus := New(host, "0", peers, leader)
// consensus.DebugPrintPublicKeys() // consensus.DebugPrintPublicKeys()

@ -13,7 +13,7 @@ import (
func TestConstructCommitMessage(test *testing.T) { func TestConstructCommitMessage(test *testing.T) {
leader := p2p.Peer{IP: "127.0.0.1", Port: "9992"} leader := p2p.Peer{IP: "127.0.0.1", Port: "9992"}
validator := p2p.Peer{IP: "127.0.0.1", Port: "9995"} validator := p2p.Peer{IP: "127.0.0.1", Port: "9995"}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := New(host, "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = [32]byte{} consensus.blockHash = [32]byte{}
_, msg := consensus.constructCommitMessage(consensus_proto.MessageType_COMMIT) _, msg := consensus.constructCommitMessage(consensus_proto.MessageType_COMMIT)
@ -26,7 +26,7 @@ func TestConstructCommitMessage(test *testing.T) {
func TestConstructResponseMessage(test *testing.T) { func TestConstructResponseMessage(test *testing.T) {
leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"} leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"}
validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"} validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := New(host, "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = [32]byte{} consensus.blockHash = [32]byte{}
msg := consensus.constructResponseMessage(consensus_proto.MessageType_RESPONSE, crypto.Ed25519Curve.Scalar()) msg := consensus.constructResponseMessage(consensus_proto.MessageType_RESPONSE, crypto.Ed25519Curve.Scalar())

@ -36,7 +36,8 @@ func TestProcessMessageValidatorAnnounce(test *testing.T) {
m.EXPECT().GetSelfPeer().Return(leader) m.EXPECT().GetSelfPeer().Return(leader)
m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(1) m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(1)
consensusLeader := New(p2pimpl.NewHost(leader), "0", []p2p.Peer{validator1, validator2, validator3}, leader) host, _ := p2pimpl.NewHost(&leader)
consensusLeader := New(host, "0", []p2p.Peer{validator1, validator2, validator3}, leader)
blockBytes, err := hex.DecodeString("f90461f90222a0f7007987c6f26b20cbd6384e3587445eca556beb6716f8eb6a2f590ce8ed3925940000000000000000000000000000000000000000a0f4f2f8416b65c98890630b105f016370abaab236c92faf7fc73a13d037958c52a0db025c6f785698feb447b509908fe488486062e4607afaae85c3336692445b01a03688be0d6b3d0651911204b4539e11096045cacbb676401e2655653823014c8cb90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008001850254a0e6f88303295d845c2e4f0e80a00000000000000000000000000000000000000000000000000000000000000000880000000000000000840000000080b842000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f90238f9023580808083081650808b069e10de76676d08000000b901db6080604052678ac7230489e8000060015560028054600160a060020a031916331790556101aa806100316000396000f3fe608060405260043610610045577c0100000000000000000000000000000000000000000000000000000000600035046327c78c42811461004a5780634ddd108a1461008c575b600080fd5b34801561005657600080fd5b5061008a6004803603602081101561006d57600080fd5b503573ffffffffffffffffffffffffffffffffffffffff166100b3565b005b34801561009857600080fd5b506100a1610179565b60408051918252519081900360200190f35b60025473ffffffffffffffffffffffffffffffffffffffff1633146100d757600080fd5b600154303110156100e757600080fd5b73ffffffffffffffffffffffffffffffffffffffff811660009081526020819052604090205460ff161561011a57600080fd5b73ffffffffffffffffffffffffffffffffffffffff8116600081815260208190526040808220805460ff1916600190811790915554905181156108fc0292818181858888f19350505050158015610175573d6000803e3d6000fd5b5050565b30319056fea165627a7a723058203e799228fee2fa7c5d15e71c04267a0cc2687c5eff3b48b98f21f355e1064ab300291ba0a87b9130f7f127af3a713a270610da48d56dedc9501e624bdfe04871859c88f3a05a94b087c05c6395825c5fc35d5ce96b2e61f0ce5f2d67b28f9b2d1178fa90f0c0") blockBytes, err := hex.DecodeString("f90461f90222a0f7007987c6f26b20cbd6384e3587445eca556beb6716f8eb6a2f590ce8ed3925940000000000000000000000000000000000000000a0f4f2f8416b65c98890630b105f016370abaab236c92faf7fc73a13d037958c52a0db025c6f785698feb447b509908fe488486062e4607afaae85c3336692445b01a03688be0d6b3d0651911204b4539e11096045cacbb676401e2655653823014c8cba0e6f88303295d845c2e4f0e80a00000000000000000000000000000000000000000000000000000000000000000880000000000000000840000000080b842000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f90238f9023580808083081650808b069e10de76676d08000000b901db6080604052678ac7230489e8000060015560028054600160a060020a031916331790556101aa806100316000396000f3fe608060405260043610610045577c0100000000000000000000000000000000000000000000000000000000600035046327c78c42811461004a5780634ddd108a1461008c575b600080fd5b34801561005657600080fd5b5061008a6004803603602081101561006d57600080fd5b503573ffffffffffffffffffffffffffffffffffffffff166100b3565b005b34801561009857600080fd5b506100a1610179565b60408051918252519081900360200190f35b60025473ffffffffffffffffffffffffffffffffffffffff1633146100d757600080fd5b600154303110156100e757600080fd5b73ffffffffffffffffffffffffffffffffffffffff811660009081526020819052604090205460ff161561011a57600080fd5b73ffffffffffffffffffffffffffffffffffffffff8116600081815260208190526040808220805460ff1916600190811790915554905181156108fc0292818181858888f19350505050158015610175573d6000803e3d6000fd5b5050565b30319056fea165627a7a723058203e799228fee2fa7c5d15e71c04267a0cc2687c5eff3b48b98f21f355e1064ab300291ba0a87b9130f7f127af3a713a270610da48d56dedc9501e624bdfe04871859c88f3a05a94b087c05c6395825c5fc35d5ce96b2e61f0ce5f2d67b28f9b2d1178fa90f0c0")
consensusLeader.block = blockBytes consensusLeader.block = blockBytes
hashBytes, err := hex.DecodeString("2e002b2b91a08b6e94d21200103828d9f2ae7cd9eb0c26d2679966699486dee1") hashBytes, err := hex.DecodeString("2e002b2b91a08b6e94d21200103828d9f2ae7cd9eb0c26d2679966699486dee1")
@ -85,7 +86,8 @@ func TestProcessMessageValidatorChallenge(test *testing.T) {
m.EXPECT().GetSelfPeer().Return(leader) m.EXPECT().GetSelfPeer().Return(leader)
m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(2) m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(2)
consensusLeader := New(p2pimpl.NewHost(leader), "0", []p2p.Peer{validator1, validator2, validator3}, leader) host, _ := p2pimpl.NewHost(&leader)
consensusLeader := New(host, "0", []p2p.Peer{validator1, validator2, validator3}, leader)
blockBytes, err := hex.DecodeString("f90461f90222a0f7007987c6f26b20cbd6384e3587445eca556beb6716f8eb6a2f590ce8ed3925940000000000000000000000000000000000000000a0f4f2f8416b65c98890630b105f016370abaab236c92faf7fc73a13d037958c52a0db025c6f785698feb447b509908fe488486062e4607afaae85c3336692445b01a03688be0d6b3d0651911204b4539e11096045cacbb676401e2655653823014c8cba0e6f88303295d845c2e4f0e80a00000000000000000000000000000000000000000000000000000000000000000880000000000000000840000000080b842000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f90238f9023580808083081650808b069e10de76676d08000000b901db6080604052678ac7230489e8000060015560028054600160a060020a031916331790556101aa806100316000396000f3fe608060405260043610610045577c0100000000000000000000000000000000000000000000000000000000600035046327c78c42811461004a5780634ddd108a1461008c575b600080fd5b34801561005657600080fd5b5061008a6004803603602081101561006d57600080fd5b503573ffffffffffffffffffffffffffffffffffffffff166100b3565b005b34801561009857600080fd5b506100a1610179565b60408051918252519081900360200190f35b60025473ffffffffffffffffffffffffffffffffffffffff1633146100d757600080fd5b600154303110156100e757600080fd5b73ffffffffffffffffffffffffffffffffffffffff811660009081526020819052604090205460ff161561011a57600080fd5b73ffffffffffffffffffffffffffffffffffffffff8116600081815260208190526040808220805460ff1916600190811790915554905181156108fc0292818181858888f19350505050158015610175573d6000803e3d6000fd5b5050565b30319056fea165627a7a723058203e799228fee2fa7c5d15e71c04267a0cc2687c5eff3b48b98f21f355e1064ab300291ba0a87b9130f7f127af3a713a270610da48d56dedc9501e624bdfe04871859c88f3a05a94b087c05c6395825c5fc35d5ce96b2e61f0ce5f2d67b28f9b2d1178fa90f0c0") blockBytes, err := hex.DecodeString("f90461f90222a0f7007987c6f26b20cbd6384e3587445eca556beb6716f8eb6a2f590ce8ed3925940000000000000000000000000000000000000000a0f4f2f8416b65c98890630b105f016370abaab236c92faf7fc73a13d037958c52a0db025c6f785698feb447b509908fe488486062e4607afaae85c3336692445b01a03688be0d6b3d0651911204b4539e11096045cacbb676401e2655653823014c8cba0e6f88303295d845c2e4f0e80a00000000000000000000000000000000000000000000000000000000000000000880000000000000000840000000080b842000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f90238f9023580808083081650808b069e10de76676d08000000b901db6080604052678ac7230489e8000060015560028054600160a060020a031916331790556101aa806100316000396000f3fe608060405260043610610045577c0100000000000000000000000000000000000000000000000000000000600035046327c78c42811461004a5780634ddd108a1461008c575b600080fd5b34801561005657600080fd5b5061008a6004803603602081101561006d57600080fd5b503573ffffffffffffffffffffffffffffffffffffffff166100b3565b005b34801561009857600080fd5b506100a1610179565b60408051918252519081900360200190f35b60025473ffffffffffffffffffffffffffffffffffffffff1633146100d757600080fd5b600154303110156100e757600080fd5b73ffffffffffffffffffffffffffffffffffffffff811660009081526020819052604090205460ff161561011a57600080fd5b73ffffffffffffffffffffffffffffffffffffffff8116600081815260208190526040808220805460ff1916600190811790915554905181156108fc0292818181858888f19350505050158015610175573d6000803e3d6000fd5b5050565b30319056fea165627a7a723058203e799228fee2fa7c5d15e71c04267a0cc2687c5eff3b48b98f21f355e1064ab300291ba0a87b9130f7f127af3a713a270610da48d56dedc9501e624bdfe04871859c88f3a05a94b087c05c6395825c5fc35d5ce96b2e61f0ce5f2d67b28f9b2d1178fa90f0c0")
consensusLeader.block = blockBytes consensusLeader.block = blockBytes
hashBytes, err := hex.DecodeString("2e002b2b91a08b6e94d21200103828d9f2ae7cd9eb0c26d2679966699486dee1") hashBytes, err := hex.DecodeString("2e002b2b91a08b6e94d21200103828d9f2ae7cd9eb0c26d2679966699486dee1")

@ -11,12 +11,14 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/api/proto/bcconn" "github.com/harmony-one/harmony/api/proto/bcconn"
proto_identity "github.com/harmony-one/harmony/api/proto/identity" proto_identity "github.com/harmony-one/harmony/api/proto/identity"
"github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/internal/beaconchain/rpc" "github.com/harmony-one/harmony/internal/beaconchain/rpc"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl" "github.com/harmony-one/harmony/p2p/p2pimpl"
peer "github.com/libp2p/go-libp2p-peer"
) )
//BCState keeps track of the state the beaconchain is in //BCState keeps track of the state the beaconchain is in
@ -30,8 +32,8 @@ const BeaconchainServicePortDiff = 4444
//BCInfo is the information that needs to be stored on the disk in order to allow for a restart. //BCInfo is the information that needs to be stored on the disk in order to allow for a restart.
type BCInfo struct { type BCInfo struct {
Leaders []*bcconn.NodeInfo `json:"leaders"` Leaders []*node.Info `json:"leaders"`
ShardLeaderMap map[int]*bcconn.NodeInfo `json:"shardLeaderMap"` ShardLeaderMap map[int]*node.Info `json:"shardLeaderMap"`
NumberOfShards int `json:"numShards"` NumberOfShards int `json:"numShards"`
NumberOfNodesAdded int `json:"numNodesAdded"` NumberOfNodesAdded int `json:"numNodesAdded"`
IP string `json:"ip"` IP string `json:"ip"`
@ -42,11 +44,13 @@ type BCInfo struct {
type BeaconChain struct { type BeaconChain struct {
BCInfo BCInfo BCInfo BCInfo
log log.Logger log log.Logger
ShardLeaderMap map[int]*bcconn.NodeInfo ShardLeaderMap map[int]*node.Info
PubKey kyber.Point PubKey kyber.Point
host host.Host host host.Host
state BCState state BCState
rpcServer *beaconchain.Server rpcServer *beaconchain.Server
Peer p2p.Peer
Self p2p.Peer // self Peer
} }
//SaveFile is to store the file in which beaconchain info will be stored. //SaveFile is to store the file in which beaconchain info will be stored.
@ -80,8 +84,8 @@ func (bc *BeaconChain) StartRPCServer() {
} }
// GetShardLeaderMap returns the map from shard id to leader. // GetShardLeaderMap returns the map from shard id to leader.
func (bc *BeaconChain) GetShardLeaderMap() map[int]*bcconn.NodeInfo { func (bc *BeaconChain) GetShardLeaderMap() map[int]*node.Info {
result := make(map[int]*bcconn.NodeInfo) result := make(map[int]*node.Info)
for i, leader := range bc.BCInfo.Leaders { for i, leader := range bc.BCInfo.Leaders {
result[i] = leader result[i] = leader
} }
@ -93,11 +97,12 @@ func New(numShards int, ip, port string) *BeaconChain {
bc := BeaconChain{} bc := BeaconChain{}
bc.log = log.New() bc.log = log.New()
bc.PubKey = generateBCKey() bc.PubKey = generateBCKey()
bc.host = p2pimpl.NewHost(p2p.Peer{IP: ip, Port: port}) bc.Self = p2p.Peer{IP: ip, Port: port}
bc.host, _ = p2pimpl.NewHost(&bc.Self)
bcinfo := &BCInfo{NumberOfShards: numShards, NumberOfNodesAdded: 0, bcinfo := &BCInfo{NumberOfShards: numShards, NumberOfNodesAdded: 0,
IP: ip, IP: ip,
Port: port, Port: port,
ShardLeaderMap: make(map[int]*bcconn.NodeInfo)} ShardLeaderMap: make(map[int]*node.Info)}
bc.BCInfo = *bcinfo bc.BCInfo = *bcinfo
return &bc return &bc
} }
@ -110,9 +115,12 @@ func generateBCKey() kyber.Point {
} }
//AcceptNodeInfo deserializes node information received via beaconchain handler //AcceptNodeInfo deserializes node information received via beaconchain handler
func (bc *BeaconChain) AcceptNodeInfo(b []byte) *bcconn.NodeInfo { func (bc *BeaconChain) AcceptNodeInfo(b []byte) *node.Info {
Node := bcconn.DeserializeNodeInfo(b) Node := bcconn.DeserializeNodeInfo(b)
bc.log.Info("New Node Connection", "IP", Node.Self.IP, "Port", Node.Self.Port) bc.log.Info("New Node Connection", "IP", Node.IP, "Port", Node.Port, "PeerID", Node.PeerID)
bc.Peer = p2p.Peer{IP: Node.IP, Port: Node.Port, PeerID: Node.PeerID}
bc.host.AddPeer(&bc.Peer)
bc.BCInfo.NumberOfNodesAdded = bc.BCInfo.NumberOfNodesAdded + 1 bc.BCInfo.NumberOfNodesAdded = bc.BCInfo.NumberOfNodesAdded + 1
shardNum, isLeader := utils.AllocateShard(bc.BCInfo.NumberOfNodesAdded, bc.BCInfo.NumberOfShards) shardNum, isLeader := utils.AllocateShard(bc.BCInfo.NumberOfNodesAdded, bc.BCInfo.NumberOfShards)
if isLeader { if isLeader {
@ -125,13 +133,13 @@ func (bc *BeaconChain) AcceptNodeInfo(b []byte) *bcconn.NodeInfo {
} }
//RespondRandomness sends a randomness beacon to the node inorder for it process what shard it will be in //RespondRandomness sends a randomness beacon to the node inorder for it process what shard it will be in
func (bc *BeaconChain) RespondRandomness(Node *bcconn.NodeInfo) { func (bc *BeaconChain) RespondRandomness(Node *node.Info) {
bci := bc.BCInfo bci := bc.BCInfo
response := bcconn.ResponseRandomNumber{NumberOfShards: bci.NumberOfShards, NumberOfNodesAdded: bci.NumberOfNodesAdded, Leaders: bci.Leaders} response := bcconn.ResponseRandomNumber{NumberOfShards: bci.NumberOfShards, NumberOfNodesAdded: bci.NumberOfNodesAdded, Leaders: bci.Leaders}
msg := bcconn.SerializeRandomInfo(response) msg := bcconn.SerializeRandomInfo(response)
msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Acknowledge, msg) msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Acknowledge, msg)
bc.log.Info("Sent Out Msg", "# Nodes", response.NumberOfNodesAdded) bc.log.Info("Sent Out Msg", "# Nodes", response.NumberOfNodesAdded)
host.SendMessage(bc.host, Node.Self, msgToSend, nil) host.SendMessage(bc.host, bc.Peer, msgToSend, nil)
bc.state = RandomInfoSent bc.state = RandomInfoSent
} }
@ -185,3 +193,8 @@ func BCItoBC(bci *BCInfo) *BeaconChain {
func SetSaveFile(path string) { func SetSaveFile(path string) {
SaveFile = path SaveFile = path
} }
//GetID return ID
func (bc *BeaconChain) GetID() peer.ID {
return bc.host.GetID()
}

@ -8,16 +8,16 @@ import (
"testing" "testing"
"github.com/harmony-one/harmony/api/proto/bcconn" "github.com/harmony-one/harmony/api/proto/bcconn"
"github.com/harmony-one/harmony/api/proto/node"
beaconchain "github.com/harmony-one/harmony/internal/beaconchain/rpc" beaconchain "github.com/harmony-one/harmony/internal/beaconchain/rpc"
"github.com/harmony-one/harmony/p2p"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
var ( var (
leader1 = &bcconn.NodeInfo{Self: p2p.Peer{IP: "127.0.0.1", Port: "1"}} leader1 = &node.Info{IP: "127.0.0.1", Port: "9981"}
leader2 = &bcconn.NodeInfo{Self: p2p.Peer{IP: "127.0.0.1", Port: "2"}} leader2 = &node.Info{IP: "127.0.0.1", Port: "9982"}
leaders = []*bcconn.NodeInfo{leader1, leader2} leaders = []*node.Info{leader1, leader2}
shardLeaderMap = map[int]*bcconn.NodeInfo{ shardLeaderMap = map[int]*node.Info{
0: leader1, 0: leader1,
1: leader2, 1: leader2,
} }
@ -69,7 +69,7 @@ func TestFetchLeaders(t *testing.T) {
bcClient := beaconchain.NewClient("127.0.0.1", strconv.Itoa(port+BeaconchainServicePortDiff)) bcClient := beaconchain.NewClient("127.0.0.1", strconv.Itoa(port+BeaconchainServicePortDiff))
response := bcClient.GetLeaders() response := bcClient.GetLeaders()
retleaders := response.GetLeaders() retleaders := response.GetLeaders()
if !(retleaders[0].GetIp() == leaders[0].Self.IP || retleaders[0].GetPort() == leaders[0].Self.Port || retleaders[1].GetPort() == leaders[1].Self.Port) { if !(retleaders[0].GetIp() == leaders[0].IP || retleaders[0].GetPort() == leaders[0].Port || retleaders[1].GetPort() == leaders[1].Port) {
t.Error("Fetch leaders response is not as expected") t.Error("Fetch leaders response is not as expected")
} }

@ -5,7 +5,7 @@ import (
"log" "log"
"net" "net"
"github.com/harmony-one/harmony/api/proto/bcconn" "github.com/harmony-one/harmony/api/proto/node"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -14,7 +14,7 @@ import (
// Server is the Server struct for beacon chain package. // Server is the Server struct for beacon chain package.
type Server struct { type Server struct {
shardLeaderMap func() map[int]*bcconn.NodeInfo shardLeaderMap func() map[int]*node.Info
} }
// FetchLeaders implements the FetchLeaders interface to return current leaders. // FetchLeaders implements the FetchLeaders interface to return current leaders.
@ -23,7 +23,7 @@ func (s *Server) FetchLeaders(ctx context.Context, request *proto.FetchLeadersRe
leaders := []*proto.FetchLeadersResponse_Leader{} leaders := []*proto.FetchLeadersResponse_Leader{}
for shardID, leader := range s.shardLeaderMap() { for shardID, leader := range s.shardLeaderMap() {
leaders = append(leaders, &proto.FetchLeadersResponse_Leader{Ip: leader.Self.IP, Port: leader.Self.Port, ShardId: uint32(shardID)}) leaders = append(leaders, &proto.FetchLeadersResponse_Leader{Ip: leader.IP, Port: leader.Port, ShardId: uint32(shardID)})
} }
log.Println(leaders) log.Println(leaders)
return &proto.FetchLeadersResponse{Leaders: leaders}, nil return &proto.FetchLeadersResponse{Leaders: leaders}, nil
@ -45,7 +45,7 @@ func (s *Server) Start(ip, port string) (*grpc.Server, error) {
} }
// NewServer creates new Server which implements BeaconChainServiceServer interface. // NewServer creates new Server which implements BeaconChainServiceServer interface.
func NewServer(shardLeaderMap func() map[int]*bcconn.NodeInfo) *Server { func NewServer(shardLeaderMap func() map[int]*node.Info) *Server {
s := &Server{shardLeaderMap} s := &Server{shardLeaderMap}
return s return s
} }

@ -12,11 +12,14 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/api/proto/bcconn" "github.com/harmony-one/harmony/api/proto/bcconn"
proto_identity "github.com/harmony-one/harmony/api/proto/identity" proto_identity "github.com/harmony-one/harmony/api/proto/identity"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl" "github.com/harmony-one/harmony/p2p/p2pimpl"
multiaddr "github.com/multiformats/go-multiaddr"
) )
//NewNode is ther struct for a candidate node //NewNode is ther struct for a candidate node
@ -31,7 +34,7 @@ type NewNode struct {
PubK kyber.Point PubK kyber.Point
priK kyber.Scalar priK kyber.Scalar
log log.Logger log log.Logger
SetInfo bool SetInfo chan bool
host host.Host host host.Host
} }
@ -39,12 +42,17 @@ type NewNode struct {
func New(ip string, port string) *NewNode { func New(ip string, port string) *NewNode {
priKey, pubKey := utils.GenKey(ip, port) priKey, pubKey := utils.GenKey(ip, port)
var node NewNode var node NewNode
var err error
node.PubK = pubKey node.PubK = pubKey
node.priK = priKey node.priK = priKey
node.Self = p2p.Peer{IP: ip, Port: port, PubKey: pubKey, ValidatorID: -1} node.Self = p2p.Peer{IP: ip, Port: port, PubKey: pubKey, ValidatorID: -1}
node.log = log.New() node.log = log.New()
node.SetInfo = false node.SetInfo = make(chan bool)
node.host = p2pimpl.NewHost(node.Self) node.host, err = p2pimpl.NewHost(&node.Self)
if err != nil {
node.log.Error("failed to create new host", "msg", err)
return nil
}
node.Leaders = map[uint32]p2p.Peer{} node.Leaders = map[uint32]p2p.Peer{}
return &node return &node
} }
@ -52,7 +60,7 @@ func New(ip string, port string) *NewNode {
type registerResponseRandomNumber struct { type registerResponseRandomNumber struct {
NumberOfShards int NumberOfShards int
NumberOfNodesAdded int NumberOfNodesAdded int
Leaders []*bcconn.NodeInfo Leaders []*proto_node.Info
} }
// ContactBeaconChain starts a newservice in the candidate node // ContactBeaconChain starts a newservice in the candidate node
@ -62,7 +70,7 @@ func (node *NewNode) ContactBeaconChain(BCPeer p2p.Peer) error {
} }
func (node NewNode) String() string { func (node NewNode) String() string {
return fmt.Sprintf("bc: %v:%v and node info %v", node.Self.IP, node.Self.Port, node.SetInfo) return fmt.Sprintf("bc: %v:%v => %v", node.Self.IP, node.Self.Port, node.Self.PeerID)
} }
// RequestBeaconChain requests beacon chain for identity data // RequestBeaconChain requests beacon chain for identity data
@ -72,13 +80,12 @@ func (node *NewNode) requestBeaconChain(BCPeer p2p.Peer) (err error) {
if err != nil { if err != nil {
node.log.Error("Could not Marshall public key into binary") node.log.Error("Could not Marshall public key into binary")
} }
p := p2p.Peer{IP: node.Self.IP, Port: node.Self.Port} nodeInfo := &proto_node.Info{IP: node.Self.IP, Port: node.Self.Port, PubKey: pubk, PeerID: node.host.GetID()}
nodeInfo := &bcconn.NodeInfo{Self: p, PubK: pubk}
msg := bcconn.SerializeNodeInfo(nodeInfo) msg := bcconn.SerializeNodeInfo(nodeInfo)
msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Register, msg) msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Register, msg)
gotShardInfo := false gotShardInfo := false
timeout := time.After(300 * time.Second) timeout := time.After(2 * time.Minute)
tick := time.Tick(5 * time.Second) tick := time.Tick(3 * time.Second)
checkLoop: checkLoop:
for { for {
select { select {
@ -86,18 +93,21 @@ checkLoop:
gotShardInfo = false gotShardInfo = false
break checkLoop break checkLoop
case <-tick: case <-tick:
if node.SetInfo { select {
case setinfo := <-node.SetInfo:
if setinfo {
gotShardInfo = true gotShardInfo = true
break checkLoop break checkLoop
} else { }
default:
host.SendMessage(node.host, BCPeer, msgToSend, nil) host.SendMessage(node.host, BCPeer, msgToSend, nil)
} }
} }
} }
if !gotShardInfo { if !gotShardInfo {
err = errors.New("could not create connection") err = errors.New("could not create connection")
node.log.Crit("Could not get sharding info after 5 minutes") node.log.Crit("Could not get sharding info after 2 minutes")
os.Exit(1) os.Exit(10)
} }
return return
} }
@ -108,9 +118,18 @@ func (node *NewNode) processShardInfo(msgPayload []byte) bool {
leaders := leadersInfo.Leaders leaders := leadersInfo.Leaders
shardNum, isLeader := utils.AllocateShard(leadersInfo.NumberOfNodesAdded, leadersInfo.NumberOfShards) shardNum, isLeader := utils.AllocateShard(leadersInfo.NumberOfNodesAdded, leadersInfo.NumberOfShards)
for n, v := range leaders { for n, v := range leaders {
leaderPeer := p2p.Peer{IP: v.Self.IP, Port: v.Self.Port} leaderPeer := p2p.Peer{IP: v.IP, Port: v.Port, PeerID: v.PeerID}
addr := fmt.Sprintf("/ip4/%s/tcp/%s", leaderPeer.IP, leaderPeer.Port)
targetAddr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
log.Error("processShardInfo NewMultiaddr error", "error", err)
return false
}
leaderPeer.Addrs = append(leaderPeer.Addrs, targetAddr)
leaderPeer.PubKey = crypto.Ed25519Curve.Point() leaderPeer.PubKey = crypto.Ed25519Curve.Point()
err := leaderPeer.PubKey.UnmarshalBinary(v.PubK[:]) err = leaderPeer.PubKey.UnmarshalBinary(v.PubKey[:])
if err != nil { if err != nil {
node.log.Error("Could not unmarshall leaders public key from binary to kyber.point") node.log.Error("Could not unmarshall leaders public key from binary to kyber.point")
} }
@ -120,7 +139,7 @@ func (node *NewNode) processShardInfo(msgPayload []byte) bool {
node.leader = node.Leaders[uint32(shardNum-1)] node.leader = node.Leaders[uint32(shardNum-1)]
node.isLeader = isLeader node.isLeader = isLeader
node.ShardID = shardNum - 1 //0 indexing. node.ShardID = shardNum - 1 //0 indexing.
node.SetInfo = true node.SetInfo <- true
node.log.Info("Shard information obtained ..") node.log.Info("Shard information obtained ..")
return true return true
} }
@ -144,3 +163,8 @@ func (node *NewNode) GetClientPeer() *p2p.Peer {
func (node *NewNode) GetSelfPeer() p2p.Peer { func (node *NewNode) GetSelfPeer() p2p.Peer {
return node.Self return node.Self
} }
// AddPeer add new peer for newnode
func (node *NewNode) AddPeer(p *p2p.Peer) error {
return node.host.AddPeer(p)
}

@ -1,38 +1,59 @@
package newnode package newnode
import ( import (
"fmt"
"testing" "testing"
"time"
beaconchain "github.com/harmony-one/harmony/internal/beaconchain/libs" beaconchain "github.com/harmony-one/harmony/internal/beaconchain/libs"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
peerstore "github.com/libp2p/go-libp2p-peerstore"
multiaddr "github.com/multiformats/go-multiaddr"
) )
func TestNewNode(t *testing.T) { func TestNewNode(t *testing.T) {
var ip, port string var ip, port string
ip = "127.0.0.1" ip = "127.0.0.1"
port = "8080" port = "8088"
nnode := New(ip, port) nnode := New(ip, port)
if nnode.PubK == nil { if nnode.PubK == nil {
t.Error("new node public key not initialized") t.Error("new node public key not initialized")
} }
if nnode.SetInfo {
t.Error("new node setinfo initialized to true! (should be false)")
}
} }
func TestBeaconChainConnect(t *testing.T) { func TestBeaconChainConnect(t *testing.T) {
var ip, beaconport, nodeport string var ip, beaconport, bcma, nodeport string
ip = "127.0.0.1" ip = "127.0.0.1"
beaconport = "9080" beaconport = "8081"
nodeport = "9081" nodeport = "9081"
nnode := New(ip, nodeport) nnode := New(ip, nodeport)
bc := beaconchain.New(1, ip, beaconport) bc := beaconchain.New(1, ip, beaconport)
bcma = fmt.Sprintf("/ip4/%s/tcp/%s/ipfs/%s", bc.Self.IP, bc.Self.Port, bc.GetID().Pretty())
go bc.StartServer() go bc.StartServer()
BCPeer := p2p.Peer{IP: ip, Port: beaconport} time.Sleep(3 * time.Second)
err := nnode.ContactBeaconChain(BCPeer)
maddr, err := multiaddr.NewMultiaddr(bcma)
if err != nil { if err != nil {
t.Error("could not read from connection") t.Errorf("new multiaddr error: %v", err)
}
// Extract the peer ID from the multiaddr.
info, err2 := peerstore.InfoFromP2pAddr(maddr)
if err2 != nil {
t.Errorf("info from p2p addr error: %v", err2)
}
BCPeer := &p2p.Peer{IP: ip, Port: beaconport, Addrs: info.Addrs, PeerID: info.ID}
nnode.AddPeer(BCPeer)
err3 := nnode.ContactBeaconChain(*BCPeer)
if err3 != nil {
t.Errorf("could not read from connection: %v", err3)
} }
} }

@ -6,11 +6,14 @@ import (
"encoding/json" "encoding/json"
"io" "io"
"log" "log"
mrand "math/rand"
"os" "os"
"regexp" "regexp"
"strconv" "strconv"
"sync" "sync"
p2p_crypto "github.com/libp2p/go-libp2p-crypto"
"github.com/dedis/kyber" "github.com/dedis/kyber"
"github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/crypto/pki"
@ -70,6 +73,17 @@ func GenKey(ip, port string) (kyber.Scalar, kyber.Point) {
return priKey, pubKey return priKey, pubKey
} }
// GenKeyP2P generates a pair of RSA keys used in libp2p host
func GenKeyP2P(ip, port string) (p2p_crypto.PrivKey, p2p_crypto.PubKey, error) {
r := mrand.New(mrand.NewSource(int64(GetUniqueIDFromIPPort(ip, port))))
return p2p_crypto.GenerateKeyPairWithReader(p2p_crypto.RSA, 2048, r)
}
// GenKeyP2PRand generates a pair of RSA keys used in libp2p host, using random seed
func GenKeyP2PRand() (p2p_crypto.PrivKey, p2p_crypto.PubKey, error) {
return p2p_crypto.GenerateKeyPair(p2p_crypto.RSA, 2048)
}
// AllocateShard uses the number of current nodes and number of shards // AllocateShard uses the number of current nodes and number of shards
// to return the shardNum a new node belongs to, it also tells whether the node is a leader // to return the shardNum a new node belongs to, it also tells whether the node is a leader
func AllocateShard(numOfAddedNodes, numOfShards int) (int, bool) { func AllocateShard(numOfAddedNodes, numOfShards int) (int, bool) {

@ -4,6 +4,7 @@ import (
"testing" "testing"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
crypto "github.com/libp2p/go-libp2p-crypto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -47,6 +48,32 @@ func TestGenKey(t *testing.T) {
GenKey("3.3.3.3", "3456") GenKey("3.3.3.3", "3456")
} }
// Test for GenKeyP2P, noted the length of private key can be random
// thus we don't test it here.
func TestGenKeyP2P(t *testing.T) {
_, pb, err := GenKeyP2P("127.0.0.1", "8888")
if err != nil {
t.Errorf("GenKeyP2p Error: %v", err)
}
kpb, _ := crypto.MarshalPublicKey(pb)
if len(kpb) != 299 {
t.Errorf("Length of Public Key Error: %v, expected 299", len(kpb))
}
}
// Test for GenKeyP2PRand, noted the length of private key can be random
// thus we don't test it here.
func TestGenKeyP2PRand(t *testing.T) {
_, pb, err := GenKeyP2PRand()
if err != nil {
t.Errorf("GenKeyP2PRand Error: %v", err)
}
kpb, _ := crypto.MarshalPublicKey(pb)
if len(kpb) != 299 {
t.Errorf("Length of Public Key Error: %v, expected 299", len(kpb))
}
}
// Test for GetUniqueIDFromPeer // Test for GetUniqueIDFromPeer
func TestGetUniqueIDFromPeer(t *testing.T) { func TestGetUniqueIDFromPeer(t *testing.T) {
peer := p2p.Peer{IP: "1.1.1.1", Port: "123"} peer := p2p.Peer{IP: "1.1.1.1", Port: "123"}

@ -300,6 +300,7 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int {
if !ok { if !ok {
node.Neighbors.Store(key, *p) node.Neighbors.Store(key, *p)
count++ count++
node.host.AddPeer(p)
continue continue
} }
if node.SelfPeer.ValidatorID == -1 && p.IP == node.SelfPeer.IP && p.Port == node.SelfPeer.Port { if node.SelfPeer.ValidatorID == -1 && p.IP == node.SelfPeer.IP && p.Port == node.SelfPeer.Port {

@ -290,6 +290,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
peer := new(p2p.Peer) peer := new(p2p.Peer)
peer.IP = ping.Node.IP peer.IP = ping.Node.IP
peer.Port = ping.Node.Port peer.Port = ping.Node.Port
peer.PeerID = ping.Node.PeerID
peer.ValidatorID = ping.Node.ValidatorID peer.ValidatorID = ping.Node.ValidatorID
peer.PubKey = hmy_crypto.Ed25519Curve.Point() peer.PubKey = hmy_crypto.Ed25519Curve.Point()
@ -345,6 +346,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
peer.IP = p.IP peer.IP = p.IP
peer.Port = p.Port peer.Port = p.Port
peer.ValidatorID = p.ValidatorID peer.ValidatorID = p.ValidatorID
peer.PeerID = p.PeerID
peer.PubKey = hmy_crypto.Ed25519Curve.Point() peer.PubKey = hmy_crypto.Ed25519Curve.Point()
err = peer.PubKey.UnmarshalBinary(p.PubKey[:]) err = peer.PubKey.UnmarshalBinary(p.PubKey[:])

@ -13,7 +13,7 @@ func TestNodeStreamHandler(t *testing.T) {
_, pubKey := utils.GenKey("1", "2") _, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey} leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey}
validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"} validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader)
node := New(host, consensus, nil) node := New(host, consensus, nil)
@ -33,7 +33,7 @@ func TestAddNewBlock(t *testing.T) {
_, pubKey := utils.GenKey("1", "2") _, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "127.0.0.1", Port: "9882", PubKey: pubKey} leader := p2p.Peer{IP: "127.0.0.1", Port: "9882", PubKey: pubKey}
validator := p2p.Peer{IP: "127.0.0.1", Port: "9885"} validator := p2p.Peer{IP: "127.0.0.1", Port: "9885"}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader)
node := New(host, consensus, nil) node := New(host, consensus, nil)
@ -52,7 +52,7 @@ func TestVerifyNewBlock(t *testing.T) {
_, pubKey := utils.GenKey("1", "2") _, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey} leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey}
validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"} validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader)
node := New(host, consensus, nil) node := New(host, consensus, nil)

@ -19,7 +19,7 @@ func TestNewNode(t *testing.T) {
_, pubKey := utils.GenKey("1", "2") _, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey} leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey}
validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"} validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader)
node := New(host, consensus, nil) node := New(host, consensus, nil)
if node.Consensus == nil { if node.Consensus == nil {
@ -39,7 +39,7 @@ func TestGetSyncingPeers(t *testing.T) {
_, pubKey := utils.GenKey("1", "2") _, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey} leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey}
validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"} validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader)
node := New(host, consensus, nil) node := New(host, consensus, nil)
@ -51,8 +51,8 @@ func TestGetSyncingPeers(t *testing.T) {
if len(res) != 1 || !(res[0].IP == peer.IP || res[0].IP == peer2.IP) { if len(res) != 1 || !(res[0].IP == peer.IP || res[0].IP == peer2.IP) {
t.Error("GetSyncingPeers should return list of {peer, peer2}") t.Error("GetSyncingPeers should return list of {peer, peer2}")
} }
if len(res) != 1 || res[0].Port != "5000" { if len(res) != 1 || (res[0].Port != "5000" && res[0].Port != "5001") {
t.Error("Syncing ports should be 5000") t.Errorf("Syncing ports should be 5000, got %v", res[0].Port)
} }
} }
@ -82,7 +82,7 @@ func TestAddPeers(t *testing.T) {
_, pubKey := utils.GenKey("1", "2") _, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "127.0.0.1", Port: "8982", PubKey: pubKey} leader := p2p.Peer{IP: "127.0.0.1", Port: "8982", PubKey: pubKey}
validator := p2p.Peer{IP: "127.0.0.1", Port: "8985"} validator := p2p.Peer{IP: "127.0.0.1", Port: "8985"}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader)
node := New(host, consensus, nil) node := New(host, consensus, nil)
@ -155,7 +155,7 @@ func TestPingPongHandler(test *testing.T) {
_, pubKey := utils.GenKey("127.0.0.1", "8881") _, pubKey := utils.GenKey("127.0.0.1", "8881")
leader := p2p.Peer{IP: "127.0.0.1", Port: "8881", PubKey: pubKey} leader := p2p.Peer{IP: "127.0.0.1", Port: "8881", PubKey: pubKey}
// validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"} // validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"}
host := p2pimpl.NewHost(leader) host, _ := p2pimpl.NewHost(&leader)
consensus := consensus.New(host, "0", []p2p.Peer{leader}, leader) consensus := consensus.New(host, "0", []p2p.Peer{leader}, leader)
node := New(host, consensus, nil) node := New(host, consensus, nil)
//go sendPingMessage(leader) //go sendPingMessage(leader)

@ -39,7 +39,7 @@ func ReadMessageContent(s Stream) ([]byte, error) {
_, err := r.ReadByte() _, err := r.ReadByte()
switch err { switch err {
case io.EOF: case io.EOF:
log.Error("Error reading the p2p message type field", "msg", err) log.Error("Error reading the p2p message type field", "io.EOF", err)
return contentBuf.Bytes(), err return contentBuf.Bytes(), err
case nil: case nil:
//log.Printf("Received p2p message type: %x\n", msgType) //log.Printf("Received p2p message type: %x\n", msgType)

@ -2,6 +2,7 @@ package host
import ( import (
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
peer "github.com/libp2p/go-libp2p-peer"
) )
// Host is the client + server in p2p network. // Host is the client + server in p2p network.
@ -10,4 +11,6 @@ type Host interface {
SendMessage(p2p.Peer, []byte) error SendMessage(p2p.Peer, []byte) error
BindHandlerAndServe(handler p2p.StreamHandler) BindHandlerAndServe(handler p2p.StreamHandler)
Close() error Close() error
AddPeer(*p2p.Peer) error
GetID() peer.ID
} }

@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
peer "github.com/libp2p/go-libp2p-peer"
) )
// HostV1 is the version 1 p2p host, using direct socket call. // HostV1 is the version 1 p2p host, using direct socket call.
@ -17,10 +18,15 @@ type HostV1 struct {
quit chan struct{} quit chan struct{}
} }
// AddPeer do nothing
func (host *HostV1) AddPeer(p *p2p.Peer) error {
return nil
}
// New creates a HostV1 // New creates a HostV1
func New(self p2p.Peer) *HostV1 { func New(self *p2p.Peer) *HostV1 {
h := &HostV1{ h := &HostV1{
self: self, self: *self,
quit: make(chan struct{}, 1), quit: make(chan struct{}, 1),
} }
return h return h
@ -31,6 +37,11 @@ func (host *HostV1) GetSelfPeer() p2p.Peer {
return host.self return host.self
} }
// GetID return ID
func (host *HostV1) GetID() peer.ID {
return peer.ID(fmt.Sprintf("%s:%s", host.self.IP, host.self.Port))
}
// BindHandlerAndServe Version 0 p2p. Going to be deprecated. // BindHandlerAndServe Version 0 p2p. Going to be deprecated.
func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) { func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) {
port := host.self.Port port := host.self.Port

@ -1,13 +1,13 @@
package hostv2 package hostv2
import ( import (
"bufio"
"context" "context"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
libp2p "github.com/libp2p/go-libp2p" libp2p "github.com/libp2p/go-libp2p"
p2p_crypto "github.com/libp2p/go-libp2p-crypto"
libp2phost "github.com/libp2p/go-libp2p-host" libp2phost "github.com/libp2p/go-libp2p-host"
net "github.com/libp2p/go-libp2p-net" net "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
@ -26,6 +26,36 @@ const (
type HostV2 struct { type HostV2 struct {
h libp2phost.Host h libp2phost.Host
self p2p.Peer self p2p.Peer
priKey p2p_crypto.PrivKey
}
// AddPeer add p2p.Peer into Peerstore
func (host *HostV2) AddPeer(p *p2p.Peer) error {
if p.PeerID != "" && len(p.Addrs) != 0 {
host.Peerstore().AddAddrs(p.PeerID, p.Addrs, peerstore.PermanentAddrTTL)
return nil
}
if p.PeerID == "" {
log.Error("AddPeer PeerID is EMPTY")
return fmt.Errorf("AddPeer error: peerID is empty")
}
// reconstruct the multiaddress based on ip/port
// PeerID has to be known for the ip/port
addr := fmt.Sprintf("/ip4/%s/tcp/%s", p.IP, p.Port)
targetAddr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
log.Error("AddPeer NewMultiaddr error", "error", err)
return err
}
p.Addrs = append(p.Addrs, targetAddr)
host.Peerstore().AddAddrs(p.PeerID, p.Addrs, peerstore.PermanentAddrTTL)
log.Info("AddPeer add to peerstore", "peer", *p)
return nil
} }
// Peerstore returns the peer store // Peerstore returns the peer store
@ -34,24 +64,34 @@ func (host *HostV2) Peerstore() peerstore.Peerstore {
} }
// New creates a host for p2p communication // New creates a host for p2p communication
func New(self p2p.Peer) *HostV2 { func New(self p2p.Peer, priKey p2p_crypto.PrivKey) *HostV2 {
sourceAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", self.Port))
catchError(err) // TODO (leo), use the [0] of Addrs for now, need to find a reliable way of using listenAddr
p2pHost, err := libp2p.New(context.Background(), p2pHost, err := libp2p.New(context.Background(),
libp2p.ListenAddrs(sourceAddr), libp2p.ListenAddrs(self.Addrs[0]),
libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves. libp2p.Identity(priKey),
// TODO(ricl): Other features to probe // TODO(ricl): Other features to probe
// libp2p.EnableRelay; libp2p.Routing; // libp2p.EnableRelay; libp2p.Routing;
) )
catchError(err) catchError(err)
log.Debug("HostV2 is up!", "port", self.Port, "id", p2pHost.ID().Pretty(), "addr", sourceAddr) log.Debug("HostV2 is up!", "port", self.Port, "id", p2pHost.ID().Pretty(), "addr", self.Addrs)
// has to save the private key for host
h := &HostV2{ h := &HostV2{
h: p2pHost, h: p2pHost,
self: self, self: self,
priKey: priKey,
} }
return h return h
} }
// GetID returns ID.Pretty
func (host *HostV2) GetID() peer.ID {
return host.h.ID()
}
// GetSelfPeer gets self peer // GetSelfPeer gets self peer
func (host *HostV2) GetSelfPeer() p2p.Peer { func (host *HostV2) GetSelfPeer() p2p.Peer {
return host.self return host.self
@ -68,22 +108,15 @@ func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) {
// SendMessage a p2p message sending function with signature compatible to p2pv1. // SendMessage a p2p message sending function with signature compatible to p2pv1.
func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error { func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error {
addr := fmt.Sprintf("/ip4/%s/tcp/%s", p.IP, p.Port) s, err := host.h.NewStream(context.Background(), p.PeerID, ProtocolID)
targetAddr, err := multiaddr.NewMultiaddr(addr)
catchError(err)
peerID := peer.ID(addr)
host.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL)
s, err := host.h.NewStream(context.Background(), peerID, ProtocolID)
if err != nil { if err != nil {
log.Error("Failed to send message", "from", host.self, "to", p, "error", err) log.Error("Failed to send message", "from", host.self, "to", p, "error", err, "PeerID", p.PeerID)
return err return err
} }
// Create a buffered stream so that read and writes are non blocking. defer s.Close()
w := bufio.NewWriter(s) s.Write(message)
// Create a thread to read and write data.
go writeData(w, message)
return nil return nil
} }

@ -1,20 +1,39 @@
package host package host
import ( import (
"fmt"
"reflect" "reflect"
"testing" "testing"
"time" "time"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host/hostv2" "github.com/harmony-one/harmony/p2p/host/hostv2"
peer "github.com/libp2p/go-libp2p-peer"
multiaddr "github.com/multiformats/go-multiaddr"
) )
func TestSendMessage(test *testing.T) { func TestSendMessage(test *testing.T) {
peer1 := p2p.Peer{IP: "127.0.0.1", Port: "9000"} peer1 := p2p.Peer{IP: "127.0.0.1", Port: "9000"}
selfAddr1, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", peer1.Port))
peer1.Addrs = append(peer1.Addrs, selfAddr1)
priKey1, pubKey1, _ := utils.GenKeyP2P(peer1.IP, peer1.Port)
peerID1, _ := peer.IDFromPublicKey(pubKey1)
peer1.PeerID = peerID1
host1 := hostv2.New(peer1, priKey1)
peer2 := p2p.Peer{IP: "127.0.0.1", Port: "9001"} peer2 := p2p.Peer{IP: "127.0.0.1", Port: "9001"}
selfAddr2, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", peer2.Port))
peer2.Addrs = append(peer2.Addrs, selfAddr2)
priKey2, pubKey2, _ := utils.GenKeyP2P(peer2.IP, peer2.Port)
peerID2, _ := peer.IDFromPublicKey(pubKey2)
peer2.PeerID = peerID2
host2 := hostv2.New(peer2, priKey2)
msg := []byte{0x00, 0x01, 0x02, 0x03, 0x04} msg := []byte{0x00, 0x01, 0x02, 0x03, 0x04}
host1 := hostv2.New(peer1) host1.AddPeer(&peer2)
host2 := hostv2.New(peer2)
go host2.BindHandlerAndServe(handler) go host2.BindHandlerAndServe(handler)
SendMessage(host1, peer2, msg, nil) SendMessage(host1, peer2, msg, nil)
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)

@ -7,6 +7,7 @@ package mock
import ( import (
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
p2p "github.com/harmony-one/harmony/p2p" p2p "github.com/harmony-one/harmony/p2p"
peer "github.com/libp2p/go-libp2p-peer"
reflect "reflect" reflect "reflect"
) )
@ -78,3 +79,27 @@ func (m *MockHost) Close() error {
func (mr *MockHostMockRecorder) Close() *gomock.Call { func (mr *MockHostMockRecorder) Close() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockHost)(nil).Close)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockHost)(nil).Close))
} }
// AddPeer mocks base method
func (m *MockHost) AddPeer(arg0 *p2p.Peer) error {
ret := m.ctrl.Call(m, "AddPeer", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// AddPeer indicates an expected call of AddPeer
func (mr *MockHostMockRecorder) AddPeer(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPeer", reflect.TypeOf((*MockHost)(nil).AddPeer), arg0)
}
// GetID mocks base method
func (m *MockHost) GetID() peer.ID {
ret := m.ctrl.Call(m, "GetID")
ret0, _ := ret[0].(peer.ID)
return ret0
}
// GetID indicates an expected call of GetID
func (mr *MockHostMockRecorder) GetID() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetID", reflect.TypeOf((*MockHost)(nil).GetID))
}

@ -1,9 +1,12 @@
package p2p package p2p
import ( import (
"fmt"
"net" "net"
"github.com/dedis/kyber" "github.com/dedis/kyber"
peer "github.com/libp2p/go-libp2p-peer"
multiaddr "github.com/multiformats/go-multiaddr"
) )
// StreamHandler handles incoming p2p message. // StreamHandler handles incoming p2p message.
@ -13,10 +16,13 @@ type StreamHandler func(Stream)
type Peer struct { type Peer struct {
IP string // IP address of the peer IP string // IP address of the peer
Port string // Port number of the peer Port string // Port number of the peer
PubKey kyber.Point // Public key of the peer PubKey kyber.Point // Public key of the peer, used for consensus signing
Ready bool // Ready is true if the peer is ready to join consensus. Ready bool // Ready is true if the peer is ready to join consensus. (FIXME: deprecated)
ValidatorID int // -1 is the default value, means not assigned any validator ID in the shard ValidatorID int // -1 is the default value, means not assigned any validator ID in the shard
// TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available. Addrs []multiaddr.Multiaddr // MultiAddress of the peer
PeerID peer.ID // PeerID, the pubkey for communication
} }
func (p Peer) String() string { return net.JoinHostPort(p.IP, p.Port) } func (p Peer) String() string {
return fmt.Sprintf("%s/%s[%d]", net.JoinHostPort(p.IP, p.Port), p.PeerID, len(p.Addrs))
}

@ -1,10 +1,17 @@
package p2pimpl package p2pimpl
import ( import (
"fmt"
"net"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/host/hostv1" "github.com/harmony-one/harmony/p2p/host/hostv1"
"github.com/harmony-one/harmony/p2p/host/hostv2" "github.com/harmony-one/harmony/p2p/host/hostv2"
"github.com/harmony-one/harmony/internal/utils"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
) )
// Version The version number of p2p library // Version The version number of p2p library
@ -12,14 +19,39 @@ import (
// 2 - libp2p // 2 - libp2p
const Version = 2 const Version = 2
// NewHost starts the host // NewHost starts the host for p2p
func NewHost(peer p2p.Peer) host.Host { // for hostv2, it generates multiaddress, keypair and add PeerID to peer, add priKey to host
// log.Debug("New Host", "ip/port", net.JoinHostPort(peer.IP, peer.Port)) // TODO (leo) the PriKey of the host has to be persistent in disk, so that we don't need to regenerate it
// on the same host if the node software restarted. The peerstore has to be persistent as well.
func NewHost(self *p2p.Peer) (host.Host, error) {
if Version == 1 { if Version == 1 {
h := hostv1.New(peer) h := hostv1.New(self)
return h return h, nil
}
selfAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", self.Port))
if err != nil {
return nil, err
}
self.Addrs = append(self.Addrs, selfAddr)
// TODO (leo), change to GenKeyP2PRand() to generate random key. Right now, the key is predictable as the
// seed is fixed.
priKey, pubKey, err := utils.GenKeyP2P(self.IP, self.Port)
if err != nil {
return nil, err
} }
h := hostv2.New(peer) peerID, err := peer.IDFromPublicKey(pubKey)
return h
if err != nil {
return nil, err
}
self.PeerID = peerID
h := hostv2.New(*self, priKey)
utils.GetLogInstance().Info("NewHost", "self", net.JoinHostPort(self.IP, self.Port), "PeerID", self.PeerID)
return h, nil
} }

@ -119,13 +119,18 @@ LOG_FILE=$log_folder/r.log
echo "launching beacon chain ..." echo "launching beacon chain ..."
$DRYRUN $ROOT/bin/beacon -numShards $SHARDS > $log_folder/beacon.log 2>&1 | tee -a $LOG_FILE & $DRYRUN $ROOT/bin/beacon -numShards $SHARDS > $log_folder/beacon.log 2>&1 | tee -a $LOG_FILE &
sleep 1 #waiting for beaconchain sleep 1 #waiting for beaconchain
MA=$(grep "Beacon Chain Started" $log_folder/beacon.log | awk -F: ' { print $2 } ')
if [ -n "$MA" ]; then
HMY_OPT="-bc_addr $MA"
fi
# Start nodes # Start nodes
while IFS='' read -r line || [[ -n "$line" ]]; do while IFS='' read -r line || [[ -n "$line" ]]; do
IFS=' ' read ip port mode shardID <<< $line IFS=' ' read ip port mode shardID <<< $line
#echo $ip $port $mode #echo $ip $port $mode
if [ "$mode" != "client" ]; then if [ "$mode" != "client" ]; then
$DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN 2>&1 | tee -a $LOG_FILE & $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT 2>&1 | tee -a $LOG_FILE &
sleep 0.5 sleep 0.5
fi fi
done < $config done < $config
@ -138,7 +143,7 @@ if [ "$TXGEN" == "true" ]; then
line=$(grep client $config) line=$(grep client $config)
IFS=' ' read ip port mode shardID <<< $line IFS=' ' read ip port mode shardID <<< $line
if [ "$mode" == "client" ]; then if [ "$mode" == "client" ]; then
$DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port 2>&1 | tee -a $LOG_FILE $DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port $HMY_OPT 2>&1 | tee -a $LOG_FILE
fi fi
fi fi

Loading…
Cancel
Save