add counter for each commmand in commander and output # of nodes receiving the commander

pull/43/head
Minh Doan 6 years ago
parent b9c97ee8fb
commit 84ef951b7e
  1. 33
      aws-experiment-launch/experiment/commander/main.go

@ -35,6 +35,7 @@ var (
const ( const (
DistributionFileName = "distribution_config.txt" DistributionFileName = "distribution_config.txt"
MaxFileOpen = 5000
) )
func readConfigFile() [][]string { func readConfigFile() [][]string {
@ -74,12 +75,11 @@ func handleCommand(command string) {
} }
log.Println("New session", session.id) log.Println("New session", session.id)
dictateNodes(fmt.Sprintf("init %v %v %v %v", setting.ip, setting.port, setting.configURL, session.id)) count := dictateNodes(fmt.Sprintf("init %v %v %v %v", setting.ip, setting.port, setting.configURL, session.id))
// log.Printf("Finished init with %v nodes\n", count) log.Printf("Finished init with %v nodes\n", count)
case "ping", "kill", "log", "log2": case "ping", "kill", "log", "log2":
dictateNodes(command) count := dictateNodes(command)
// count := dictateNodes(command) log.Printf("Finished %v with %v nodes\n", cmd, count)
// log.Printf("Finished %v with %v nodes\n", cmd, count)
default: default:
log.Println("Unknown command") log.Println("Unknown command")
} }
@ -91,11 +91,10 @@ func config(ip string, port string, configURL string) {
setting.configURL = configURL setting.configURL = configURL
} }
func dictateNodes(command string) { func dictateNodes(command string) int {
const MaxFileOpen = 5000
var wg sync.WaitGroup var wg sync.WaitGroup
count := MaxFileOpen count := MaxFileOpen
result_chan := make(chan int)
for _, config := range setting.configs { for _, config := range setting.configs {
ip := config[0] ip := config[0]
port := "1" + config[1] // the port number of solider is "1" + node port port := "1" + config[1] // the port number of solider is "1" + node port
@ -106,7 +105,7 @@ func dictateNodes(command string) {
} }
go func() { go func() {
defer wg.Done() defer wg.Done()
dictateNode(addr, command) result_chan <- dictateNode(addr, command)
}() }()
count -= 1 count -= 1
// Because of the limitation of ulimit // Because of the limitation of ulimit
@ -119,14 +118,20 @@ func dictateNodes(command string) {
if count < MaxFileOpen { if count < MaxFileOpen {
wg.Wait() wg.Wait()
} }
count = len(setting.configs)
res := 0
for count > 0 {
res += <-result_chan
}
return res
} }
func dictateNode(addr string, command string) bool { func dictateNode(addr string, command string) int {
// creates client // creates client
conn, err := net.DialTimeout("tcp", addr, 5*time.Second) conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return false return 0
} }
defer conn.Close() defer conn.Close()
@ -134,16 +139,16 @@ func dictateNode(addr string, command string) bool {
_, err = conn.Write([]byte(command)) _, err = conn.Write([]byte(command))
if err != nil { if err != nil {
log.Printf("Failed to send command to %s", addr) log.Printf("Failed to send command to %s", addr)
return false return 0
} }
log.Printf("Send \"%s\" to %s", command, addr) log.Printf("Send \"%s\" to %s", command, addr)
// read response // read response
if buf, err := ioutil.ReadAll(conn); err != nil { if buf, err := ioutil.ReadAll(conn); err != nil {
return false return 0
} else { } else {
log.Printf("Receive from %s: %s", addr, buf) log.Printf("Receive from %s: %s", addr, buf)
return true return 1
} }
} }

Loading…
Cancel
Save