The core protocol of WoopChain
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
woop/aws-experiment-launch/experiment/soldier/main.go

335 lines
7.4 KiB

package main
import (
"bufio"
"bytes"
"flag"
"fmt"
"harmony-benchmark/aws-experiment-launch/experiment/soldier/s3"
"harmony-benchmark/aws-experiment-launch/experiment/utils"
"harmony-benchmark/configr"
"io"
"io/ioutil"
"log"
"mime/multipart"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
)
type soliderSetting struct {
ip string
port string
}
type sessionInfo struct {
id string
commanderIP string
commanderPort string
localConfigFileName string
logFolder string
}
const (
bucketName = "richard-bucket-test"
logFolderPrefix = "../tmp_log/"
)
var (
setting soliderSetting
globalSession sessionInfo
)
func socketServer() {
soldierPort := "1" + setting.port // the soldier port is "1" + node port
listen, err := net.Listen("tcp4", ":"+soldierPort)
if err != nil {
log.Fatalf("Socket listen port %s failed,%s", soldierPort, err)
os.Exit(1)
}
defer listen.Close()
log.Printf("Begin listen for command on port: %s", soldierPort)
for {
conn, err := listen.Accept()
if err != nil {
log.Fatalln(err)
continue
}
go handler(conn)
}
}
func handler(conn net.Conn) {
defer conn.Close()
var (
buf = make([]byte, 1024)
r = bufio.NewReader(conn)
w = bufio.NewWriter(conn)
)
ILOOP:
for {
n, err := r.Read(buf)
data := string(buf[:n])
switch err {
case io.EOF:
break ILOOP
case nil:
log.Println("Received command", data)
handleCommand(data, w)
log.Println("Waiting for new command...")
default:
log.Fatalf("Receive data failed:%s", err)
return
}
}
}
func handleCommand(command string, w *bufio.Writer) {
args := strings.Split(command, " ")
if len(args) <= 0 {
return
}
switch command := args[0]; command {
case "ping":
{
handlePingCommand(w)
}
case "init":
{
handleInitCommand(args[1:], w)
}
case "kill":
{
handleKillCommand(w)
}
case "log":
{
handleLogCommand(w)
}
case "log2":
{
handleLog2Command(w)
}
}
}
func handleInitCommand(args []string, w *bufio.Writer) {
// init ip port config_file sessionID
log.Println("Init command", args)
// read arguments
ip := args[0]
globalSession.commanderIP = ip
port := args[1]
globalSession.commanderPort = port
configURL := args[2]
sessionID := args[3]
globalSession.id = sessionID
globalSession.logFolder = fmt.Sprintf("%slog-%v", logFolderPrefix, sessionID)
// create local config file
globalSession.localConfigFileName = fmt.Sprintf("node_config_%v_%v.txt", setting.port, globalSession.id)
utils.DownloadFile(globalSession.localConfigFileName, configURL)
log.Println("Successfully downloaded config")
if err := runInstance(); err == nil {
logAndReply(w, "Done init.")
} else {
logAndReply(w, "Failed.")
}
}
func handleKillCommand(w *bufio.Writer) {
log.Println("Kill command")
if err := killPort(setting.port); err == nil {
logAndReply(w, "Done kill.")
} else {
logAndReply(w, "Failed.")
}
}
func killPort(port string) error {
if runtime.GOOS == "windows" {
command := fmt.Sprintf("(Get-NetTCPConnection -LocalPort %s).OwningProcess -Force", port)
return runCmd("Stop-Process", "-Id", command)
} else {
command := fmt.Sprintf("lsof -i tcp:%s | grep LISTEN | awk '{print $2}' | xargs kill -9", port)
return runCmd("bash", "-c", command)
}
}
func handlePingCommand(w *bufio.Writer) {
log.Println("Ping command")
logAndReply(w, "I'm alive")
}
func handleLogCommand(w *bufio.Writer) {
log.Println("Log command")
files, err := ioutil.ReadDir(globalSession.logFolder)
if err != nil {
logAndReply(w, fmt.Sprintf("Failed to read log folder. Error: %s", err.Error()))
return
}
filePaths := make([]string, len(files))
for i, f := range files {
filePaths[i] = fmt.Sprintf("%s/%s", globalSession.logFolder, f.Name())
}
req, err := newUploadFileRequest(
fmt.Sprintf("http://%s:%s/upload", globalSession.commanderIP, globalSession.commanderPort),
"file",
filePaths,
nil)
if err != nil {
logAndReply(w, fmt.Sprintf("Failed to create upload request. Error: %s", err.Error()))
return
}
client := &http.Client{}
_, err = client.Do(req)
if err != nil {
logAndReply(w, fmt.Sprintf("Failed to upload log. Error: %s", err.Error()))
return
}
logAndReply(w, "Upload log done!")
}
// Creates a new file upload http request with optional extra params
func newUploadFileRequest(uri string, paramName string, paths []string, params map[string]string) (*http.Request, error) {
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
for _, path := range paths {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
part, err := writer.CreateFormFile(paramName, filepath.Base(path))
if err != nil {
return nil, err
}
_, err = io.Copy(part, file)
log.Printf(path)
}
for key, val := range params {
_ = writer.WriteField(key, val)
}
err := writer.Close()
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", uri, body)
req.Header.Set("Content-Type", writer.FormDataContentType())
return req, err
}
func logAndReply(w *bufio.Writer, message string) {
log.Println(message)
w.Write([]byte(message))
w.Flush()
}
func handleLog2Command(w *bufio.Writer) {
log.Println("Log command")
files, err := ioutil.ReadDir(globalSession.logFolder)
if err != nil {
logAndReply(w, fmt.Sprintf("Failed to create log folder. Error: %s", err.Error()))
return
}
filePaths := make([]string, len(files))
for i, f := range files {
filePaths[i] = fmt.Sprintf("%s/%s", globalSession.logFolder, f.Name())
}
// TODO: currently only upload the first file.
_, err = s3.UploadFile(bucketName, filePaths[0], strings.Replace(filePaths[0], logFolderPrefix, "", 1))
if err != nil {
logAndReply(w, fmt.Sprintf("Failed to create upload request. Error: %s", err.Error()))
return
}
logAndReply(w, "Upload log done!")
}
func runCmd(name string, args ...string) error {
cmd := exec.Command(name, args...)
if err := cmd.Start(); err != nil {
log.Fatal(err)
return err
} else {
log.Println("Command running", name, args)
go func() {
if err = cmd.Wait(); err != nil {
log.Printf("Command finished with error: %v", err)
} else {
log.Printf("Command finished successfully")
}
}()
return nil
}
}
func runInstance() error {
config, _ := configr.ReadConfigFile(globalSession.localConfigFileName)
myConfig := getMyConfig(setting.ip, setting.port, &config)
os.MkdirAll(globalSession.logFolder, os.ModePerm)
if myConfig[2] == "client" {
return runClient()
} else {
return runNode()
}
}
func runNode() error {
log.Println("running instance")
return runCmd("./benchmark", "-ip", setting.ip, "-port", setting.port, "-config_file", globalSession.localConfigFileName, "-log_folder", globalSession.logFolder)
}
func runClient() error {
log.Println("running client")
return runCmd("./txgen", "-config_file", globalSession.localConfigFileName, "-log_folder", globalSession.logFolder)
}
func getMyConfig(myIP string, myPort string, config *[][]string) []string {
for _, node := range *config {
ip, port := node[0], node[1]
if ip == myIP && port == myPort {
return node
}
}
return nil
}
// cd harmony-benchmark
// go build -o soldier ../aws-experiment-launch/experiment/soldier/main.go
// ./soldier -ip=xx -port=xx
func main() {
ip := flag.String("ip", "127.0.0.1", "IP of the node.")
port := flag.String("port", "9000", "port of the node.")
flag.Parse()
setting.ip = *ip
setting.port = *port
socketServer()
}