From b62fec99ceb15408808528b890d4d081334f04ea Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Mon, 3 Dec 2018 00:47:56 -0800 Subject: [PATCH 1/5] do proper cleanup in deploy.sh Signed-off-by: Leo Chen --- deploy.sh | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/deploy.sh b/deploy.sh index 4d3a6a0c8..364652d6e 100755 --- a/deploy.sh +++ b/deploy.sh @@ -2,6 +2,15 @@ set -eo pipefail +function cleanup() { + for pid in `/bin/ps -fu $USER| grep "benchmark\|txgen\|soldier\|commander\|profiler\|beacon" | grep -v "grep" | grep -v "vi" | awk '{print $2}'`; + do + echo 'Killed process: '$pid + kill -9 $pid 2> /dev/null + done +} + +trap cleanup SIGINT SIGTERM function usage { local ME=$(basename $0) @@ -51,14 +60,16 @@ if [ -z "$config" ]; then usage fi + # Kill nodes if any -./kill_node.sh +cleanup # Since `go run` will generate a temporary exe every time, # On windows, your system will pop up a network security dialog for each instance # and you won't be able to turn it off. With `go build` generating one # exe, the dialog will only pop up once at the very first time. # Also it's recommended to use `go build` for testing the whole exe. +echo "compiling ..." go build -o bin/benchmark go build -o bin/txgen client/txgen/main.go go build -o bin/beacon runbeacon/run-beacon.go @@ -70,6 +81,7 @@ log_folder="tmp_log/log-$t" mkdir -p $log_folder if [ -n "$PEER" ]; then + echo "launching beacon chain ..." ./bin/beacon > $log_folder/beacon.log 2>&1 & sleep 1 #wait or beachchain up fi @@ -88,9 +100,13 @@ while IFS='' read -r line || [[ -n "$line" ]]; do fi done < $config +echo "launching txgen ..." if [ "$TXGEN" == "true" ]; then - ./bin/txgen -config_file $config -log_folder $log_folder -duration $DURATION + if [ -z "$PEER" ]; then + ./bin/txgen -config_file $config -log_folder $log_folder -duration $DURATION + else + ./bin/txgen -log_folder $log_folder -duration $DURATION $PEER + fi fi -# Kill nodes if any -./kill_node.sh +cleanup From 6754267f39b9fb61cf3c1938f68763a605b2227a Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Mon, 3 Dec 2018 00:16:53 -0800 Subject: [PATCH 2/5] support peerdiscovery in client support beaconchain query Signed-off-by: Leo Chen --- client/txgen/main.go | 60 ++++++++++++++++++++++++++++++++++++++------ newnode/newnode.go | 20 +++++++++------ 2 files changed, 64 insertions(+), 16 deletions(-) diff --git a/client/txgen/main.go b/client/txgen/main.go index ccb0c08ef..070185f4a 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -13,10 +13,12 @@ import ( "github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/client" client_config "github.com/harmony-one/harmony/client/config" + "github.com/harmony-one/harmony/client/txgen/txgen" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/log" + "github.com/harmony-one/harmony/newnode" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" proto_node "github.com/harmony-one/harmony/proto/node" @@ -37,6 +39,9 @@ func printVersion(me string) { } func main() { + ip := flag.String("ip", "127.0.0.1", "IP of the node") + port := flag.String("port", "9999", "port of the node.") + accountModel := flag.Bool("account_model", true, "Whether to use account model") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 20000, "number of transactions to send per message") @@ -45,6 +50,11 @@ func main() { duration := flag.Int("duration", 10, "duration of the tx generation in second. If it's negative, the experiment runs forever.") versionFlag := flag.Bool("version", false, "Output version info") crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") + + idcIP := flag.String("idc", "127.0.0.1", "IP of the identity chain") + idcPort := flag.String("idc_port", "8081", "port of the identity chain") + peerDiscovery := flag.Bool("peer_discovery", false, "Enable Peer Discovery") + flag.Parse() if *versionFlag { @@ -54,10 +64,37 @@ func main() { // Add GOMAXPROCS to achieve max performance. runtime.GOMAXPROCS(1024) - // Read the configs - config := client_config.NewConfig() - config.ReadConfigFile(*configFile) - shardIDLeaderMap := config.GetShardIDToLeaderMap() + var clientPeer *p2p.Peer + var peers []p2p.Peer + var shardIDLeaderMap map[uint32]p2p.Peer + var config *client_config.Config + + if *peerDiscovery { + + candidateNode := newnode.New(*ip, *port) + BCPeer := p2p.Peer{IP: *idcIP, Port: *idcPort} + service := candidateNode.NewService(*ip, *port) + candidateNode.ConnectBeaconChain(BCPeer) + peers = nil + + service.Stop() + + clientPeer = &p2p.Peer{IP: *ip, Port: *port} + _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) + clientPeer.PubKey = pubKey + + shardIDLeaderMap = candidateNode.Leaders + + } else { + // Read the configs + config = client_config.NewConfig() + config.ReadConfigFile(*configFile) + shardIDLeaderMap = config.GetShardIDToLeaderMap() + clientPeer = config.GetClientPeer() + _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) + clientPeer.PubKey = pubKey + } + debugPrintShardIDLeaderMap(shardIDLeaderMap) // Do cross shard tx if there are more than one shard setting := txgen.Settings{ @@ -78,10 +115,7 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} - clientPeer := config.GetClientPeer() for shardID := range shardIDLeaderMap { - _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) - clientPeer.PubKey = pubKey node := node.New(&consensus.Consensus{ShardID: shardID}, nil, *clientPeer) // Assign many fake addresses so we have enough address to play with at first node.AddTestingAddresses(setting.NumOfAddress) @@ -244,7 +278,11 @@ func main() { // Send a stop message to stop the nodes at the end msg := proto_node.ConstructStopMessage() - peers := append(config.GetValidators(), clientNode.Client.GetLeaders()...) + if *peerDiscovery { + peers = clientNode.Consensus.GetValidatorPeers() + } else { + peers = append(config.GetValidators(), clientNode.Client.GetLeaders()...) + } p2p.BroadcastMessage(peers, msg) time.Sleep(3000 * time.Millisecond) } @@ -262,3 +300,9 @@ func SendTxsToLeaderAccount(leader p2p.Peer, txs types.Transactions) { msg := proto_node.ConstructTransactionListMessageAccount(txs) p2p.SendMessage(leader, msg) } + +func debugPrintShardIDLeaderMap(leaderMap map[uint32]p2p.Peer) { + for k, v := range leaderMap { + log.Debug("Leader", "ShardID", k, "Leader", v) + } +} diff --git a/newnode/newnode.go b/newnode/newnode.go index da2f8fa0d..fd1802998 100644 --- a/newnode/newnode.go +++ b/newnode/newnode.go @@ -31,7 +31,7 @@ type NewNode struct { leader p2p.Peer isLeader bool Self p2p.Peer - peers []p2p.Peer + Leaders map[uint32]p2p.Peer PubK kyber.Point priK kyber.Scalar log log.Logger @@ -48,6 +48,7 @@ func New(ip string, port string) *NewNode { node.Self = p2p.Peer{IP: ip, Port: port, PubKey: pubKey, ValidatorID: -1} node.log = log.New() node.SetInfo = false + node.Leaders = map[uint32]p2p.Peer{} return &node } @@ -155,14 +156,17 @@ func (node *NewNode) processShardInfo(msgPayload []byte) bool { leadersInfo := bcconn.DeserializeRandomInfo(msgPayload) leaders := leadersInfo.Leaders shardNum, isLeader := utils.AllocateShard(leadersInfo.NumberOfNodesAdded, leadersInfo.NumberOfShards) - leaderNode := leaders[shardNum-1] //0 indexing. - leaderPeer := p2p.Peer{IP: leaderNode.Self.IP, Port: leaderNode.Self.Port} - leaderPeer.PubKey = crypto.Ed25519Curve.Point() - err := leaderPeer.PubKey.UnmarshalBinary(leaderNode.PubK[:]) - if err != nil { - node.log.Info("Could not unmarshall leaders public key from binary to kyber.point") + for n, v := range leaders { + leaderPeer := p2p.Peer{IP: v.Self.IP, Port: v.Self.Port} + leaderPeer.PubKey = crypto.Ed25519Curve.Point() + err := leaderPeer.PubKey.UnmarshalBinary(v.PubK[:]) + if err != nil { + node.log.Error("Could not unmarshall leaders public key from binary to kyber.point") + } + node.Leaders[uint32(n)] = leaderPeer } - node.leader = leaderPeer + + node.leader = node.Leaders[uint32(shardNum-1)] node.isLeader = isLeader node.ShardID = shardNum - 1 //0 indexing. node.SetInfo = true From 96fb941392ddfaa306b7e93f593fca8336ac0bf2 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Mon, 3 Dec 2018 16:26:57 -0800 Subject: [PATCH 3/5] Add Role field to Ping message The Role message is used to differentiate the node joined the shard Signed-off-by: Leo Chen --- proto/node/pingpong.go | 23 ++++++++++++++++++++++- proto/node/pingpong_test.go | 12 +++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/proto/node/pingpong.go b/proto/node/pingpong.go index 62dac0f59..6285a8f43 100644 --- a/proto/node/pingpong.go +++ b/proto/node/pingpong.go @@ -21,6 +21,25 @@ import ( "github.com/harmony-one/harmony/proto" ) +// RoleType defines the role of the node +type RoleType int + +// Type of roles of a node +const ( + ValidatorRole RoleType = iota + ClientRole +) + +func (r RoleType) String() string { + switch r { + case ValidatorRole: + return "Validator" + case ClientRole: + return "Client" + } + return "Unknown" +} + // refer to Peer struct in p2p/peer.go // this is basically a simplified version of Peer // for network transportation @@ -29,6 +48,7 @@ type nodeInfo struct { Port string PubKey []byte ValidatorID int + Role RoleType } // PingMessageType defines the data structure of the Ping message @@ -45,7 +65,7 @@ type PongMessageType struct { } func (p PingMessageType) String() string { - return fmt.Sprintf("ping:%v=>%v:%v:%v/%v", p.Version, p.Node.IP, p.Node.Port, p.Node.ValidatorID, p.Node.PubKey) + return fmt.Sprintf("ping:%v/%v=>%v:%v:%v/%v", p.Node.Role, p.Version, p.Node.IP, p.Node.Port, p.Node.ValidatorID, p.Node.PubKey) } func (p PongMessageType) String() string { @@ -63,6 +83,7 @@ func NewPingMessage(peer p2p.Peer) *PingMessageType { ping.Node.Port = peer.Port ping.Node.ValidatorID = peer.ValidatorID ping.Node.PubKey, err = peer.PubKey.MarshalBinary() + ping.Node.Role = ValidatorRole if err != nil { fmt.Printf("Error Marshal PubKey: %v", err) diff --git a/proto/node/pingpong_test.go b/proto/node/pingpong_test.go index 3e4810fc5..7f42446d7 100644 --- a/proto/node/pingpong_test.go +++ b/proto/node/pingpong_test.go @@ -21,7 +21,8 @@ var ( ValidatorID: -1, PubKey: pubKey1, } - e1 = "ping:1=>127.0.0.1:9999:-1/[90 217 28 68 64 211 160 232 61 244 159 244 160 36 61 161 237 242 236 45 147 118 237 88 234 122 198 188 157 116 90 228]" + e1 = "ping:Validator/1=>127.0.0.1:9999:-1/[90 217 28 68 64 211 160 232 61 244 159 244 160 36 61 161 237 242 236 45 147 118 237 88 234 122 198 188 157 116 90 228]" + e3 = "ping:Client/1=>127.0.0.1:9999:-1/[90 217 28 68 64 211 160 232 61 244 159 244 160 36 61 161 237 242 236 45 147 118 237 88 234 122 198 188 157 116 90 228]" priKey2 = crypto.Ed25519Curve.Scalar().SetInt64(int64(999)) pubKey2 = pki.GetPublicKeyFromScalar(priKey2) @@ -60,6 +61,15 @@ func TestString(test *testing.T) { fmt.Println(r1) } + ping1.Node.Role = ClientRole + + r3 := fmt.Sprintf("%v", *ping1) + if strings.Compare(r3, e3) != 0 { + test.Errorf("expect: %v, got: %v", e3, r3) + } else { + fmt.Println(r3) + } + pong1 := NewPongMessage(p2, pubKeys) r2 := fmt.Sprintf("%v", *pong1) From 54e4a7baa29f5a86f2648d9e059bfbbe6b164da0 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Mon, 3 Dec 2018 17:36:50 -0800 Subject: [PATCH 4/5] client node ping leaders to register client Signed-off-by: Leo Chen --- client/txgen/main.go | 17 ++++++++++++++--- node/node.go | 3 +++ node/node_handler.go | 6 ++++++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/client/txgen/main.go b/client/txgen/main.go index 070185f4a..83d38c536 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -166,12 +166,23 @@ func main() { } } } + clientNode.Client.UpdateBlocks = updateBlocksFunc + + // Start the client server to listen to leader's message + go clientNode.StartServer() + + for _, leader := range shardIDLeaderMap { + log.Debug("Client Join Shard", "leader", leader) + clientNode.State = node.NodeWaitToJoin + go clientNode.JoinShard(leader) + // wait for 3 seconds for client to send ping message to leader + time.Sleep(3 * time.Second) + // change the state of the client node will terminate the goroutine of JoinShard + clientNode.State = node.NodeJoinedShard + } } clientNode.Client.UpdateBlocks = updateBlocksFunc - // Start the client server to listen to leader's message - go clientNode.StartServer() - // Transaction generation process time.Sleep(10 * time.Second) // wait for nodes to be ready start := time.Now() diff --git a/node/node.go b/node/node.go index a1fa9ca73..858112938 100644 --- a/node/node.go +++ b/node/node.go @@ -411,6 +411,9 @@ func (node *Node) JoinShard(leader p2p.Peer) { for node.State == NodeWaitToJoin { backoff.Sleep() ping := proto_node.NewPingMessage(node.SelfPeer) + if node.Client != nil { // assume this is the client node + ping.Node.Role = proto_node.ClientRole + } buffer := ping.ConstructPingMessage() // Talk to leader. diff --git a/node/node_handler.go b/node/node_handler.go index be8135dab..47393ba8b 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -541,6 +541,12 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int { return -1 } + if ping.Node.Role == proto_node.ClientRole { + node.log.Info("Add Client Peer to Node", "Node", node.Consensus.GetNodeID(), "Client", peer) + node.ClientPeer = peer + return 0 + } + // Add to Node's peer list anyway node.AddPeers([]*p2p.Peer{peer}) From ab4a61d360a10684abde79ea7f29fa33f0a160f9 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Tue, 4 Dec 2018 23:09:12 -0800 Subject: [PATCH 5/5] use a better way to stop ping message use a channel to replace the old way of using shared memory Signed-off-by: Leo Chen --- client/txgen/main.go | 26 ++++++++++++-------------- node/node.go | 39 +++++++++++++++++++++++++++------------ node/node_handler.go | 2 ++ 3 files changed, 41 insertions(+), 26 deletions(-) diff --git a/client/txgen/main.go b/client/txgen/main.go index 83d38c536..5801e20bb 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -166,23 +166,21 @@ func main() { } } } - clientNode.Client.UpdateBlocks = updateBlocksFunc - - // Start the client server to listen to leader's message - go clientNode.StartServer() - - for _, leader := range shardIDLeaderMap { - log.Debug("Client Join Shard", "leader", leader) - clientNode.State = node.NodeWaitToJoin - go clientNode.JoinShard(leader) - // wait for 3 seconds for client to send ping message to leader - time.Sleep(3 * time.Second) - // change the state of the client node will terminate the goroutine of JoinShard - clientNode.State = node.NodeJoinedShard - } } clientNode.Client.UpdateBlocks = updateBlocksFunc + // Start the client server to listen to leader's message + go clientNode.StartServer() + + for _, leader := range shardIDLeaderMap { + log.Debug("Client Join Shard", "leader", leader) + go clientNode.JoinShard(leader) + // wait for 3 seconds for client to send ping message to leader + time.Sleep(3 * time.Second) + clientNode.StopPing <- 1 + clientNode.State = node.NodeJoinedShard + } + // Transaction generation process time.Sleep(10 * time.Second) // wait for nodes to be ready start := time.Now() diff --git a/node/node.go b/node/node.go index 858112938..e2e34d126 100644 --- a/node/node.go +++ b/node/node.go @@ -106,6 +106,9 @@ type Node struct { // Test only TestBankKeys []*ecdsa.PrivateKey + + // Channel to stop sending ping message + StopPing chan int } // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client @@ -328,6 +331,7 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node // Setup initial state of syncing. node.syncingState = NotDoingSyncing + node.StopPing = make(chan int) return &node } @@ -405,19 +409,30 @@ func (node *Node) GetSyncingPeers() []p2p.Peer { // JoinShard helps a new node to join a shard. func (node *Node) JoinShard(leader p2p.Peer) { - // try to join the shard, with 10 minutes time-out - backoff := p2p.NewExpBackoff(waitBeforeJoinShard, timeOutToJoinShard, 2) - - for node.State == NodeWaitToJoin { - backoff.Sleep() - ping := proto_node.NewPingMessage(node.SelfPeer) - if node.Client != nil { // assume this is the client node - ping.Node.Role = proto_node.ClientRole - } - buffer := ping.ConstructPingMessage() + // try to join the shard, send ping message every 1 second, with a 10 minutes time-out + tick := time.NewTicker(1 * time.Second) + timeout := time.NewTicker(10 * time.Minute) + defer tick.Stop() + defer timeout.Stop() - // Talk to leader. - p2p.SendMessage(leader, buffer) + for { + select { + case <-tick.C: + ping := proto_node.NewPingMessage(node.SelfPeer) + if node.Client != nil { // assume this is the client node + ping.Node.Role = proto_node.ClientRole + } + buffer := ping.ConstructPingMessage() + + // Talk to leader. + p2p.SendMessage(leader, buffer) + case <-timeout.C: + node.log.Info("JoinShard timeout") + return + case <-node.StopPing: + node.log.Info("Stopping JoinShard") + return + } } } diff --git a/node/node_handler.go b/node/node_handler.go index 47393ba8b..bb96fb7c5 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -616,6 +616,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { } node.State = NodeJoinedShard + // Notify JoinShard to stop sending Ping messages + node.StopPing <- 1 return node.Consensus.UpdatePublicKeys(publicKeys) }