diff --git a/aws-experiment-launch/create_and_deploy_minh.py b/aws-experiment-launch/create_and_deploy_minh.py index 3f62fe3e6..f0ddb24a2 100644 --- a/aws-experiment-launch/create_and_deploy_minh.py +++ b/aws-experiment-launch/create_and_deploy_minh.py @@ -8,19 +8,14 @@ from threading import Thread from Queue import Queue import base64 +from utils import utils + class InstanceResource: ON_DEMAND = 1 SPOT_INSTANCE = 2 SPOT_FLEET = 3 - -REGION_NAME = 'region_name' -REGION_KEY = 'region_key' -REGION_SECURITY_GROUP = 'region_security_group' -REGION_HUMAN_NAME = 'region_human_name' -INSTANCE_TYPE = 't2.small' -REGION_AMI = 'region_ami' with open("user-data.sh", "r") as userdata_file: USER_DATA = userdata_file.read() @@ -36,350 +31,49 @@ CURRENT_SESSION = datetime.datetime.fromtimestamp( PLACEMENT_GROUP = "PLACEMENT-" + CURRENT_SESSION NODE_NAME_SUFFIX = "NODE-" + CURRENT_SESSION -""" -TODO: - -save NODE to disk, so that you can selectively only run deploy (not recreate instances). -Right now all instances have "NODE" so this has uninted consequences of running on instances that were previous created. -Build (argparse,functions) support for -1. run only create instance (multiple times) -2. run only codedeploy (multiple times) -3. run create instance followed by codedeploy - -""" - -### CREATE INSTANCES ### - - def run_one_region_instances(config, region_number, number_of_instances, instance_resource=InstanceResource.ON_DEMAND): - #todo: explore the use ec2 resource and not client. e.g. create_instances -- Might make for better code. - """ - e.g. ec2.create_instances - """ - region_name = config[region_number][REGION_NAME] + region_name = config[region_number][utils.REGION_NAME] session = boto3.Session(region_name=region_name) ec2_client = session.client('ec2') + print utils.get_one_availability_zone(ec2_client) if instance_resource == InstanceResource.ON_DEMAND: - NODE_NAME = create_instances( - config, ec2_client, region_number, int(number_of_instances)) - print("Created %s in region %s"%(NODE_NAME,region_number)) ##REPLACE ALL print with logger - elif instance_resource == InstanceResource.SPOT_INSTANCE: - response = request_spot_instances( + response, node_name_tag = create_instances( config, ec2_client, region_number, int(number_of_instances)) + print("Created %s in region %s" % (node_name_tag, region_number)) else: - response = request_spot_fleet( - config, ec2_client, region_number, int(number_of_instances)) - return session + return None, node_name_tag + return response, node_name_tag def create_instances(config, ec2_client, region_number, number_of_instances): - NODE_NAME = region_number + "-" + NODE_NAME_SUFFIX + node_name_tag = region_number + "-" + NODE_NAME_SUFFIX response = ec2_client.run_instances( MinCount=number_of_instances, MaxCount=number_of_instances, - ImageId=config[region_number][REGION_AMI], + ImageId=config[region_number][utils.REGION_AMI], Placement={ - 'AvailabilityZone': get_one_availability_zone(ec2_client), + 'AvailabilityZone': utils.get_one_availability_zone(ec2_client), }, - SecurityGroups=[config[region_number][REGION_SECURITY_GROUP]], + SecurityGroups=[config[region_number][utils.REGION_SECURITY_GROUP]], IamInstanceProfile={ 'Name': IAM_INSTANCE_PROFILE }, - KeyName=config[region_number][REGION_KEY], + KeyName=config[region_number][utils.REGION_KEY], UserData=USER_DATA, - InstanceType=INSTANCE_TYPE, + InstanceType=utils.INSTANCE_TYPE, TagSpecifications=[ { 'ResourceType': 'instance', 'Tags': [ { 'Key': 'Name', - 'Value': NODE_NAME + 'Value': node_name_tag }, ] }, ], - # We can also request spot instances this way but this way will block the - # process until spot requests are fulfilled, otherwise it will throw exception - # after 4 failed re-try. - # InstanceMarketOptions= { - # 'MarketType': 'spot', - # 'SpotOptions': { - # 'SpotInstanceType': 'one-time', - # 'BlockDurationMinutes': 60, - # } - # } - ) - return NODE_NAME - - -def request_spot_instances(config, ec2_client, region_number, number_of_instances): - NODE_NAME = region_number + "-" + NODE_NAME_SUFFIX - response = ec2_client.request_spot_instances( - # DryRun=True, - BlockDurationMinutes=60, - InstanceCount=number_of_instances, - LaunchSpecification={ - 'SecurityGroups': [config[region_number][REGION_SECURITY_GROUP]], - 'IamInstanceProfile': { - 'Name': IAM_INSTANCE_PROFILE - }, - 'UserData': USER_DATA_BASE64, - 'ImageId': config[region_number][REGION_AMI], - 'InstanceType': INSTANCE_TYPE, - 'KeyName': config[region_number][REGION_KEY], - 'Placement': { - 'AvailabilityZone': get_one_availability_zone(ec2_client) - } - } - ) - return response - - -def request_spot_fleet(config, ec2_client, region_number, number_of_instances): - NODE_NAME = region_number + "-" + NODE_NAME_SUFFIX - # https://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.Client.request_spot_fleet - response = ec2_client.request_spot_fleet( - # DryRun=True, - SpotFleetRequestConfig={ - # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-fleet.html#spot-fleet-allocation-strategy - 'AllocationStrategy': 'diversified', - # 'IamFleetRole': IAM_INSTANCE_PROFILE, // TODO@ricl, create fleet role. - 'LaunchSpecifications': [ - { - 'SecurityGroups': [ - { - 'GroupName': config[region_number][REGION_SECURITY_GROUP] - } - ], - 'IamInstanceProfile': { - 'Name': IAM_INSTANCE_PROFILE - }, - 'ImageId': config[region_number][REGION_AMI], - 'InstanceType': INSTANCE_TYPE, - 'KeyName': config[region_number][REGION_KEY], - 'Placement': { - 'AvailabilityZone': get_one_availability_zone(ec2_client) - }, - 'UserData': USER_DATA, - # 'WeightedCapacity': 123.0, - 'TagSpecifications': [ - { - 'ResourceType': 'instance', - 'Tags': [ - { - 'Key': 'Name', - 'Value': NODE_NAME - }, - ] - } - ] - }, - ], - # 'SpotPrice': 'string', # The maximum price per unit hour that you are willing to pay for a Spot Instance. The default is the On-Demand price. - 'TargetCapacity': 1, - 'OnDemandTargetCapacity': 0, - 'Type': 'maintain', - } - ) - return response - - -def get_availability_zones(ec2_client): - response = ec2_client.describe_availability_zones() - all_zones = [] - if response.get('AvailabilityZones', None): - region_info = response.get('AvailabilityZones') - for info in region_info: - if info['State'] == 'available': - all_zones.append(info['ZoneName']) - return all_zones - - -def get_one_availability_zone(ec2_client): - all_zones = get_availability_zones(ec2_client) - if len(all_zones) > 0: - return all_zones[0] - else: - print("No availability zone for this region") - sys.exit() - -#### CODEDEPLOY ### - - -def run_one_region_codedeploy(region_number, commitId): - #todo: explore the use ec2 resource and not client. e.g. create_instances -- Might make for better code. - """ - for getting instance ids:--- - ec2 = boto3.resource('ec2', region_name=region_name]) - result = ec2.instances.filter(Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]) - for instance in result: - instances.append(instance.id) - - for getting public ips : -- - ec2 = boto3.resource('ec2') - instance - """ - region_name = config[region_number][REGION_NAME] - NODE_NAME = region_number + "-" + NODE_NAME_SUFFIX - session = boto3.Session(region_name=region_name) - ec2_client = session.client('ec2') - filters = [{'Name': 'tag:Name','Values': [NODE_NAME]}] - instance_ids = get_instance_ids(ec2_client.describe_instances(Filters=filters)) - - print("Number of instances: %d" % len(instance_ids)) - - print("Waiting for all %d instances in region %s to start running"%(len(instance_ids),region_number)) - waiter = ec2_client.get_waiter('instance_running') - waiter.wait(InstanceIds=instance_ids) - - print("Waiting for all %d instances in region %s to be INSTANCE STATUS OK"%(len(instance_ids),region_number)) - waiter = ec2_client.get_waiter('instance_status_ok') - waiter.wait(InstanceIds=instance_ids) - - print("Waiting for all %d instances in region %s to be SYSTEM STATUS OK"%(len(instance_ids),region_number)) - waiter = ec2_client.get_waiter('system_status_ok') - waiter.wait(InstanceIds=instance_ids) - - codedeploy = session.client('codedeploy') - application_name = APPLICATION_NAME - deployment_group = APPLICATION_NAME + "-" + str(commitId)[6] + "-" + CURRENT_SESSION - repo = REPO - - print("Setting up to deploy commitId %s on region %s"%(commitId,region_number)) - response = get_application(codedeploy, application_name) - deployment_group = get_deployment_group( - codedeploy, region_number, application_name, deployment_group) - depId = deploy(codedeploy, application_name, - deployment_group, repo, commitId) - return region_number, depId - - -def get_deployment_group(codedeploy, region_number,application_name, deployment_group): - NODE_NAME = region_number + "-" + NODE_NAME_SUFFIX - response = codedeploy.create_deployment_group( - applicationName=application_name, - deploymentGroupName=deployment_group, - deploymentConfigName='CodeDeployDefault.AllAtOnce', - serviceRoleArn='arn:aws:iam::656503231766:role/BenchMarkCodeDeployServiceRole', - deploymentStyle={ - 'deploymentType': 'IN_PLACE', - 'deploymentOption': 'WITHOUT_TRAFFIC_CONTROL' - }, - ec2TagFilters = [ - { - 'Key': 'Name', - 'Value': NODE_NAME, - 'Type': 'KEY_AND_VALUE' - } - ] ) - return deployment_group - - -def get_application(codedeploy, application_name): - response = codedeploy.list_applications() - if application_name in response['applications']: - return response - else: - response = codedeploy.create_application( - applicationName=application_name, - computePlatform='Server' - ) - return response - - -def deploy(codedeploy, application_name, deployment_group, repo, commitId): - """Deploy new code at specified revision to instance. - - arguments: - - repo: GitHub repository path from which to get the code - - commitId: commit ID to be deployed - - wait: wait until the CodeDeploy finishes - """ - print("Launching CodeDeploy with commit " + commitId) - res = codedeploy.create_deployment( - applicationName=application_name, - deploymentGroupName=deployment_group, - deploymentConfigName='CodeDeployDefault.AllAtOnce', - description='benchmark experiments', - revision={ - 'revisionType': 'GitHub', - 'gitHubLocation': { - 'repository': repo, - 'commitId': commitId, - } - } - ) - depId = res["deploymentId"] - print("Deployment ID: " + depId) - # The deployment is launched at this point, so exit unless asked to wait - # until it finishes - info = {'status': 'Created'} - start = time.time() - while info['status'] not in ('Succeeded', 'Failed', 'Stopped',) and (time.time() - start < 600.0): - info = codedeploy.get_deployment(deploymentId=depId)['deploymentInfo'] - print(info['status']) - time.sleep(15) - if info['status'] == 'Succeeded': - print("\nDeploy Succeeded") - return depId - else: - print("\nDeploy Failed") - print(info) - return depId - -def run_one_region_codedeploy_wrapper(region_number, commitId, queue): - region_number, depId = run_one_region_codedeploy(region_number, commitId) - queue.put((region_number, depId)) - -def launch_code_deploy(region_list, commitId): - queue = Queue() - jobs = [] - for i in range(len(region_list)): - region_number = region_list[i] - my_thread = Thread(target=run_one_region_codedeploy_wrapper, args=( - region_number, commitId, queue)) - my_thread.start() - jobs.append(my_thread) - for my_thread in jobs: - my_thread.join() - results = [queue.get() for job in jobs] - return results - -##### UTILS #### - - -def get_instance_ids(describe_instances_response): - instance_ids = [] - for reservation in describe_instances_response["Reservations"]: - for instance in reservation["Instances"]: - instance_ids.append(instance["InstanceId"]) - return instance_ids - -def read_configuration_file(filename): - config = {} - with open(filename, 'r') as f: - for myline in f: - mylist = myline.strip().split(',') - region_num = mylist[0] - config[region_num] = {} - config[region_num][REGION_NAME] = mylist[1] - config[region_num][REGION_KEY] = mylist[2] - config[region_num][REGION_SECURITY_GROUP] = mylist[3] - config[region_num][REGION_HUMAN_NAME] = mylist[4] - config[region_num][REGION_AMI] = mylist[5] - return config - -def get_commitId(commitId): - if commitId is None: - commitId = run("git rev-list --max-count=1 HEAD", - hide=True).stdout.strip() - print("Got newest commitId as " + commitId) - return commitId - -##### UTILS #### - + return response, node_name_tag if __name__ == "__main__": parser = argparse.ArgumentParser( @@ -390,20 +84,19 @@ if __name__ == "__main__": default='1', help='number of instances') parser.add_argument('--configuration', type=str, dest='config', default='configuration.txt') - parser.add_argument('--commitId', type=str, dest='commitId', - default='1f7e6e7ca7cf1c1190cedec10e791c01a29971cf') args = parser.parse_args() - config = read_configuration_file(args.config) + config = utils.read_region_config(args.config) region_list = args.regions.split(',') instances_list = args.numInstances.split(',') - assert len(region_list) == len(instances_list), "number of regions: %d != number of instances per region: %d" % ( - len(region_list), len(instances_list)) - commitId = args.commitId + assert len(region_list) == len(instances_list), "number of regions: %d != number of instances per region: %d" % (len(region_list), len(instances_list)) + for i in range(len(region_list)): region_number = region_list[i] number_of_instances = instances_list[i] - session = run_one_region_instances( + response, node_name_tag = run_one_region_instances( config, region_number, number_of_instances, InstanceResource.ON_DEMAND) + if response: + print utils.get_ip_list(response) # Enable the code below later. # results = launch_code_deploy(region_list, commitId)