diff --git a/aws-experiment-launch/create_and_deploy.py b/aws-experiment-launch/create_and_deploy.py index 9467807f9..ecea07781 100644 --- a/aws-experiment-launch/create_and_deploy.py +++ b/aws-experiment-launch/create_and_deploy.py @@ -8,18 +8,24 @@ from threading import Thread from Queue import Queue import base64 + +class InstanceType: + ON_DEMAND = 1 + SPOT_INSTANCE = 2 + SPOT_FLEET = 3 + + REGION_NAME = 'region_name' REGION_KEY = 'region_key' REGION_SECURITY_GROUP = 'region_security_group' REGION_HUMAN_NAME = 'region_human_name' -INSTANCE_TYPE = 't2.micro' +INSTANCE_TYPE = 't2.small' REGION_AMI = 'region_ami' with open("user-data.sh", "r") as userdata_file: USER_DATA = userdata_file.read() # UserData must be base64 encoded for spot instances. -with open("user-data.sh", "rb") as userdata_file: - USER_DATA_SPOT = base64.b64encode(userdata_file.read()) +USER_DATA_BASE64 = base64.b64encode(USER_DATA) IAM_INSTANCE_PROFILE = 'BenchMarkCodeDeployInstanceProfile' REPO = "simple-rules/harmony-benchmark" @@ -45,7 +51,7 @@ Build (argparse,functions) support for ### CREATE INSTANCES ### -def run_one_region_instances(config, region_number, number_of_instances, isOnDemand=True): +def run_one_region_instances(config, region_number, number_of_instances, instance_type=InstanceType.ON_DEMAND): #todo: explore the use ec2 resource and not client. e.g. create_instances -- Might make for better code. """ e.g. ec2.create_instances @@ -53,12 +59,15 @@ def run_one_region_instances(config, region_number, number_of_instances, isOnDem region_name = config[region_number][REGION_NAME] session = boto3.Session(region_name=region_name) ec2_client = session.client('ec2') - if isOnDemand: + if instance_type == InstanceType.ON_DEMAND: NODE_NAME = create_instances( config, ec2_client, region_number, int(number_of_instances)) print("Created %s in region %s"%(NODE_NAME,region_number)) ##REPLACE ALL print with logger + elif instance_type == InstanceType.SPOT_INSTANCE: + response = request_spot_instances( + config, ec2_client, region_number, int(number_of_instances)) else: - response = request_spots( + response = request_spot_fleet( config, ec2_client, region_number, int(number_of_instances)) return session @@ -104,7 +113,8 @@ def create_instances(config, ec2_client, region_number, number_of_instances): return NODE_NAME -def request_spots(config, ec2_client, region_number, number_of_instances): +def request_spot_instances(config, ec2_client, region_number, number_of_instances): + NODE_NAME = region_number + "-" + NODE_NAME_SUFFIX response = ec2_client.request_spot_instances( # DryRun=True, BlockDurationMinutes=60, @@ -114,7 +124,7 @@ def request_spots(config, ec2_client, region_number, number_of_instances): 'IamInstanceProfile': { 'Name': IAM_INSTANCE_PROFILE }, - 'UserData': USER_DATA_SPOT, + 'UserData': USER_DATA_BASE64, 'ImageId': config[region_number][REGION_AMI], 'InstanceType': INSTANCE_TYPE, 'KeyName': config[region_number][REGION_KEY], @@ -126,6 +136,55 @@ def request_spots(config, ec2_client, region_number, number_of_instances): return response +def request_spot_fleet(config, ec2_client, region_number, number_of_instances): + NODE_NAME = region_number + "-" + NODE_NAME_SUFFIX + # https://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.Client.request_spot_fleet + response = ec2_client.request_spot_fleet( + # DryRun=True, + SpotFleetRequestConfig={ + # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-fleet.html#spot-fleet-allocation-strategy + 'AllocationStrategy': 'diversified', + # 'IamFleetRole': IAM_INSTANCE_PROFILE, // TODO@ricl, create fleet role. + 'LaunchSpecifications': [ + { + 'SecurityGroups': [ + { + 'GroupName': config[region_number][REGION_SECURITY_GROUP] + } + ], + 'IamInstanceProfile': { + 'Name': IAM_INSTANCE_PROFILE + }, + 'ImageId': config[region_number][REGION_AMI], + 'InstanceType': INSTANCE_TYPE, + 'KeyName': config[region_number][REGION_KEY], + 'Placement': { + 'AvailabilityZone': get_one_availability_zone(ec2_client) + }, + 'UserData': USER_DATA, + # 'WeightedCapacity': 123.0, + 'TagSpecifications': [ + { + 'ResourceType': 'instance', + 'Tags': [ + { + 'Key': 'Name', + 'Value': NODE_NAME + }, + ] + } + ] + }, + ], + # 'SpotPrice': 'string', # The maximum price per unit hour that you are willing to pay for a Spot Instance. The default is the On-Demand price. + 'TargetCapacity': 1, + 'OnDemandTargetCapacity': 0, + 'Type': 'maintain', + } + ) + return response + + def get_availability_zones(ec2_client): response = ec2_client.describe_availability_zones() all_zones = [] @@ -344,6 +403,6 @@ if __name__ == "__main__": region_number = region_list[i] number_of_instances = instances_list[i] session = run_one_region_instances( - config, region_number, number_of_instances) + config, region_number, number_of_instances, InstanceType.SPOT_FLEET) results = launch_code_deploy(region_list, commitId) print(results) diff --git a/aws-experiment-launch/experiment/commander/main.go b/aws-experiment-launch/experiment/commander/main.go index eb78dbaad..4efc93ed2 100644 --- a/aws-experiment-launch/experiment/commander/main.go +++ b/aws-experiment-launch/experiment/commander/main.go @@ -10,10 +10,13 @@ import ( ) const ( - message = "init http://localhost:8080/config.txt" StopCharacter = "\r\n\r\n" ) +var ( + configFile *string +) + func SocketClient(addr string) { conn, err := net.Dial("tcp", addr) @@ -23,6 +26,7 @@ func SocketClient(addr string) { log.Fatalln(err) } + message := "init http://localhost:8080/" + *configFile conn.Write([]byte(message)) // conn.Write([]byte(StopCharacter)) log.Printf("Send: %s", message) @@ -33,7 +37,7 @@ func SocketClient(addr string) { } func main() { - configFile := flag.String("config_file", "config.txt", "file containing all ip addresses") + configFile = flag.String("config_file", "test.txt", "file containing all ip addresses") flag.Parse() configs := readConfigFile(*configFile) diff --git a/aws-experiment-launch/experiment/soldier/main.go b/aws-experiment-launch/experiment/soldier/main.go index 14b98229c..01d596032 100644 --- a/aws-experiment-launch/experiment/soldier/main.go +++ b/aws-experiment-launch/experiment/soldier/main.go @@ -24,13 +24,13 @@ var ( localConfig string ) -func SocketServer() { +func socketServer() { listen, err := net.Listen("tcp4", ":"+*port) - defer listen.Close() if err != nil { log.Fatalf("Socket listen port %s failed,%s", *port, err) os.Exit(1) } + defer listen.Close() log.Printf("Begin listen for command on port: %s", *port) for { @@ -61,13 +61,15 @@ ILOOP: case io.EOF: break ILOOP case nil: - log.Println("Receive:", data) + log.Println("Received command", data) if isTransportOver(data) { log.Println("Tranport Over!") break ILOOP } - go handleCommand(data, w) + handleCommand(data, w) + + log.Println("Waiting for new command...") default: log.Fatalf("Receive data failed:%s", err) @@ -99,9 +101,10 @@ func handleCommand(command string, w *bufio.Writer) { func handleInitCommand(args []string, w *bufio.Writer) { log.Println("Init command", args) // create local config file + localConfig = "node_config_" + *port + ".txt" out, err := os.Create(localConfig) if err != nil { - log.Fatal("Failed to create local file") + log.Fatal("Failed to create local file", err) } defer out.Close() @@ -127,28 +130,56 @@ func handleInitCommand(args []string, w *bufio.Writer) { log.Println("Successfully downloaded config") log.Println(string(content)) - runInstance() + run() w.Write([]byte("Successfully init-ed")) w.Flush() } -func runInstance() { +func createLogFolder() string { t := time.Now().Format("20060102-150405") logFolder := "../tmp_log/log-" + t err := os.MkdirAll(logFolder, os.ModePerm) if err != nil { log.Fatal("Failed to create log folder") } - cmdName := "./benchmark" - cmdArgs := []string{"-ip", *ip, "-port", *port, "-config_file", localConfig, "-log_folder", logFolder} - log.Println(cmdName, cmdArgs) - cmdOut, err := exec.Command(cmdName, cmdArgs...).Output() + log.Println("Created log folder", logFolder) + return logFolder +} + +func runCmd(name string, args []string) { + log.Println(name, args) + err := exec.Command(name, args...).Start() if err != nil { log.Fatal("Failed to run command: ", err) } - log.Println(string(cmdOut)) + log.Println("Command running") +} + +func run() { + config := readConfigFile(localConfig) + + myConfig := getMyConfig(*ip, *port, &config) + + log.Println(myConfig) + if myConfig[2] == "client" { + runClient() + } else { + runInstance() + } +} + +func runInstance() { + log.Println("running instance") + logFolder := createLogFolder() + runCmd("./benchmark", []string{"-ip", *ip, "-port", *port, "-config_file", localConfig, "-log_folder", logFolder}) +} + +func runClient() { + log.Println("running client") + logFolder := createLogFolder() + runCmd("./txgen", []string{"-config_file", localConfig, "-log_folder", logFolder}) } func isTransportOver(data string) (over bool) { @@ -156,11 +187,35 @@ func isTransportOver(data string) (over bool) { return } +func readConfigFile(configFile string) [][]string { + file, _ := os.Open(configFile) + fscanner := bufio.NewScanner(file) + + result := [][]string{} + for fscanner.Scan() { + p := strings.Split(fscanner.Text(), " ") + result = append(result, p) + } + return result +} + +func getMyConfig(myIP string, myPort string, config *[][]string) []string { + for _, node := range *config { + ip, port := node[0], node[1] + if ip == myIP && port == myPort { + return node + } + } + return nil +} + +// go build -o bin/soldier aws-experiment-launch/experiment/soldier/main.go +// cd bin/ +// ./soldier --port=xxxx func main() { ip = flag.String("ip", "127.0.0.1", "IP of the node.") port = flag.String("port", "3000", "port of the node.") flag.Parse() - localConfig = "node_config_" + *port + ".txt" - SocketServer() + socketServer() }