Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Migrate to boto3 #754

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 20 additions & 26 deletions nixops/backends/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def get_type(cls):
def __init__(self, depl, name, id):
MachineState.__init__(self, depl, name, id)
self._conn = None
self._conn_vpc = None
self._conn_route53 = None
self._cached_instance = None

Expand Down Expand Up @@ -253,12 +252,6 @@ def connect(self):
return self._conn


def connect_vpc(self):
if self._conn_vpc:
return self._conn_vpc
self._conn_vpc = nixops.ec2_utils.connect_vpc(self.region, self.access_key_id)
return self._conn_vpc

def connect_route53(self):
if self._conn_route53:
return
Expand All @@ -272,14 +265,16 @@ def connect_route53(self):
def _get_spot_instance_request_by_id(self, request_id, allow_missing=False):
"""Get spot instance request object by id."""
self.connect()
result = self._conn.get_all_spot_instance_requests([request_id])

request_id_filter = {'Name': 'spot-instance-request-id', 'Values': [request_id]}
# TODO: error handling
result = self._conn.meta.client.describe_spot_instance_requests(Filters=[request_id_filter])['SpotInstanceRequests']
if len(result) == 0:
if allow_missing:
return None
raise EC2InstanceDisappeared("Spot instance request ‘{0}’ disappeared!".format(request_id))
return result[0]


def _get_instance(self, instance_id=None, allow_missing=False, update=False):
"""Get instance object for this machine, with caching"""
if not instance_id: instance_id = self.vm_id
Expand Down Expand Up @@ -312,13 +307,12 @@ def _get_instance(self, instance_id=None, allow_missing=False, update=False):
def _get_snapshot_by_id(self, snapshot_id):
"""Get snapshot object by instance id."""
self.connect()
snapshots = self._conn.get_all_snapshots([snapshot_id])
snapshots = [snapshot for snapshot in self._conn.snapshots.filter(Filters=[{'Name': 'snapshot-id',
'Values': [snapshot_id]}])]
if len(snapshots) != 1:
raise Exception("unable to find snapshot ‘{0}’".format(snapshot_id))
return snapshots[0]



def _wait_for_ip(self):
self.log_start("waiting for IP address... ".format(self.name))

Expand Down Expand Up @@ -563,7 +557,9 @@ def _assign_elastic_ip(self, elastic_ipv4, check):
instance = self._get_instance(update=True)
self.log_end("")

addresses = self._conn.get_all_addresses(addresses=[elastic_ipv4])
elastic_ip_filter = {'Name': 'public-ip', 'Values': [elastic_ipv4]}
# TODO: errors
addresses = self._conn.meta.client.describe_addresses(Filters=[elastic_ip_filter])['Addresses']
if addresses[0].instance_id != "" \
and addresses[0].instance_id is not None \
and addresses[0].instance_id != self.vm_id \
Expand All @@ -573,7 +569,8 @@ def _assign_elastic_ip(self, elastic_ipv4, check):
raise Exception("elastic IP ‘{0}’ already in use...".format(elastic_ipv4))
else:
self.log("associating IP address ‘{0}’...".format(elastic_ipv4))
addresses[0].associate(self.vm_id)
self._conn.meta.client.associate_address(AllocationId=addresses[0].allocation_id,
InstanceId=self.vm_id)
self.log_start("waiting for address to be associated with this machine... ")
instance = self._get_instance(update=True)
while True:
Expand All @@ -592,9 +589,12 @@ def _assign_elastic_ip(self, elastic_ipv4, check):
self.ssh_pinged = False

elif self.elastic_ipv4 != None:
addresses = self._conn.get_all_addresses(addresses=[self.elastic_ipv4])
elastic_ip_filter = {'Name': 'public-ip', 'Values': [elastic_ipv4]}
# TODO: errors
addresses = self._conn.meta.client.describe_addresses(Filters=[elastic_ip_filter])['Addresses']
if len(addresses) == 1 and addresses[0].instance_id == self.vm_id:
self.log("disassociating IP address ‘{0}’...".format(self.elastic_ipv4))
# TODO
self._conn.disassociate_address(public_ip=self.elastic_ipv4)
else:
self.log("address ‘{0}’ was not associated with instance ‘{1}’".format(self.elastic_ipv4, self.vm_id))
Expand All @@ -605,13 +605,7 @@ def _assign_elastic_ip(self, elastic_ipv4, check):
self.ssh_pinged = False

def security_groups_to_ids(self, subnetId, groups):
sg_names = filter(lambda g: not g.startswith('sg-'), groups)
if sg_names != [ ] and subnetId != "":
self.connect_vpc()
vpc_id = self._conn_vpc.get_all_subnets([subnetId])[0].vpc_id
groups = map(lambda g: nixops.ec2_utils.name_to_security_group(self._conn, g, vpc_id), groups)

return groups
return nixops.ec2_utils.security_groups_to_ids(self._conn, subnetId, groups)

def _get_network_interfaces(self, defn):
groups = self.security_groups_to_ids(defn.subnet_id, defn.security_group_ids)
Expand Down Expand Up @@ -676,9 +670,9 @@ def create_instance(self, defn, zone, devmap, user_data, ebs_optimized):
self.log_start("waiting for spot instance request ‘{0}’ to be fulfilled... ".format(self.spot_instance_request_id))
while True:
request = self._get_spot_instance_request_by_id(self.spot_instance_request_id)
self.log_continue("[{0}] ".format(request.status.code))
if request.status.code == "fulfilled": break
if request.status.code in {"schedule-expired", "canceled-before-fulfillment", "bad-parameters", "system-error"}:
self.log_continue("[{0}] ".format(request.status_code))
if request.status_code == "fulfilled": break
if request.status_code in {"schedule-expired", "canceled-before-fulfillment", "bad-parameters", "system-error"}:
self.spot_instance_request_id = None
self.log_end("")
raise Exception("spot instance request failed with result ‘{0}’".format(request.status.code))
Expand Down Expand Up @@ -720,7 +714,7 @@ def _cancel_spot_request(self):
while True:
request = self._get_spot_instance_request_by_id(self.spot_instance_request_id, allow_missing=True)
if request is None: break
self.log_continue("[{0}] ".format(request.status.code))
self.log_continue("[{0}] ".format(request.status_code))
if request.instance_id is not None and request.instance_id != self.vm_id:
if self.vm_id is not None:
raise Exception("spot instance request got fulfilled unexpectedly as instance ‘{0}’".format(request.instance_id))
Expand Down
75 changes: 39 additions & 36 deletions nixops/ec2_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
import random
import nixops.util

from boto.exception import EC2ResponseError
from boto.exception import SQSError
from boto.exception import BotoServerError

import botocore
import boto3
from boto.pyami.config import Config

def fetch_aws_secret_key(access_key_id):
Expand Down Expand Up @@ -59,26 +57,25 @@ def ec2_keys_from_env():

return credentials

def _get_available_regions(service_name):
"""Get available regions for given service form offline botocore lib data."""
return boto3.session.Session().get_available_regions(service_name)

def connect(region, access_key_id):
"""Connect to the specified EC2 region using the given access key."""
assert region
(access_key_id, secret_access_key) = fetch_aws_secret_key(access_key_id)
conn = boto.ec2.connect_to_region(
region_name=region, aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)
if not conn:

# https://stackoverflow.com/a/38464150
# boto3.session.Session().get_available_regions('ec2') -- doesn't make any API calls,
# grabs data from botocore lib data, should we use this or a data from the endpoint?
if region not in _get_available_regions('ec2'):
raise Exception("invalid EC2 region ‘{0}’".format(region))
return conn

def connect_vpc(region, access_key_id):
"""Connect to the specified VPC region using the given access key."""
assert region
(access_key_id, secret_access_key) = fetch_aws_secret_key(access_key_id)
conn = boto.vpc.connect_to_region(
region_name=region, aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)
if not conn:
raise Exception("invalid VPC region ‘{0}’".format(region))
return conn

conn = boto3.resource('ec2', region_name=region,
aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)
return conn

def get_access_key_id():
return os.environ.get('EC2_ACCESS_KEY') or os.environ.get('AWS_ACCESS_KEY_ID')
Expand All @@ -91,7 +88,7 @@ def retry(f, error_codes=[], logger=None):
"""

def handle_exception(e):
if i == num_retries or (error_codes != [] and not e.error_code in error_codes):
if i == num_retries or (error_codes != [] and not e.response['Error']['Code'] in error_codes):
raise e
if logger is not None:
logger.log("got (possibly transient) EC2 error code '{0}': {1}. retrying...".format(e.error_code, e.error_message))
Expand All @@ -104,12 +101,9 @@ def handle_exception(e):

try:
return f()
except EC2ResponseError as e:
handle_exception(e)
except SQSError as e:
handle_exception(e)
except BotoServerError as e:
if e.error_code == "RequestLimitExceeded":
except ClientError as e:
# TODO: research boto3 means of retrying requests
if e.response['Error']['Code'] == "RequestLimitExceeded":
num_retries += 1
else:
handle_exception(e)
Expand All @@ -118,19 +112,18 @@ def handle_exception(e):

time.sleep(next_sleep)


def get_volume_by_id(conn, volume_id, allow_missing=False):
"""Get volume object by volume id."""
try:
volumes = conn.get_all_volumes([volume_id])
volumes = [vol for vol in conn.volumes.filter(Filters=[{'Name': 'volume-id',
'Values': [volume_id]}])]
if len(volumes) != 1:
raise Exception("unable to find volume ‘{0}’".format(volume_id))
return volumes[0]
except boto.exception.EC2ResponseError as e:
if e.error_code != "InvalidVolume.NotFound": raise
except boto3.exceptions.ClientError as e:
if e.response['Error']['Code'] != "InvalidVolume.NotFound": raise
return None


def wait_for_volume_available(conn, volume_id, logger, states=['available']):
"""Wait for an EBS volume to become available."""

Expand All @@ -146,23 +139,33 @@ def check_available():

logger.log_end('')


def name_to_security_group(conn, name, vpc_id):
if not vpc_id or name.startswith('sg-'):
return name

id = None
for sg in conn.get_all_security_groups(filters={'group-name':name, 'vpc-id': vpc_id}):
if sg.name == name:
for sg in conn.security_groups.filter(Filters=[{'Name': 'group-name', 'Values': [name]},
{'Name': 'vpc-id', 'Values': [vpc_id]}]):
if sg.group_name == name:
id = sg.id
return id

raise Exception("could not resolve security group name '{0}' in VPC '{1}'".format(name, vpc_id))

def id_to_security_group_name(conn, sg_id, vpc_id):
name = None
for sg in conn.get_all_security_groups(filters={'group-id':sg_id, 'vpc-id': vpc_id}):
if sg.id == sg_id:
name = sg.name
for sg in conn.security_groups.filter(Filters=[{'Name': 'group-id', 'Values': [sg_id]},
{'Name': 'vpc-id', 'Values': [vpc_id]}]):
if sg.group_id == sg_id:
name = sg.group_name
return name
raise Exception("could not resolve security group id '{0}' in VPC '{1}'".format(sg_id, vpc_id))

def security_groups_to_ids(conn, subnetId, groups):
sg_names = filter(lambda g: not g.startswith('sg-'), groups)
if sg_names != [ ] and subnetId != "":
self.connect()
subnet_id_filter = {'Name': 'subnet-id', 'Values': [vpc_id]}
vpc_id = [vpc.vpc_id for vpc in self._conn.subnets.filter(Filters=[subnet_id_filter]).limit(1)][0]
groups = map(lambda g: nixops.ec2_utils.name_to_security_group(self._conn, g, vpc_id), groups)

return groups
9 changes: 1 addition & 8 deletions nixops/resources/elastic_file_system_mount_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,4 @@ def destroy(self, wipe=False):

def security_groups_to_ids(self, region, access_key_id, subnetId, groups):
conn = nixops.ec2_utils.connect(region, access_key_id)
conn_vpc = nixops.ec2_utils.connect_vpc(region, access_key_id)

sg_names = filter(lambda g: not g.startswith('sg-'), groups)
if sg_names != [ ] and subnetId != "":
vpc_id = conn_vpc.get_all_subnets([subnetId])[0].vpc_id
groups = map(lambda g: nixops.ec2_utils.name_to_security_group(conn, g, vpc_id), groups)

return groups
return nixops.ec2_utils.security_groups_to_ids(set)