From b4c1a246a34f6060e957e76d896ab63274ae0d08 Mon Sep 17 00:00:00 2001 From: Richard Liu Date: Mon, 16 Jul 2018 23:55:19 -0700 Subject: [PATCH] updated soldier to upload logs to s3. --- .../experiment/soldier/main.go | 67 +++++++++---- .../experiment/soldier/s3/s3.go | 93 +++++++++++++++++++ 2 files changed, 143 insertions(+), 17 deletions(-) create mode 100644 aws-experiment-launch/experiment/soldier/s3/s3.go diff --git a/aws-experiment-launch/experiment/soldier/main.go b/aws-experiment-launch/experiment/soldier/main.go index 97359ab8b..7aed13acb 100644 --- a/aws-experiment-launch/experiment/soldier/main.go +++ b/aws-experiment-launch/experiment/soldier/main.go @@ -5,6 +5,7 @@ import ( "bytes" "flag" "fmt" + "harmony-benchmark/aws-experiment-launch/experiment/soldier/s3" "io" "io/ioutil" "log" @@ -30,9 +31,14 @@ type sessionInfo struct { logFolder string } +const ( + bucketName = "richard-bucket-test" + logFolderPrefix = "../tmp_log/" +) + var ( - setting soliderSetting - session sessionInfo + setting soliderSetting + globalSession sessionInfo ) func socketServer() { @@ -111,6 +117,10 @@ func handleCommand(command string, w *bufio.Writer) { { handleLogCommand(w) } + case "log2": + { + handleLog2Command(w) + } } } @@ -120,17 +130,17 @@ func handleInitCommand(args []string, w *bufio.Writer) { // read arguments ip := args[0] - session.commanderIP = ip + globalSession.commanderIP = ip port := args[1] - session.commanderPort = port + globalSession.commanderPort = port configURL := args[2] sessionID := args[3] - session.id = sessionID - session.logFolder = fmt.Sprintf("../tmp_log/log-%v", sessionID) + globalSession.id = sessionID + globalSession.logFolder = fmt.Sprintf("%slog-%v", logFolderPrefix, sessionID) // create local config file - session.localConfigFileName = fmt.Sprintf("node_config_%v_%v.txt", setting.port, session.id) - out, err := os.Create(session.localConfigFileName) + globalSession.localConfigFileName = fmt.Sprintf("node_config_%v_%v.txt", setting.port, globalSession.id) + out, err := os.Create(globalSession.localConfigFileName) if err != nil { log.Fatal("Failed to create local file", err) } @@ -150,7 +160,7 @@ func handleInitCommand(args []string, w *bufio.Writer) { } // log config file - content, err := ioutil.ReadFile(session.localConfigFileName) + content, err := ioutil.ReadFile(globalSession.localConfigFileName) if err != nil { log.Fatal(err) } @@ -176,19 +186,19 @@ func handlePingCommand(w *bufio.Writer) { func handleLogCommand(w *bufio.Writer) { log.Println("Log command") - files, err := ioutil.ReadDir(session.logFolder) + files, err := ioutil.ReadDir(globalSession.logFolder) if err != nil { - logAndReply(w, fmt.Sprintf("Failed to create access log folder. Error: %s", err.Error())) + logAndReply(w, fmt.Sprintf("Failed to create log folder. Error: %s", err.Error())) return } filePaths := make([]string, len(files)) for i, f := range files { - filePaths[i] = fmt.Sprintf("%s/%s", session.logFolder, f.Name()) + filePaths[i] = fmt.Sprintf("%s/%s", globalSession.logFolder, f.Name()) } req, err := newUploadFileRequest( - fmt.Sprintf("http://%s:%s/upload", session.commanderIP, session.commanderPort), + fmt.Sprintf("http://%s:%s/upload", globalSession.commanderIP, globalSession.commanderPort), "file", filePaths, nil) @@ -242,6 +252,29 @@ func logAndReply(w *bufio.Writer, message string) { w.Flush() } +func handleLog2Command(w *bufio.Writer) { + log.Println("Log command") + + files, err := ioutil.ReadDir(globalSession.logFolder) + if err != nil { + logAndReply(w, fmt.Sprintf("Failed to create log folder. Error: %s", err.Error())) + return + } + + filePaths := make([]string, len(files)) + for i, f := range files { + filePaths[i] = fmt.Sprintf("%s/%s", globalSession.logFolder, f.Name()) + } + + // TODO: currently only upload the first file. + _, err = s3.UploadFile(bucketName, filePaths[0], strings.Replace(filePaths[0], logFolderPrefix, "", 1)) + if err != nil { + logAndReply(w, fmt.Sprintf("Failed to create upload request. Error: %s", err.Error())) + return + } + logAndReply(w, "Upload log done!") +} + func runCmd(name string, args ...string) error { err := exec.Command(name, args...).Start() log.Println("Command running", name, args) @@ -249,11 +282,11 @@ func runCmd(name string, args ...string) error { } func runInstance() { - config := readConfigFile(session.localConfigFileName) + config := readConfigFile(globalSession.localConfigFileName) myConfig := getMyConfig(setting.ip, setting.port, &config) - os.MkdirAll(session.logFolder, os.ModePerm) + os.MkdirAll(globalSession.logFolder, os.ModePerm) if myConfig[2] == "client" { runClient() @@ -264,12 +297,12 @@ func runInstance() { func runNode() error { log.Println("running instance") - return runCmd("./benchmark", "-ip", setting.ip, "-port", setting.port, "-config_file", session.localConfigFileName, "-log_folder", session.logFolder) + return runCmd("./benchmark", "-ip", setting.ip, "-port", setting.port, "-config_file", globalSession.localConfigFileName, "-log_folder", globalSession.logFolder) } func runClient() error { log.Println("running client") - return runCmd("./txgen", "-config_file", session.localConfigFileName, "-log_folder", session.logFolder) + return runCmd("./txgen", "-config_file", globalSession.localConfigFileName, "-log_folder", globalSession.logFolder) } func readConfigFile(configFile string) [][]string { diff --git a/aws-experiment-launch/experiment/soldier/s3/s3.go b/aws-experiment-launch/experiment/soldier/s3/s3.go new file mode 100644 index 000000000..ef1fb7f28 --- /dev/null +++ b/aws-experiment-launch/experiment/soldier/s3/s3.go @@ -0,0 +1,93 @@ +package s3 + +import ( + "fmt" + "log" + "os" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/endpoints" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" +) + +func createSession() *session.Session { + return session.Must(session.NewSession(&aws.Config{ + Region: aws.String(endpoints.UsWest2RegionID), + MaxRetries: aws.Int(3), + })) +} + +func CreateBucket(bucketName string, region string) { + sess := createSession() + svc := s3.New(sess) + input := &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + CreateBucketConfiguration: &s3.CreateBucketConfiguration{ + LocationConstraint: aws.String(region), + }, + } + + result, err := svc.CreateBucket(input) + if err != nil { + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case s3.ErrCodeBucketAlreadyExists: + fmt.Println(s3.ErrCodeBucketAlreadyExists, aerr.Error()) + case s3.ErrCodeBucketAlreadyOwnedByYou: + fmt.Println(s3.ErrCodeBucketAlreadyOwnedByYou, aerr.Error()) + default: + fmt.Println(aerr.Error()) + } + } else { + // Print the error, cast err to awserr.Error to get the Code and + // Message from an error. + fmt.Println(err.Error()) + } + return + } + + fmt.Println(result) +} + +func UploadFile(bucketName string, fileName string, key string) (result *s3manager.UploadOutput, err error) { + sess := createSession() + uploader := s3manager.NewUploader(sess) + + f, err := os.Open(fileName) + if err != nil { + log.Println("Failed to open file", err) + return nil, err + } + // Upload the file to S3. + result, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + Body: f, + }) + if err != nil { + log.Println("failed to upload file", err) + return nil, err + } + fmt.Printf("file uploaded to, %s\n", result.Location) + return result, nil +} + +func DownloadFile(bucketName string, fileName string, key string) (n int64, err error) { + sess := createSession() + + downloader := s3manager.NewDownloader(sess) + + f, err := os.Create(fileName) + if err != nil { + return + } + + n, err = downloader.Download(f, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + }) + return +}