diff --git a/aws-experiment-launch/experiment/commander/main.go b/aws-experiment-launch/experiment/commander/main.go index 6d25ff1bb..b5243a13d 100644 --- a/aws-experiment-launch/experiment/commander/main.go +++ b/aws-experiment-launch/experiment/commander/main.go @@ -3,11 +3,16 @@ package main import ( "bufio" "flag" + "fmt" + "io" + "io/ioutil" "log" + "mime/multipart" "net" "net/http" "os" "strings" + "time" ) type commanderSetting struct { @@ -15,6 +20,7 @@ type commanderSetting struct { port string configFile string configs [][]string + sessionID string } var ( @@ -46,11 +52,11 @@ func handleCommand(command string) { switch cmd := args[0]; cmd { case "init": { - dictateNodes("init http://" + setting.ip + ":" + setting.port + "/" + setting.configFile) + setting.sessionID = time.Now().Format("20060102-150405") + log.Println("New session", setting.sessionID) + dictateNodes(fmt.Sprintf("init %v %v %v %v", setting.ip, setting.port, setting.configFile, setting.sessionID)) } - case "ping": - fallthrough - case "kill": + case "ping", "kill", "log": { dictateNodes(command) } @@ -102,10 +108,75 @@ func dictateNode(addr string, command string) { log.Printf("Receive from %s: %s", addr, buff[:n]) } -func hostConfigFile() { - err := http.ListenAndServe(":"+setting.port, http.FileServer(http.Dir("./"))) +func handleUploadRequest(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + // reject non-post requests + jsonResponse(w, http.StatusBadRequest, "Only post request is accepted.") + return + } + + // create upload folder + uploadFolder := fmt.Sprintf("upload/%s", setting.sessionID) + err := os.MkdirAll(uploadFolder, os.ModePerm) + if err != nil { + jsonResponse(w, http.StatusInternalServerError, err.Error()) + return + } + + reader, err := r.MultipartReader() if err != nil { - panic("Failed to host config file!") + jsonResponse(w, http.StatusBadRequest, err.Error()) + return + } + + for { + part, err := reader.NextPart() + if err == io.EOF { + break + } + + dst, err := os.Create(fmt.Sprintf("%s/%s", uploadFolder, part.FileName())) + log.Println(part.FileName()) + if err != nil { + jsonResponse(w, http.StatusInternalServerError, err.Error()) + return + } + defer dst.Close() + + if _, err := io.Copy(dst, part); err != nil { + jsonResponse(w, http.StatusInternalServerError, err.Error()) + return + } + } +} + +func saveFile(w http.ResponseWriter, file multipart.File, handle *multipart.FileHeader) { + data, err := ioutil.ReadAll(file) + if err != nil { + jsonResponse(w, http.StatusInternalServerError, err.Error()) + return + } + err = ioutil.WriteFile("./files/"+handle.Filename, data, 0666) + if err != nil { + jsonResponse(w, http.StatusInternalServerError, err.Error()) + return + } + jsonResponse(w, http.StatusCreated, "File uploaded successfully!") +} + +func jsonResponse(w http.ResponseWriter, code int, message string) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + fmt.Fprint(w, message) + log.Println(message) +} + +func serve() { + http.Handle("/", http.FileServer(http.Dir("./"))) + http.HandleFunc("/upload", handleUploadRequest) + err := http.ListenAndServe(":"+setting.port, nil) + if err != nil { + log.Fatalf("Failed to setup server! Error: %s", err.Error()) } } @@ -117,7 +188,8 @@ func main() { config(*ip, *port, *configFile) log.Println("Start to host config file at http://" + setting.ip + ":" + setting.port + "/" + setting.configFile) - go hostConfigFile() + + go serve() scanner := bufio.NewScanner(os.Stdin) for true { diff --git a/aws-experiment-launch/experiment/soldier/main.go b/aws-experiment-launch/experiment/soldier/main.go index 7aa8d9a7f..c529c5ccd 100644 --- a/aws-experiment-launch/experiment/soldier/main.go +++ b/aws-experiment-launch/experiment/soldier/main.go @@ -2,26 +2,37 @@ package main import ( "bufio" + "bytes" "flag" + "fmt" "io" "io/ioutil" "log" + "mime/multipart" "net" "net/http" "os" "os/exec" + "path/filepath" "strings" - "time" ) type soliderSetting struct { - ip string - port string - localConfig string + ip string + port string +} + +type sessionInfo struct { + id string + commanderIP string + commanderPort string + localConfigFileName string + logFolder string } var ( setting soliderSetting + session sessionInfo ) func socketServer() { @@ -63,10 +74,6 @@ ILOOP: break ILOOP case nil: log.Println("Received command", data) - if isTransportOver(data) { - log.Println("Tranport Over!") - break ILOOP - } handleCommand(data, w) @@ -88,6 +95,10 @@ func handleCommand(command string, w *bufio.Writer) { } switch command := args[0]; command { + case "ping": + { + handlePingCommand(w) + } case "init": { handleInitCommand(args[1:], w) @@ -96,25 +107,37 @@ func handleCommand(command string, w *bufio.Writer) { { handleKillCommand(w) } - case "ping": + case "log": { - handlePingCommand(w) + handleLogCommand(w) } } } func handleInitCommand(args []string, w *bufio.Writer) { + // init ip port config_file sessionID log.Println("Init command", args) + + // read arguments + ip := args[0] + session.commanderIP = ip + port := args[1] + session.commanderPort = port + configFile := args[2] + sessionID := args[3] + session.id = sessionID + configURL := fmt.Sprintf("http://%v:%v/%v", ip, port, configFile) + session.logFolder = fmt.Sprintf("../tmp_log/log-%v", sessionID) + // create local config file - setting.localConfig = "node_config_" + setting.port + ".txt" - out, err := os.Create(setting.localConfig) + session.localConfigFileName = fmt.Sprintf("node_config_%v_%v.txt", setting.port, session.id) + out, err := os.Create(session.localConfigFileName) if err != nil { log.Fatal("Failed to create local file", err) } defer out.Close() // get remote config file - configURL := args[0] resp, err := http.Get(configURL) if err != nil { log.Fatal("Failed to read file content") @@ -128,14 +151,14 @@ func handleInitCommand(args []string, w *bufio.Writer) { } // log config file - content, err := ioutil.ReadFile(setting.localConfig) + content, err := ioutil.ReadFile(session.localConfigFileName) if err != nil { log.Fatal(err) } log.Println("Successfully downloaded config") log.Println(string(content)) - run() + runInstance() logAndReply(w, "Successfully init-ed") } @@ -151,21 +174,73 @@ func handlePingCommand(w *bufio.Writer) { logAndReply(w, "I'm alive") } -func logAndReply(w *bufio.Writer, message string) { - log.Println(message) - w.Write([]byte(message)) - w.Flush() +func handleLogCommand(w *bufio.Writer) { + log.Println("Log command") + + files, err := ioutil.ReadDir(session.logFolder) + if err != nil { + logAndReply(w, fmt.Sprintf("Failed to create access log folder. Error: %s", err.Error())) + return + } + + filePaths := make([]string, len(files)) + for i, f := range files { + filePaths[i] = fmt.Sprintf("%s/%s", session.logFolder, f.Name()) + } + + req, err := newUploadFileRequest( + fmt.Sprintf("http://%s:%s/upload", session.commanderIP, session.commanderPort), + "file", + filePaths, + nil) + if err != nil { + logAndReply(w, fmt.Sprintf("Failed to create upload request. Error: %s", err.Error())) + return + } + client := &http.Client{} + _, err = client.Do(req) + if err != nil { + logAndReply(w, fmt.Sprintf("Failed to upload log. Error: %s", err.Error())) + return + } + logAndReply(w, "Upload log done!") } -func createLogFolder() string { - t := time.Now().Format("20060102-150405") - logFolder := "../tmp_log/log-" + t - err := os.MkdirAll(logFolder, os.ModePerm) +// Creates a new file upload http request with optional extra params +func newUploadFileRequest(uri string, paramName string, paths []string, params map[string]string) (*http.Request, error) { + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + for _, path := range paths { + file, err := os.Open(path) + if err != nil { + return nil, err + } + defer file.Close() + part, err := writer.CreateFormFile(paramName, filepath.Base(path)) + if err != nil { + return nil, err + } + _, err = io.Copy(part, file) + log.Printf(path) + } + + for key, val := range params { + _ = writer.WriteField(key, val) + } + err := writer.Close() if err != nil { - log.Fatal("Failed to create log folder") + return nil, err } - log.Println("Created log folder", logFolder) - return logFolder + + req, err := http.NewRequest("POST", uri, body) + req.Header.Set("Content-Type", writer.FormDataContentType()) + return req, err +} + +func logAndReply(w *bufio.Writer, message string) { + log.Println(message) + w.Write([]byte(message)) + w.Flush() } func runCmd(name string, args ...string) error { @@ -174,32 +249,28 @@ func runCmd(name string, args ...string) error { return err } -func run() { - config := readConfigFile(setting.localConfig) +func runInstance() { + config := readConfigFile(session.localConfigFileName) myConfig := getMyConfig(setting.ip, setting.port, &config) - logFolder := createLogFolder() + os.MkdirAll(session.logFolder, os.ModePerm) + if myConfig[2] == "client" { - runClient(logFolder) + runClient() } else { - runInstance(logFolder) + runNode() } } -func runInstance(logFolder string) error { +func runNode() error { log.Println("running instance") - return runCmd("./benchmark", "-ip", setting.ip, "-port", setting.port, "-config_file", setting.localConfig, "-log_folder", logFolder) + return runCmd("./benchmark", "-ip", setting.ip, "-port", setting.port, "-config_file", session.localConfigFileName, "-log_folder", session.logFolder) } -func runClient(logFolder string) error { +func runClient() error { log.Println("running client") - return runCmd("./txgen", "-config_file", setting.localConfig, "-log_folder", logFolder) -} - -func isTransportOver(data string) (over bool) { - over = strings.HasSuffix(data, "\r\n\r\n") - return + return runCmd("./txgen", "-config_file", session.localConfigFileName, "-log_folder", session.logFolder) } func readConfigFile(configFile string) [][]string { @@ -224,9 +295,9 @@ func getMyConfig(myIP string, myPort string, config *[][]string) []string { return nil } -// go build -o bin/soldier aws-experiment-launch/experiment/soldier/main.go -// cd bin/ -// ./soldier --port=xxxx +// cd harmony-benchmark +// go build -o soldier ../aws-experiment-launch/experiment/soldier/main.go +// ./soldier -ip=xx -port=xx func main() { ip := flag.String("ip", "127.0.0.1", "IP of the node.") port := flag.String("port", "3000", "port of the node.")