You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
165 lines
5.3 KiB
165 lines
5.3 KiB
3 years ago
|
#! /usr/bin/python3
|
||
|
|
||
|
# This script will provision required keys for Optics Agents.
|
||
|
# If keys have already been provisioned, it will fetch their details.
|
||
|
#
|
||
|
# Keys will be provisioned for each configured environment, currently staging and production.
|
||
|
#
|
||
|
# It requires the following dependencies:
|
||
|
# pip3 install boto3 tabulate asn1tools web3
|
||
|
#
|
||
|
# It requires AWS Credentials to be set in a way that is compatible with the AWS SDK:
|
||
|
# https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html
|
||
|
#
|
||
|
# The User must have access to the following APIs:
|
||
|
# kms:CreateAlias
|
||
|
# kms:CreateKey
|
||
|
# kms:DescribeKey
|
||
|
# kms:GetPublicKey
|
||
|
# kms:ListAliases
|
||
|
|
||
|
import boto3
|
||
|
import json
|
||
|
import logging
|
||
|
from tabulate import tabulate
|
||
|
import asn1tools
|
||
|
from web3.auto import w3
|
||
|
|
||
|
# Logging setup: the root logger is configured at DEBUG, while the
# 'botocore' and 'urllib3' loggers are restricted to CRITICAL records.
logging.basicConfig(level=logging.DEBUG)
for _noisy_logger in ('botocore', 'urllib3'):
    logging.getLogger(_noisy_logger).setLevel(logging.CRITICAL)

# Logger used by this script's own messages.
logger = logging.getLogger("kms_provisioner")
logger.setLevel(logging.DEBUG)
|
||
|
|
||
|
# Agent keys: each name is combined with every environment below to form
# the "<environment>-<key>" alias that gets provisioned.
required_keys = [
    "watcher-signer", "watcher-attestation",
    "updater-signer", "updater-attestation",
    "processor-signer",
    "relayer-signer",
]

# Environments to provision for. The total number of keys handled is
# len(required_keys) * len(environments).
environments = ["staging", "production"]

# AWS region in which the keys are provisioned.
region = "us-west-2"
|
||
|
|
||
|
def get_kms_public_key(key_id: str) -> bytes:
    """Fetch the public key of a KMS key.

    A KMS client for the module-level ``region`` is created on each call.

    Args:
        key_id: Identifier of the KMS key, passed as ``KeyId`` to the
            ``GetPublicKey`` API.

    Returns:
        The ``PublicKey`` field of the ``GetPublicKey`` response.
    """
    kms_client = boto3.client('kms', region_name=region)
    logger.info(f"Fetching Public key for {key_id}")
    return kms_client.get_public_key(KeyId=key_id)['PublicKey']
|
||
|
|
||
|
def calc_eth_address(pub_key) -> str:
    """Derive the checksummed Ethereum address for an encoded public key.

    The input is decoded as an ASN.1 ``SubjectPublicKeyInfo`` structure, the
    embedded EC point is hashed with Keccak-256, and the last 20 bytes of the
    digest become the address.

    Args:
        pub_key: Encoded ``SubjectPublicKeyInfo`` bytes, such as the value
            returned by ``get_kms_public_key``.

    Returns:
        The ``0x``-prefixed, checksummed Ethereum address.

    Raises:
        ValueError: If the embedded key is not a 65-byte uncompressed EC
            point (``0x04 || X || Y``). Hashing anything else would silently
            produce an address that does not belong to the key.
    """
    SUBJECT_ASN = '''
    Key DEFINITIONS ::= BEGIN

    SubjectPublicKeyInfo ::= SEQUENCE {
        algorithm         AlgorithmIdentifier,
        subjectPublicKey  BIT STRING
    }

    AlgorithmIdentifier ::= SEQUENCE {
        algorithm   OBJECT IDENTIFIER,
        parameters  ANY DEFINED BY algorithm OPTIONAL
    }

    END
    '''

    spki_codec = asn1tools.compile_string(SUBJECT_ASN)
    decoded = spki_codec.decode('SubjectPublicKeyInfo', pub_key)

    # The decoded BIT STRING is indexed at [0] to obtain the key data itself.
    ec_point = bytes(decoded['subjectPublicKey'][0])

    # Only an uncompressed secp256k1 point (0x04 marker + 32-byte X + 32-byte Y)
    # can be turned into an address by the hashing step below.
    if len(ec_point) != 65 or ec_point[0] != 0x04:
        raise ValueError(
            "Expected a 65-byte uncompressed EC public key starting with 0x04, "
            f"got {len(ec_point)} bytes"
        )

    # Drop the leading 0x04 marker so only the X || Y coordinates are hashed.
    raw_coordinates = ec_point[1:]

    # https://www.oreilly.com/library/view/mastering-ethereum/9781491971932/ch04.html
    # The address is the last 20 bytes (40 hex characters) of the digest.
    digest_hex = w3.keccak(raw_coordinates).hex()
    eth_address = '0x{}'.format(digest_hex[-40:])

    # web3.py v6 renamed toChecksumAddress to to_checksum_address; support both
    # so the script works regardless of the installed web3 version.
    to_checksum = getattr(w3, 'to_checksum_address', None) or w3.toChecksumAddress

    return to_checksum(eth_address)
|
||
|
|
||
|
|
||
|
# Main provisioning flow: for every environment/key pair, make sure a KMS key
# with the alias "alias/<environment>-<key>" exists, then report its details
# together with the Ethereum address derived from its public key.
kms = boto3.client('kms', region_name=region)

data_headers = ["Alias Name", "Region", "Key ID", "ARN", "Key Description", "Ethereum Address"]
data_rows = []

# ListAliases is a paginated API. Walk every page so that accounts with more
# aliases than fit in a single response are handled correctly; missing an
# existing alias here would cause a duplicate key to be created below.
current_aliases = [
    alias
    for page in kms.get_paginator('list_aliases').paginate()
    for alias in page["Aliases"]
]

# Alias names are unique within an account and region, so they can key a dict.
aliases_by_name = {alias["AliasName"]: alias for alias in current_aliases}

logger.debug(f"Fetched {len(current_aliases)} aliases from KMS")
logger.debug(json.dumps(current_aliases, indent=2, default=str))

for environment in environments:
    for key in required_keys:

        key_name = f"{environment}-{key}"
        alias_name = f"alias/{key_name}"

        existing_alias = aliases_by_name.get(alias_name)

        if existing_alias is None:
            logger.info(f"No existing alias found for {key_name}, creating new key")

            # NOTE(review): AWS documents CustomerMasterKeySpec as deprecated
            # in favour of KeySpec; kept as-is so older boto3 versions still work.
            key_response = kms.create_key(
                Description=f'{environment} {key}',
                KeyUsage='SIGN_VERIFY',
                Origin='AWS_KMS',
                BypassPolicyLockoutSafetyCheck=False,
                CustomerMasterKeySpec="ECC_SECG_P256K1",
                Tags=[
                    {
                        'TagKey': 'environment',
                        'TagValue': environment
                    },
                ]
            )

            # Point the alias (which must begin with 'alias/') at the new key.
            alias_response = kms.create_alias(
                AliasName=alias_name,
                TargetKeyId=key_response["KeyMetadata"]["KeyId"],
            )

            logger.debug(json.dumps(key_response, indent=2, default=str))
            logger.debug(json.dumps(alias_response, indent=2, default=str))
        else:
            logger.info(f"Existing alias for {key_name}, fetching key.")

            key_response = kms.describe_key(
                KeyId=existing_alias["TargetKeyId"],
            )

        # Both create_key and describe_key responses carry a "KeyMetadata" entry.
        key_metadata = key_response["KeyMetadata"]
        key_id = key_metadata["KeyId"]
        key_arn = key_metadata["Arn"]
        key_description = key_metadata["Description"]

        logger.debug(f"Key Id: {key_id}")
        logger.debug(f"Key Arn: {key_arn}")
        logger.debug(f"Key Description: {key_description}")

        # Get the Ethereum Address from the KMS CMK
        public_key = get_kms_public_key(key_id)
        ethereum_address = calc_eth_address(public_key)

        data_rows.append([alias_name, region, key_id, key_arn, key_description, ethereum_address])


# Print out the results of the operation
print(tabulate(data_rows, data_headers, tablefmt="fancy_grid"))
|