From 6754267f39b9fb61cf3c1938f68763a605b2227a Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Mon, 3 Dec 2018 00:16:53 -0800 Subject: [PATCH] support peerdiscovery in client support beaconchain query Signed-off-by: Leo Chen --- client/txgen/main.go | 60 ++++++++++++++++++++++++++++++++++++++------ newnode/newnode.go | 20 +++++++++------ 2 files changed, 64 insertions(+), 16 deletions(-) diff --git a/client/txgen/main.go b/client/txgen/main.go index ccb0c08ef..070185f4a 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -13,10 +13,12 @@ import ( "github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/client" client_config "github.com/harmony-one/harmony/client/config" + "github.com/harmony-one/harmony/client/txgen/txgen" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/log" + "github.com/harmony-one/harmony/newnode" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" proto_node "github.com/harmony-one/harmony/proto/node" @@ -37,6 +39,9 @@ func printVersion(me string) { } func main() { + ip := flag.String("ip", "127.0.0.1", "IP of the node") + port := flag.String("port", "9999", "port of the node.") + accountModel := flag.Bool("account_model", true, "Whether to use account model") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 20000, "number of transactions to send per message") @@ -45,6 +50,11 @@ func main() { duration := flag.Int("duration", 10, "duration of the tx generation in second. If it's negative, the experiment runs forever.") versionFlag := flag.Bool("version", false, "Output version info") crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") + + idcIP := flag.String("idc", "127.0.0.1", "IP of the identity chain") + idcPort := flag.String("idc_port", "8081", "port of the identity chain") + peerDiscovery := flag.Bool("peer_discovery", false, "Enable Peer Discovery") + flag.Parse() if *versionFlag { @@ -54,10 +64,37 @@ func main() { // Add GOMAXPROCS to achieve max performance. runtime.GOMAXPROCS(1024) - // Read the configs - config := client_config.NewConfig() - config.ReadConfigFile(*configFile) - shardIDLeaderMap := config.GetShardIDToLeaderMap() + var clientPeer *p2p.Peer + var peers []p2p.Peer + var shardIDLeaderMap map[uint32]p2p.Peer + var config *client_config.Config + + if *peerDiscovery { + + candidateNode := newnode.New(*ip, *port) + BCPeer := p2p.Peer{IP: *idcIP, Port: *idcPort} + service := candidateNode.NewService(*ip, *port) + candidateNode.ConnectBeaconChain(BCPeer) + peers = nil + + service.Stop() + + clientPeer = &p2p.Peer{IP: *ip, Port: *port} + _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) + clientPeer.PubKey = pubKey + + shardIDLeaderMap = candidateNode.Leaders + + } else { + // Read the configs + config = client_config.NewConfig() + config.ReadConfigFile(*configFile) + shardIDLeaderMap = config.GetShardIDToLeaderMap() + clientPeer = config.GetClientPeer() + _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) + clientPeer.PubKey = pubKey + } + debugPrintShardIDLeaderMap(shardIDLeaderMap) // Do cross shard tx if there are more than one shard setting := txgen.Settings{ @@ -78,10 +115,7 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} - clientPeer := config.GetClientPeer() for shardID := range shardIDLeaderMap { - _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) - clientPeer.PubKey = pubKey node := node.New(&consensus.Consensus{ShardID: shardID}, nil, *clientPeer) // Assign many fake addresses so we have enough address to play with at first node.AddTestingAddresses(setting.NumOfAddress) @@ -244,7 +278,11 @@ func main() { // Send a stop message to stop the nodes at the end msg := proto_node.ConstructStopMessage() - peers := append(config.GetValidators(), clientNode.Client.GetLeaders()...) + if *peerDiscovery { + peers = clientNode.Consensus.GetValidatorPeers() + } else { + peers = append(config.GetValidators(), clientNode.Client.GetLeaders()...) + } p2p.BroadcastMessage(peers, msg) time.Sleep(3000 * time.Millisecond) } @@ -262,3 +300,9 @@ func SendTxsToLeaderAccount(leader p2p.Peer, txs types.Transactions) { msg := proto_node.ConstructTransactionListMessageAccount(txs) p2p.SendMessage(leader, msg) } + +func debugPrintShardIDLeaderMap(leaderMap map[uint32]p2p.Peer) { + for k, v := range leaderMap { + log.Debug("Leader", "ShardID", k, "Leader", v) + } +} diff --git a/newnode/newnode.go b/newnode/newnode.go index da2f8fa0d..fd1802998 100644 --- a/newnode/newnode.go +++ b/newnode/newnode.go @@ -31,7 +31,7 @@ type NewNode struct { leader p2p.Peer isLeader bool Self p2p.Peer - peers []p2p.Peer + Leaders map[uint32]p2p.Peer PubK kyber.Point priK kyber.Scalar log log.Logger @@ -48,6 +48,7 @@ func New(ip string, port string) *NewNode { node.Self = p2p.Peer{IP: ip, Port: port, PubKey: pubKey, ValidatorID: -1} node.log = log.New() node.SetInfo = false + node.Leaders = map[uint32]p2p.Peer{} return &node } @@ -155,14 +156,17 @@ func (node *NewNode) processShardInfo(msgPayload []byte) bool { leadersInfo := bcconn.DeserializeRandomInfo(msgPayload) leaders := leadersInfo.Leaders shardNum, isLeader := utils.AllocateShard(leadersInfo.NumberOfNodesAdded, leadersInfo.NumberOfShards) - leaderNode := leaders[shardNum-1] //0 indexing. - leaderPeer := p2p.Peer{IP: leaderNode.Self.IP, Port: leaderNode.Self.Port} - leaderPeer.PubKey = crypto.Ed25519Curve.Point() - err := leaderPeer.PubKey.UnmarshalBinary(leaderNode.PubK[:]) - if err != nil { - node.log.Info("Could not unmarshall leaders public key from binary to kyber.point") + for n, v := range leaders { + leaderPeer := p2p.Peer{IP: v.Self.IP, Port: v.Self.Port} + leaderPeer.PubKey = crypto.Ed25519Curve.Point() + err := leaderPeer.PubKey.UnmarshalBinary(v.PubK[:]) + if err != nil { + node.log.Error("Could not unmarshall leaders public key from binary to kyber.point") + } + node.Leaders[uint32(n)] = leaderPeer } - node.leader = leaderPeer + + node.leader = node.Leaders[uint32(shardNum-1)] node.isLeader = isLeader node.ShardID = shardNum - 1 //0 indexing. node.SetInfo = true