increased timeout; doesnt affect deploy just the logging

pull/34/head
alok 7 years ago
commit d77bea30df
  1. 77
      aws-experiment-launch/create_and_deploy.py
  2. 8
      aws-experiment-launch/experiment/commander/main.go
  3. 83
      aws-experiment-launch/experiment/soldier/main.go

@ -8,18 +8,24 @@ from threading import Thread
from Queue import Queue from Queue import Queue
import base64 import base64
class InstanceType:
ON_DEMAND = 1
SPOT_INSTANCE = 2
SPOT_FLEET = 3
REGION_NAME = 'region_name' REGION_NAME = 'region_name'
REGION_KEY = 'region_key' REGION_KEY = 'region_key'
REGION_SECURITY_GROUP = 'region_security_group' REGION_SECURITY_GROUP = 'region_security_group'
REGION_HUMAN_NAME = 'region_human_name' REGION_HUMAN_NAME = 'region_human_name'
INSTANCE_TYPE = 't2.micro' INSTANCE_TYPE = 't2.small'
REGION_AMI = 'region_ami' REGION_AMI = 'region_ami'
with open("user-data.sh", "r") as userdata_file: with open("user-data.sh", "r") as userdata_file:
USER_DATA = userdata_file.read() USER_DATA = userdata_file.read()
# UserData must be base64 encoded for spot instances. # UserData must be base64 encoded for spot instances.
with open("user-data.sh", "rb") as userdata_file: USER_DATA_BASE64 = base64.b64encode(USER_DATA)
USER_DATA_SPOT = base64.b64encode(userdata_file.read())
IAM_INSTANCE_PROFILE = 'BenchMarkCodeDeployInstanceProfile' IAM_INSTANCE_PROFILE = 'BenchMarkCodeDeployInstanceProfile'
REPO = "simple-rules/harmony-benchmark" REPO = "simple-rules/harmony-benchmark"
@ -45,7 +51,7 @@ Build (argparse,functions) support for
### CREATE INSTANCES ### ### CREATE INSTANCES ###
def run_one_region_instances(config, region_number, number_of_instances, isOnDemand=True): def run_one_region_instances(config, region_number, number_of_instances, instance_type=InstanceType.ON_DEMAND):
#todo: explore the use ec2 resource and not client. e.g. create_instances -- Might make for better code. #todo: explore the use ec2 resource and not client. e.g. create_instances -- Might make for better code.
""" """
e.g. ec2.create_instances e.g. ec2.create_instances
@ -53,12 +59,15 @@ def run_one_region_instances(config, region_number, number_of_instances, isOnDem
region_name = config[region_number][REGION_NAME] region_name = config[region_number][REGION_NAME]
session = boto3.Session(region_name=region_name) session = boto3.Session(region_name=region_name)
ec2_client = session.client('ec2') ec2_client = session.client('ec2')
if isOnDemand: if instance_type == InstanceType.ON_DEMAND:
NODE_NAME = create_instances( NODE_NAME = create_instances(
config, ec2_client, region_number, int(number_of_instances)) config, ec2_client, region_number, int(number_of_instances))
print("Created %s in region %s"%(NODE_NAME,region_number)) ##REPLACE ALL print with logger print("Created %s in region %s"%(NODE_NAME,region_number)) ##REPLACE ALL print with logger
elif instance_type == InstanceType.SPOT_INSTANCE:
response = request_spot_instances(
config, ec2_client, region_number, int(number_of_instances))
else: else:
response = request_spots( response = request_spot_fleet(
config, ec2_client, region_number, int(number_of_instances)) config, ec2_client, region_number, int(number_of_instances))
return session return session
@ -104,7 +113,8 @@ def create_instances(config, ec2_client, region_number, number_of_instances):
return NODE_NAME return NODE_NAME
def request_spots(config, ec2_client, region_number, number_of_instances): def request_spot_instances(config, ec2_client, region_number, number_of_instances):
NODE_NAME = region_number + "-" + NODE_NAME_SUFFIX
response = ec2_client.request_spot_instances( response = ec2_client.request_spot_instances(
# DryRun=True, # DryRun=True,
BlockDurationMinutes=60, BlockDurationMinutes=60,
@ -114,7 +124,7 @@ def request_spots(config, ec2_client, region_number, number_of_instances):
'IamInstanceProfile': { 'IamInstanceProfile': {
'Name': IAM_INSTANCE_PROFILE 'Name': IAM_INSTANCE_PROFILE
}, },
'UserData': USER_DATA_SPOT, 'UserData': USER_DATA_BASE64,
'ImageId': config[region_number][REGION_AMI], 'ImageId': config[region_number][REGION_AMI],
'InstanceType': INSTANCE_TYPE, 'InstanceType': INSTANCE_TYPE,
'KeyName': config[region_number][REGION_KEY], 'KeyName': config[region_number][REGION_KEY],
@ -126,6 +136,55 @@ def request_spots(config, ec2_client, region_number, number_of_instances):
return response return response
def request_spot_fleet(config, ec2_client, region_number, number_of_instances):
NODE_NAME = region_number + "-" + NODE_NAME_SUFFIX
# https://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.Client.request_spot_fleet
response = ec2_client.request_spot_fleet(
# DryRun=True,
SpotFleetRequestConfig={
# https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-fleet.html#spot-fleet-allocation-strategy
'AllocationStrategy': 'diversified',
# 'IamFleetRole': IAM_INSTANCE_PROFILE, // TODO@ricl, create fleet role.
'LaunchSpecifications': [
{
'SecurityGroups': [
{
'GroupName': config[region_number][REGION_SECURITY_GROUP]
}
],
'IamInstanceProfile': {
'Name': IAM_INSTANCE_PROFILE
},
'ImageId': config[region_number][REGION_AMI],
'InstanceType': INSTANCE_TYPE,
'KeyName': config[region_number][REGION_KEY],
'Placement': {
'AvailabilityZone': get_one_availability_zone(ec2_client)
},
'UserData': USER_DATA,
# 'WeightedCapacity': 123.0,
'TagSpecifications': [
{
'ResourceType': 'instance',
'Tags': [
{
'Key': 'Name',
'Value': NODE_NAME
},
]
}
]
},
],
# 'SpotPrice': 'string', # The maximum price per unit hour that you are willing to pay for a Spot Instance. The default is the On-Demand price.
'TargetCapacity': 1,
'OnDemandTargetCapacity': 0,
'Type': 'maintain',
}
)
return response
def get_availability_zones(ec2_client): def get_availability_zones(ec2_client):
response = ec2_client.describe_availability_zones() response = ec2_client.describe_availability_zones()
all_zones = [] all_zones = []
@ -344,6 +403,6 @@ if __name__ == "__main__":
region_number = region_list[i] region_number = region_list[i]
number_of_instances = instances_list[i] number_of_instances = instances_list[i]
session = run_one_region_instances( session = run_one_region_instances(
config, region_number, number_of_instances) config, region_number, number_of_instances, InstanceType.SPOT_FLEET)
results = launch_code_deploy(region_list, commitId) results = launch_code_deploy(region_list, commitId)
print(results) print(results)

@ -10,10 +10,13 @@ import (
) )
const ( const (
message = "init http://localhost:8080/config.txt"
StopCharacter = "\r\n\r\n" StopCharacter = "\r\n\r\n"
) )
var (
configFile *string
)
func SocketClient(addr string) { func SocketClient(addr string) {
conn, err := net.Dial("tcp", addr) conn, err := net.Dial("tcp", addr)
@ -23,6 +26,7 @@ func SocketClient(addr string) {
log.Fatalln(err) log.Fatalln(err)
} }
message := "init http://localhost:8080/" + *configFile
conn.Write([]byte(message)) conn.Write([]byte(message))
// conn.Write([]byte(StopCharacter)) // conn.Write([]byte(StopCharacter))
log.Printf("Send: %s", message) log.Printf("Send: %s", message)
@ -33,7 +37,7 @@ func SocketClient(addr string) {
} }
func main() { func main() {
configFile := flag.String("config_file", "config.txt", "file containing all ip addresses") configFile = flag.String("config_file", "test.txt", "file containing all ip addresses")
flag.Parse() flag.Parse()
configs := readConfigFile(*configFile) configs := readConfigFile(*configFile)

@ -24,13 +24,13 @@ var (
localConfig string localConfig string
) )
func SocketServer() { func socketServer() {
listen, err := net.Listen("tcp4", ":"+*port) listen, err := net.Listen("tcp4", ":"+*port)
defer listen.Close()
if err != nil { if err != nil {
log.Fatalf("Socket listen port %s failed,%s", *port, err) log.Fatalf("Socket listen port %s failed,%s", *port, err)
os.Exit(1) os.Exit(1)
} }
defer listen.Close()
log.Printf("Begin listen for command on port: %s", *port) log.Printf("Begin listen for command on port: %s", *port)
for { for {
@ -61,13 +61,15 @@ ILOOP:
case io.EOF: case io.EOF:
break ILOOP break ILOOP
case nil: case nil:
log.Println("Receive:", data) log.Println("Received command", data)
if isTransportOver(data) { if isTransportOver(data) {
log.Println("Tranport Over!") log.Println("Tranport Over!")
break ILOOP break ILOOP
} }
go handleCommand(data, w) handleCommand(data, w)
log.Println("Waiting for new command...")
default: default:
log.Fatalf("Receive data failed:%s", err) log.Fatalf("Receive data failed:%s", err)
@ -99,9 +101,10 @@ func handleCommand(command string, w *bufio.Writer) {
func handleInitCommand(args []string, w *bufio.Writer) { func handleInitCommand(args []string, w *bufio.Writer) {
log.Println("Init command", args) log.Println("Init command", args)
// create local config file // create local config file
localConfig = "node_config_" + *port + ".txt"
out, err := os.Create(localConfig) out, err := os.Create(localConfig)
if err != nil { if err != nil {
log.Fatal("Failed to create local file") log.Fatal("Failed to create local file", err)
} }
defer out.Close() defer out.Close()
@ -127,28 +130,56 @@ func handleInitCommand(args []string, w *bufio.Writer) {
log.Println("Successfully downloaded config") log.Println("Successfully downloaded config")
log.Println(string(content)) log.Println(string(content))
runInstance() run()
w.Write([]byte("Successfully init-ed")) w.Write([]byte("Successfully init-ed"))
w.Flush() w.Flush()
} }
func runInstance() { func createLogFolder() string {
t := time.Now().Format("20060102-150405") t := time.Now().Format("20060102-150405")
logFolder := "../tmp_log/log-" + t logFolder := "../tmp_log/log-" + t
err := os.MkdirAll(logFolder, os.ModePerm) err := os.MkdirAll(logFolder, os.ModePerm)
if err != nil { if err != nil {
log.Fatal("Failed to create log folder") log.Fatal("Failed to create log folder")
} }
cmdName := "./benchmark" log.Println("Created log folder", logFolder)
cmdArgs := []string{"-ip", *ip, "-port", *port, "-config_file", localConfig, "-log_folder", logFolder} return logFolder
log.Println(cmdName, cmdArgs) }
cmdOut, err := exec.Command(cmdName, cmdArgs...).Output()
func runCmd(name string, args []string) {
log.Println(name, args)
err := exec.Command(name, args...).Start()
if err != nil { if err != nil {
log.Fatal("Failed to run command: ", err) log.Fatal("Failed to run command: ", err)
} }
log.Println(string(cmdOut)) log.Println("Command running")
}
func run() {
config := readConfigFile(localConfig)
myConfig := getMyConfig(*ip, *port, &config)
log.Println(myConfig)
if myConfig[2] == "client" {
runClient()
} else {
runInstance()
}
}
func runInstance() {
log.Println("running instance")
logFolder := createLogFolder()
runCmd("./benchmark", []string{"-ip", *ip, "-port", *port, "-config_file", localConfig, "-log_folder", logFolder})
}
func runClient() {
log.Println("running client")
logFolder := createLogFolder()
runCmd("./txgen", []string{"-config_file", localConfig, "-log_folder", logFolder})
} }
func isTransportOver(data string) (over bool) { func isTransportOver(data string) (over bool) {
@ -156,11 +187,35 @@ func isTransportOver(data string) (over bool) {
return return
} }
func readConfigFile(configFile string) [][]string {
file, _ := os.Open(configFile)
fscanner := bufio.NewScanner(file)
result := [][]string{}
for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ")
result = append(result, p)
}
return result
}
func getMyConfig(myIP string, myPort string, config *[][]string) []string {
for _, node := range *config {
ip, port := node[0], node[1]
if ip == myIP && port == myPort {
return node
}
}
return nil
}
// go build -o bin/soldier aws-experiment-launch/experiment/soldier/main.go
// cd bin/
// ./soldier --port=xxxx
func main() { func main() {
ip = flag.String("ip", "127.0.0.1", "IP of the node.") ip = flag.String("ip", "127.0.0.1", "IP of the node.")
port = flag.String("port", "3000", "port of the node.") port = flag.String("port", "3000", "port of the node.")
flag.Parse() flag.Parse()
localConfig = "node_config_" + *port + ".txt" socketServer()
SocketServer()
} }

Loading…
Cancel
Save