added log command to upload log files.

pull/37/head
Richard Liu 7 years ago
parent ba1299557c
commit 6529cf2dab
  1. 88
      aws-experiment-launch/experiment/commander/main.go
  2. 157
      aws-experiment-launch/experiment/soldier/main.go

@ -3,11 +3,16 @@ package main
import ( import (
"bufio" "bufio"
"flag" "flag"
"fmt"
"io"
"io/ioutil"
"log" "log"
"mime/multipart"
"net" "net"
"net/http" "net/http"
"os" "os"
"strings" "strings"
"time"
) )
type commanderSetting struct { type commanderSetting struct {
@ -15,6 +20,7 @@ type commanderSetting struct {
port string port string
configFile string configFile string
configs [][]string configs [][]string
sessionID string
} }
var ( var (
@ -46,11 +52,11 @@ func handleCommand(command string) {
switch cmd := args[0]; cmd { switch cmd := args[0]; cmd {
case "init": case "init":
{ {
dictateNodes("init http://" + setting.ip + ":" + setting.port + "/" + setting.configFile) setting.sessionID = time.Now().Format("20060102-150405")
log.Println("New session", setting.sessionID)
dictateNodes(fmt.Sprintf("init %v %v %v %v", setting.ip, setting.port, setting.configFile, setting.sessionID))
} }
case "ping": case "ping", "kill", "log":
fallthrough
case "kill":
{ {
dictateNodes(command) dictateNodes(command)
} }
@ -102,10 +108,75 @@ func dictateNode(addr string, command string) {
log.Printf("Receive from %s: %s", addr, buff[:n]) log.Printf("Receive from %s: %s", addr, buff[:n])
} }
func hostConfigFile() { func handleUploadRequest(w http.ResponseWriter, r *http.Request) {
err := http.ListenAndServe(":"+setting.port, http.FileServer(http.Dir("./"))) if r.Method != http.MethodPost {
// reject non-post requests
jsonResponse(w, http.StatusBadRequest, "Only post request is accepted.")
return
}
// create upload folder
uploadFolder := fmt.Sprintf("upload/%s", setting.sessionID)
err := os.MkdirAll(uploadFolder, os.ModePerm)
if err != nil {
jsonResponse(w, http.StatusInternalServerError, err.Error())
return
}
reader, err := r.MultipartReader()
if err != nil { if err != nil {
panic("Failed to host config file!") jsonResponse(w, http.StatusBadRequest, err.Error())
return
}
for {
part, err := reader.NextPart()
if err == io.EOF {
break
}
dst, err := os.Create(fmt.Sprintf("%s/%s", uploadFolder, part.FileName()))
log.Println(part.FileName())
if err != nil {
jsonResponse(w, http.StatusInternalServerError, err.Error())
return
}
defer dst.Close()
if _, err := io.Copy(dst, part); err != nil {
jsonResponse(w, http.StatusInternalServerError, err.Error())
return
}
}
}
func saveFile(w http.ResponseWriter, file multipart.File, handle *multipart.FileHeader) {
data, err := ioutil.ReadAll(file)
if err != nil {
jsonResponse(w, http.StatusInternalServerError, err.Error())
return
}
err = ioutil.WriteFile("./files/"+handle.Filename, data, 0666)
if err != nil {
jsonResponse(w, http.StatusInternalServerError, err.Error())
return
}
jsonResponse(w, http.StatusCreated, "File uploaded successfully!")
}
func jsonResponse(w http.ResponseWriter, code int, message string) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
fmt.Fprint(w, message)
log.Println(message)
}
func serve() {
http.Handle("/", http.FileServer(http.Dir("./")))
http.HandleFunc("/upload", handleUploadRequest)
err := http.ListenAndServe(":"+setting.port, nil)
if err != nil {
log.Fatalf("Failed to setup server! Error: %s", err.Error())
} }
} }
@ -117,7 +188,8 @@ func main() {
config(*ip, *port, *configFile) config(*ip, *port, *configFile)
log.Println("Start to host config file at http://" + setting.ip + ":" + setting.port + "/" + setting.configFile) log.Println("Start to host config file at http://" + setting.ip + ":" + setting.port + "/" + setting.configFile)
go hostConfigFile()
go serve()
scanner := bufio.NewScanner(os.Stdin) scanner := bufio.NewScanner(os.Stdin)
for true { for true {

@ -2,26 +2,37 @@ package main
import ( import (
"bufio" "bufio"
"bytes"
"flag" "flag"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"mime/multipart"
"net" "net"
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"strings" "strings"
"time"
) )
type soliderSetting struct { type soliderSetting struct {
ip string ip string
port string port string
localConfig string }
type sessionInfo struct {
id string
commanderIP string
commanderPort string
localConfigFileName string
logFolder string
} }
var ( var (
setting soliderSetting setting soliderSetting
session sessionInfo
) )
func socketServer() { func socketServer() {
@ -63,10 +74,6 @@ ILOOP:
break ILOOP break ILOOP
case nil: case nil:
log.Println("Received command", data) log.Println("Received command", data)
if isTransportOver(data) {
log.Println("Tranport Over!")
break ILOOP
}
handleCommand(data, w) handleCommand(data, w)
@ -88,6 +95,10 @@ func handleCommand(command string, w *bufio.Writer) {
} }
switch command := args[0]; command { switch command := args[0]; command {
case "ping":
{
handlePingCommand(w)
}
case "init": case "init":
{ {
handleInitCommand(args[1:], w) handleInitCommand(args[1:], w)
@ -96,25 +107,37 @@ func handleCommand(command string, w *bufio.Writer) {
{ {
handleKillCommand(w) handleKillCommand(w)
} }
case "ping": case "log":
{ {
handlePingCommand(w) handleLogCommand(w)
} }
} }
} }
func handleInitCommand(args []string, w *bufio.Writer) { func handleInitCommand(args []string, w *bufio.Writer) {
// init ip port config_file sessionID
log.Println("Init command", args) log.Println("Init command", args)
// read arguments
ip := args[0]
session.commanderIP = ip
port := args[1]
session.commanderPort = port
configFile := args[2]
sessionID := args[3]
session.id = sessionID
configURL := fmt.Sprintf("http://%v:%v/%v", ip, port, configFile)
session.logFolder = fmt.Sprintf("../tmp_log/log-%v", sessionID)
// create local config file // create local config file
setting.localConfig = "node_config_" + setting.port + ".txt" session.localConfigFileName = fmt.Sprintf("node_config_%v_%v.txt", setting.port, session.id)
out, err := os.Create(setting.localConfig) out, err := os.Create(session.localConfigFileName)
if err != nil { if err != nil {
log.Fatal("Failed to create local file", err) log.Fatal("Failed to create local file", err)
} }
defer out.Close() defer out.Close()
// get remote config file // get remote config file
configURL := args[0]
resp, err := http.Get(configURL) resp, err := http.Get(configURL)
if err != nil { if err != nil {
log.Fatal("Failed to read file content") log.Fatal("Failed to read file content")
@ -128,14 +151,14 @@ func handleInitCommand(args []string, w *bufio.Writer) {
} }
// log config file // log config file
content, err := ioutil.ReadFile(setting.localConfig) content, err := ioutil.ReadFile(session.localConfigFileName)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
log.Println("Successfully downloaded config") log.Println("Successfully downloaded config")
log.Println(string(content)) log.Println(string(content))
run() runInstance()
logAndReply(w, "Successfully init-ed") logAndReply(w, "Successfully init-ed")
} }
@ -151,21 +174,73 @@ func handlePingCommand(w *bufio.Writer) {
logAndReply(w, "I'm alive") logAndReply(w, "I'm alive")
} }
func logAndReply(w *bufio.Writer, message string) { func handleLogCommand(w *bufio.Writer) {
log.Println(message) log.Println("Log command")
w.Write([]byte(message))
w.Flush() files, err := ioutil.ReadDir(session.logFolder)
if err != nil {
logAndReply(w, fmt.Sprintf("Failed to create access log folder. Error: %s", err.Error()))
return
}
filePaths := make([]string, len(files))
for i, f := range files {
filePaths[i] = fmt.Sprintf("%s/%s", session.logFolder, f.Name())
}
req, err := newUploadFileRequest(
fmt.Sprintf("http://%s:%s/upload", session.commanderIP, session.commanderPort),
"file",
filePaths,
nil)
if err != nil {
logAndReply(w, fmt.Sprintf("Failed to create upload request. Error: %s", err.Error()))
return
}
client := &http.Client{}
_, err = client.Do(req)
if err != nil {
logAndReply(w, fmt.Sprintf("Failed to upload log. Error: %s", err.Error()))
return
}
logAndReply(w, "Upload log done!")
} }
func createLogFolder() string { // Creates a new file upload http request with optional extra params
t := time.Now().Format("20060102-150405") func newUploadFileRequest(uri string, paramName string, paths []string, params map[string]string) (*http.Request, error) {
logFolder := "../tmp_log/log-" + t body := &bytes.Buffer{}
err := os.MkdirAll(logFolder, os.ModePerm) writer := multipart.NewWriter(body)
for _, path := range paths {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
part, err := writer.CreateFormFile(paramName, filepath.Base(path))
if err != nil {
return nil, err
}
_, err = io.Copy(part, file)
log.Printf(path)
}
for key, val := range params {
_ = writer.WriteField(key, val)
}
err := writer.Close()
if err != nil { if err != nil {
log.Fatal("Failed to create log folder") return nil, err
} }
log.Println("Created log folder", logFolder)
return logFolder req, err := http.NewRequest("POST", uri, body)
req.Header.Set("Content-Type", writer.FormDataContentType())
return req, err
}
func logAndReply(w *bufio.Writer, message string) {
log.Println(message)
w.Write([]byte(message))
w.Flush()
} }
func runCmd(name string, args ...string) error { func runCmd(name string, args ...string) error {
@ -174,32 +249,28 @@ func runCmd(name string, args ...string) error {
return err return err
} }
func run() { func runInstance() {
config := readConfigFile(setting.localConfig) config := readConfigFile(session.localConfigFileName)
myConfig := getMyConfig(setting.ip, setting.port, &config) myConfig := getMyConfig(setting.ip, setting.port, &config)
logFolder := createLogFolder() os.MkdirAll(session.logFolder, os.ModePerm)
if myConfig[2] == "client" { if myConfig[2] == "client" {
runClient(logFolder) runClient()
} else { } else {
runInstance(logFolder) runNode()
} }
} }
func runInstance(logFolder string) error { func runNode() error {
log.Println("running instance") log.Println("running instance")
return runCmd("./benchmark", "-ip", setting.ip, "-port", setting.port, "-config_file", setting.localConfig, "-log_folder", logFolder) return runCmd("./benchmark", "-ip", setting.ip, "-port", setting.port, "-config_file", session.localConfigFileName, "-log_folder", session.logFolder)
} }
func runClient(logFolder string) error { func runClient() error {
log.Println("running client") log.Println("running client")
return runCmd("./txgen", "-config_file", setting.localConfig, "-log_folder", logFolder) return runCmd("./txgen", "-config_file", session.localConfigFileName, "-log_folder", session.logFolder)
}
func isTransportOver(data string) (over bool) {
over = strings.HasSuffix(data, "\r\n\r\n")
return
} }
func readConfigFile(configFile string) [][]string { func readConfigFile(configFile string) [][]string {
@ -224,9 +295,9 @@ func getMyConfig(myIP string, myPort string, config *[][]string) []string {
return nil return nil
} }
// go build -o bin/soldier aws-experiment-launch/experiment/soldier/main.go // cd harmony-benchmark
// cd bin/ // go build -o soldier ../aws-experiment-launch/experiment/soldier/main.go
// ./soldier --port=xxxx // ./soldier -ip=xx -port=xx
func main() { func main() {
ip := flag.String("ip", "127.0.0.1", "IP of the node.") ip := flag.String("ip", "127.0.0.1", "IP of the node.")
port := flag.String("port", "3000", "port of the node.") port := flag.String("port", "3000", "port of the node.")

Loading…
Cancel
Save