From 553d8cbf46abcb0d1daed6777c4d35a5afe8b86a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gon=C3=A9ri=20Le=20Bouder?= Date: Tue, 11 Oct 2022 17:39:17 -0400 Subject: [PATCH] ec2_ami: unit-test coverage Refactoring and unit-test coverage of the `ec2_ami`. - Break up the long `create_image()`, `deregister_image()`, `update_image()` - Use classes and static methods to group the functions - Use more Pythonic expression when possible - Handle internal failures with exception to simplify the error managment --- .../fragments/ec2_ami_test-coverage.yaml | 3 + plugins/modules/ec2_ami.py | 598 ++++++++++-------- tests/unit/plugins/modules/test_ec2_ami.py | 79 ++- 3 files changed, 429 insertions(+), 251 deletions(-) create mode 100644 changelogs/fragments/ec2_ami_test-coverage.yaml diff --git a/changelogs/fragments/ec2_ami_test-coverage.yaml b/changelogs/fragments/ec2_ami_test-coverage.yaml new file mode 100644 index 00000000000..524f027f91c --- /dev/null +++ b/changelogs/fragments/ec2_ami_test-coverage.yaml @@ -0,0 +1,3 @@ +--- +minor_changes: +- "ec2_ami - Extend the unit-test coverage of the module (https://github.com/ansible-collections/amazon.aws/pull/1159)." diff --git a/plugins/modules/ec2_ami.py b/plugins/modules/ec2_ami.py index 2e9bdf4aa38..9a73957fd4b 100644 --- a/plugins/modules/ec2_ami.py +++ b/plugins/modules/ec2_ami.py @@ -406,8 +406,15 @@ from ansible_collections.amazon.aws.plugins.module_utils.waiters import get_waiter +class Ec2AmiFailure(Exception): + def __init__(self, message=None, original_e=None): + super().__init__(message) + self.original_e = original_e + self.message = message + + def get_block_device_mapping(image): - bdm_dict = dict() + bdm_dict = {} if image is not None and image.get('block_device_mappings') is not None: bdm = image.get('block_device_mappings') for device in bdm: @@ -458,71 +465,312 @@ def get_ami_info(camel_image): ) -def create_image(module, connection): - instance_id = module.params.get('instance_id') - name = module.params.get('name') - wait = module.params.get('wait') - wait_timeout = module.params.get('wait_timeout') - description = module.params.get('description') - architecture = module.params.get('architecture') - kernel_id = module.params.get('kernel_id') - root_device_name = module.params.get('root_device_name') - virtualization_type = module.params.get('virtualization_type') - no_reboot = module.params.get('no_reboot') - device_mapping = module.params.get('device_mapping') - tags = module.params.get('tags') - launch_permissions = module.params.get('launch_permissions') - image_location = module.params.get('image_location') - enhanced_networking = module.params.get('enhanced_networking') - billing_products = module.params.get('billing_products') - ramdisk_id = module.params.get('ramdisk_id') - sriov_net_support = module.params.get('sriov_net_support') - boot_mode = module.params.get('boot_mode') - tpm_support = module.params.get('tpm_support') - uefi_data = module.params.get('uefi_data') - - if tpm_support and boot_mode != 'uefi': - module.fail_json(msg="To specify 'tpm_support', 'boot_mode' must be 'uefi'.") - - if module.check_mode: - image = connection.describe_images(Filters=[{'Name': 'name', 'Values': [str(name)]}]) +def get_image_by_id(connection, image_id): + try: + images_response = connection.describe_images(aws_retry=True, ImageIds=[image_id]) + except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: + raise Ec2AmiFailure("Error retrieving image by image_id", e) + + images = images_response.get('Images', []) + image_counter = len(images) + if image_counter == 0: + return None + + if image_counter > 1: + raise Ec2AmiFailure("Invalid number of instances (%s) found for image_id: %s." % (str(len(images)), image_id)) + + result = images[0] + try: + result['LaunchPermissions'] = connection.describe_image_attribute(aws_retry=True, Attribute='launchPermission', + ImageId=image_id)['LaunchPermissions'] + result['ProductCodes'] = connection.describe_image_attribute(aws_retry=True, Attribute='productCodes', + ImageId=image_id)['ProductCodes'] + except is_boto3_error_code('InvalidAMIID.Unavailable'): + pass + except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: # pylint: disable=duplicate-except + raise Ec2AmiFailure("Error retrieving image attributes for image %s" % image_id, e) + return result + + +def rename_item_if_exists(dict_object, attribute, new_attribute, child_node=None, attribute_type=None): + new_item = dict_object.get(attribute) + if new_item is not None: + if attribute_type is not None: + new_item = attribute_type(new_item) + if child_node is None: + dict_object[new_attribute] = new_item + else: + dict_object[child_node][new_attribute] = new_item + dict_object.pop(attribute) + return dict_object + + +class DeregisterImage: + + @staticmethod + def do_check_mode(module, connection): + image_id = module.params.get('image_id') + image = get_image_by_id(connection, image_id) + + if image is None: + module.exit_json(changed=False) + + if 'ImageId' in image: + module.exit_json(changed=True, msg='Would have deregistered AMI if not in check mode.') + else: + module.exit_json(msg="Image %s has already been deregistered." % image_id, changed=False) + + @staticmethod + def defer_purge_snapshots(image): + def purge_snapshots(connection): + try: + for mapping in image.get('BlockDeviceMappings') or []: + snapshot_id = mapping.get('Ebs', {}).get('SnapshotId') + if snapshot_id is None: + continue + connection.delete_snapshot(aws_retry=True, SnapshotId=snapshot_id) + yield snapshot_id + except is_boto3_error_code('InvalidSnapshot.NotFound'): + pass + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: # pylint: disable=duplicate-except + raise Ec2AmiFailure('Failed to delete snapshot.', e) + return purge_snapshots + + @staticmethod + def timeout(connection, image_id, wait_timeout): + image = get_image_by_id(connection, image_id) + wait_timeout = time.time() + wait_timeout + + while wait_timeout > time.time() and image is not None: + image = get_image_by_id(connection, image_id) + time.sleep(3) + + if wait_timeout <= time.time(): + raise Ec2AmiFailure("Timed out waiting for image to be deregistered.") + + @classmethod + def do(cls, module, connection): + '''Entry point to deregister an image''' + image_id = module.params.get('image_id') + delete_snapshot = module.params.get('delete_snapshot') + wait = module.params.get('wait') + wait_timeout = module.params.get('wait_timeout') + image = get_image_by_id(connection, image_id) + + if image is None: + module.exit_json(changed=False) + + # Get all associated snapshot ids before deregistering image otherwise this information becomes unavailable. + purge_snapshots = cls.defer_purge_snapshots(image) + + # When trying to re-deregister an already deregistered image it doesn't raise an exception, it just returns an object without image attributes. + if 'ImageId' in image: + try: + connection.deregister_image(aws_retry=True, ImageId=image_id) + except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: + raise Ec2AmiFailure("Error deregistering image", e) + else: + module.exit_json(msg="Image %s has already been deregistered." % image_id, changed=False) + + if wait: + cls.timeout(connection, image_id, wait_timeout) + + exit_params = {'msg': "AMI deregister operation complete.", 'changed': True} + + if delete_snapshot: + exit_params['snapshots_deleted'] = list(purge_snapshots(connection)) + + module.exit_json(**exit_params) + + +class UpdateImage: + @staticmethod + def set_launch_permission(connection, image, launch_permissions, check_mode): + if launch_permissions is None: + return False + + current_permissions = image['LaunchPermissions'] + + current_users = set(permission['UserId'] for permission in current_permissions if 'UserId' in permission) + desired_users = set(str(user_id) for user_id in launch_permissions.get('user_ids', [])) + current_groups = set(permission['Group'] for permission in current_permissions if 'Group' in permission) + desired_groups = set(launch_permissions.get('group_names', [])) + + to_add_users = desired_users - current_users + to_remove_users = current_users - desired_users + to_add_groups = desired_groups - current_groups + to_remove_groups = current_groups - desired_groups + + to_add = [dict(Group=group) for group in to_add_groups] + [dict(UserId=user_id) for user_id in to_add_users] + to_remove = [dict(Group=group) for group in to_remove_groups] + [dict(UserId=user_id) for user_id in to_remove_users] + + if not (to_add or to_remove): + return False + + try: + if not check_mode: + connection.modify_image_attribute(aws_retry=True, + ImageId=image["ImageId"], Attribute='launchPermission', + LaunchPermission=dict(Add=to_add, Remove=to_remove)) + changed = True + except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: + raise Ec2AmiFailure("Error updating launch permissions of image %s" % image["ImageId"], e) + return changed + + @staticmethod + def set_tags(connection, module, image_id, tags, purge_tags): + if not tags: + return False + + return ensure_ec2_tags(connection, module, image_id, tags=tags, purge_tags=purge_tags) + + @staticmethod + def set_description(connection, module, image, description): + if not description: + return False + + if description == image['Description']: + return False + + try: + if not module.check_mode: + connection.modify_image_attribute(aws_retry=True, Attribute='Description ', ImageId=image["ImageId"], Description=dict(Value=description)) + return True + except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: + raise Ec2AmiFailure("Error setting description for image %s" % image["ImageId"], e) + + @classmethod + def do(cls, module, connection, image_id): + """Entry point to update an image""" + launch_permissions = module.params.get('launch_permissions') + image = get_image_by_id(connection, image_id) + if image is None: + raise Ec2AmiFailure("Image %s does not exist" % image_id) + + changed = False + changed |= cls.set_launch_permission(connection, image, launch_permissions, module.check_mode) + changed |= cls.set_tags(connection, module, image_id, module.params['tags'], module.params['purge_tags']) + changed |= cls.set_description(connection, module, image, module.params['description']) + + if changed and module.check_mode: + module.exit_json(changed=True, msg='Would have updated AMI if not in check mode.') + elif changed: + module.exit_json(msg="AMI updated.", changed=True, + **get_ami_info(get_image_by_id(connection, image_id))) + else: + module.exit_json(msg="AMI not updated.", changed=False, + **get_ami_info(image)) + + +class CreateImage: + + @staticmethod + def do_check_mode(module, connection): + image = connection.describe_images(Filters=[{'Name': 'name', 'Values': [str(module.params["name"])]}]) if not image['Images']: module.exit_json(changed=True, msg='Would have created a AMI if not in check mode.') else: module.exit_json(changed=False, msg='Error registering image: AMI name is already in use by another AMI') - try: + @staticmethod + def wait(connection, wait_timeout, image_id): + if not wait_timeout: + return + + delay = 15 + max_attempts = wait_timeout // delay + waiter = get_waiter(connection, 'image_available') + waiter.wait(ImageIds=[image_id], WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts}) + + @staticmethod + def set_tags(connection, module, tags, image_id): + if not tags: + return + + image_info = get_image_by_id(connection, image_id) + add_ec2_tags(connection, module, image_id, module.params["tags"]) + if image_info and image_info.get('BlockDeviceMappings'): + for mapping in image_info.get('BlockDeviceMappings'): + # We can only tag Ebs volumes + if 'Ebs' not in mapping: + continue + add_ec2_tags(connection, module, mapping.get('Ebs').get('SnapshotId'), tags) + + @staticmethod + def set_launch_permissions(connection, launch_permissions, image_id): + if not launch_permissions: + return + + try: + params = {"Attribute": 'LaunchPermission', "ImageId": image_id, "LaunchPermission": {"Add": []}} + for group_name in launch_permissions.get('group_names', []): + params['LaunchPermission']['Add'].append(dict(Group=group_name)) + for user_id in launch_permissions.get('user_ids', []): + params['LaunchPermission']['Add'].append(dict(UserId=str(user_id))) + if params['LaunchPermission']['Add']: + connection.modify_image_attribute(aws_retry=True, **params) + except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: + raise Ec2AmiFailure("Error setting launch permissions for image %s" % image_id, e) + + @staticmethod + def create_or_register(connection, create_image_parameters): + create_from_instance = "InstanceId" in create_image_parameters + func = connection.create_image if create_from_instance else connection.register_image + return func + + @staticmethod + def build_create_image_parameters(**kwargs): + instance_id = kwargs.get('instance_id') + name = kwargs.get('name') + description = kwargs.get('description') + architecture = kwargs.get('architecture') + kernel_id = kwargs.get('kernel_id') + root_device_name = kwargs.get('root_device_name') + virtualization_type = kwargs.get('virtualization_type') + no_reboot = kwargs.get('no_reboot') + device_mapping = kwargs.get('device_mapping') or [] + tags = kwargs.get('tags') + image_location = kwargs.get('image_location') + enhanced_networking = kwargs.get('enhanced_networking') + billing_products = kwargs.get('billing_products') + ramdisk_id = kwargs.get('ramdisk_id') + sriov_net_support = kwargs.get('sriov_net_support') + boot_mode = kwargs.get('boot_mode') + tpm_support = kwargs.get('tpm_support') + uefi_data = kwargs.get('uefi_data') + + if tpm_support and boot_mode != 'uefi': + raise Ec2AmiFailure(message="To specify 'tpm_support', 'boot_mode' must be 'uefi'.") + params = { 'Name': name, 'Description': description } - block_device_mapping = None # Remove empty values injected by using options - if device_mapping: - block_device_mapping = [] - for device in device_mapping: - device = dict((k, v) for k, v in device.items() if v is not None) - device['Ebs'] = {} - device = rename_item_if_exists(device, 'device_name', 'DeviceName') - device = rename_item_if_exists(device, 'virtual_name', 'VirtualName') - device = rename_item_if_exists(device, 'no_device', 'NoDevice') - device = rename_item_if_exists(device, 'volume_type', 'VolumeType', 'Ebs') - device = rename_item_if_exists(device, 'snapshot_id', 'SnapshotId', 'Ebs') - device = rename_item_if_exists(device, 'delete_on_termination', 'DeleteOnTermination', 'Ebs') - device = rename_item_if_exists(device, 'size', 'VolumeSize', 'Ebs', attribute_type=int) - device = rename_item_if_exists(device, 'volume_size', 'VolumeSize', 'Ebs', attribute_type=int) - device = rename_item_if_exists(device, 'iops', 'Iops', 'Ebs') - device = rename_item_if_exists(device, 'encrypted', 'Encrypted', 'Ebs') - - # The NoDevice parameter in Boto3 is a string. Empty string omits the device from block device mapping - # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.create_image - if 'NoDevice' in device: - if device['NoDevice'] is True: - device['NoDevice'] = "" - else: - del device['NoDevice'] - block_device_mapping.append(device) + block_device_mapping = [] + for device in device_mapping: + device = {k: v for k, v in device.items() if v is not None} + device['Ebs'] = {} + device = rename_item_if_exists(device, 'device_name', 'DeviceName') + device = rename_item_if_exists(device, 'virtual_name', 'VirtualName') + device = rename_item_if_exists(device, 'no_device', 'NoDevice') + device = rename_item_if_exists(device, 'volume_type', 'VolumeType', 'Ebs') + device = rename_item_if_exists(device, 'snapshot_id', 'SnapshotId', 'Ebs') + device = rename_item_if_exists(device, 'delete_on_termination', 'DeleteOnTermination', 'Ebs') + device = rename_item_if_exists(device, 'size', 'VolumeSize', 'Ebs', attribute_type=int) + device = rename_item_if_exists(device, 'volume_size', 'VolumeSize', 'Ebs', attribute_type=int) + device = rename_item_if_exists(device, 'iops', 'Iops', 'Ebs') + device = rename_item_if_exists(device, 'encrypted', 'Encrypted', 'Ebs') + + # The NoDevice parameter in Boto3 is a string. Empty string omits the device from block device mapping + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.create_image + if 'NoDevice' in device: + if device['NoDevice'] is True: + device['NoDevice'] = "" + else: + del device['NoDevice'] + block_device_mapping.append(device) if block_device_mapping: params['BlockDeviceMappings'] = block_device_mapping if instance_id: @@ -531,7 +779,6 @@ def create_image(module, connection): tag_spec = boto3_tag_specifications(tags, types=['image', 'snapshot']) if tag_spec: params['TagSpecifications'] = tag_spec - image_id = connection.create_image(aws_retry=True, **params).get('ImageId') else: if architecture: params['Architecture'] = architecture @@ -557,191 +804,29 @@ def create_image(module, connection): params['TpmSupport'] = tpm_support if uefi_data: params['UefiData'] = uefi_data - image_id = connection.register_image(aws_retry=True, **params).get('ImageId') - except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: - module.fail_json_aws(e, msg="Error registering image") - - if wait: - delay = 15 - max_attempts = wait_timeout // delay - waiter = get_waiter(connection, 'image_available') - waiter.wait(ImageIds=[image_id], WaiterConfig=dict(Delay=delay, MaxAttempts=max_attempts)) - - if tags and 'TagSpecifications' not in params: - image_info = get_image_by_id(module, connection, image_id) - add_ec2_tags(connection, module, image_id, tags) - if image_info and image_info.get('BlockDeviceMappings'): - for mapping in image_info.get('BlockDeviceMappings'): - # We can only tag Ebs volumes - if 'Ebs' not in mapping: - continue - add_ec2_tags(connection, module, mapping.get('Ebs').get('SnapshotId'), tags) - - if launch_permissions: - try: - params = dict(Attribute='LaunchPermission', ImageId=image_id, LaunchPermission=dict(Add=list())) - for group_name in launch_permissions.get('group_names', []): - params['LaunchPermission']['Add'].append(dict(Group=group_name)) - for user_id in launch_permissions.get('user_ids', []): - params['LaunchPermission']['Add'].append(dict(UserId=str(user_id))) - if params['LaunchPermission']['Add']: - connection.modify_image_attribute(aws_retry=True, **params) - except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: - module.fail_json_aws(e, msg="Error setting launch permissions for image %s" % image_id) + return params - module.exit_json(msg="AMI creation operation complete.", changed=True, - **get_ami_info(get_image_by_id(module, connection, image_id))) + @classmethod + def do(cls, module, connection): + """Entry point to create image""" + create_image_parameters = cls.build_create_image_parameters(**module.params) - -def deregister_image(module, connection): - image_id = module.params.get('image_id') - delete_snapshot = module.params.get('delete_snapshot') - wait = module.params.get('wait') - wait_timeout = module.params.get('wait_timeout') - image = get_image_by_id(module, connection, image_id) - - if image is None: - module.exit_json(changed=False) - - # Get all associated snapshot ids before deregistering image otherwise this information becomes unavailable. - snapshots = [] - if 'BlockDeviceMappings' in image: - for mapping in image.get('BlockDeviceMappings'): - snapshot_id = mapping.get('Ebs', {}).get('SnapshotId') - if snapshot_id is not None: - snapshots.append(snapshot_id) - - # When trying to re-deregister an already deregistered image it doesn't raise an exception, it just returns an object without image attributes. - if 'ImageId' in image: - if module.check_mode: - module.exit_json(changed=True, msg='Would have deregistered AMI if not in check mode.') + func = cls.create_or_register(connection, create_image_parameters) try: - connection.deregister_image(aws_retry=True, ImageId=image_id) + image = func(aws_retry=True, **create_image_parameters) + image_id = image.get('ImageId') except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: - module.fail_json_aws(e, msg="Error deregistering image") - else: - module.exit_json(msg="Image %s has already been deregistered." % image_id, changed=False) + raise Ec2AmiFailure("Error registering image", e) - image = get_image_by_id(module, connection, image_id) - wait_timeout = time.time() + wait_timeout + cls.wait(connection, module.params.get("wait") and module.params.get("wait_timeout"), image_id) - while wait and wait_timeout > time.time() and image is not None: - image = get_image_by_id(module, connection, image_id) - time.sleep(3) + if 'TagSpecifications' not in create_image_parameters: + CreateImage.set_tags(connection, module, module.params.get("tags"), image_id) - if wait and wait_timeout <= time.time(): - module.fail_json(msg="Timed out waiting for image to be deregistered.") + cls.set_launch_permissions(connection, module.params.get('launch_permissions'), image_id) - exit_params = {'msg': "AMI deregister operation complete.", 'changed': True} - - if delete_snapshot: - for snapshot_id in snapshots: - try: - connection.delete_snapshot(aws_retry=True, SnapshotId=snapshot_id) - # Don't error out if root volume snapshot was already deregistered as part of deregister_image - except is_boto3_error_code('InvalidSnapshot.NotFound'): - pass - except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: # pylint: disable=duplicate-except - module.fail_json_aws(e, msg='Failed to delete snapshot.') - exit_params['snapshots_deleted'] = snapshots - - module.exit_json(**exit_params) - - -def update_image(module, connection, image_id): - launch_permissions = module.params.get('launch_permissions') - image = get_image_by_id(module, connection, image_id) - if image is None: - module.fail_json(msg="Image %s does not exist" % image_id, changed=False) - changed = False - - if launch_permissions is not None: - current_permissions = image['LaunchPermissions'] - - current_users = set(permission['UserId'] for permission in current_permissions if 'UserId' in permission) - desired_users = set(str(user_id) for user_id in launch_permissions.get('user_ids', [])) - current_groups = set(permission['Group'] for permission in current_permissions if 'Group' in permission) - desired_groups = set(launch_permissions.get('group_names', [])) - - to_add_users = desired_users - current_users - to_remove_users = current_users - desired_users - to_add_groups = desired_groups - current_groups - to_remove_groups = current_groups - desired_groups - - to_add = [dict(Group=group) for group in to_add_groups] + [dict(UserId=user_id) for user_id in to_add_users] - to_remove = [dict(Group=group) for group in to_remove_groups] + [dict(UserId=user_id) for user_id in to_remove_users] - - if to_add or to_remove: - try: - if not module.check_mode: - connection.modify_image_attribute(aws_retry=True, - ImageId=image_id, Attribute='launchPermission', - LaunchPermission=dict(Add=to_add, Remove=to_remove)) - changed = True - except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: - module.fail_json_aws(e, msg="Error updating launch permissions of image %s" % image_id) - - desired_tags = module.params.get('tags') - if desired_tags is not None: - changed |= ensure_ec2_tags(connection, module, image_id, tags=desired_tags, purge_tags=module.params.get('purge_tags')) - - description = module.params.get('description') - if description and description != image['Description']: - try: - if not module.check_mode: - connection.modify_image_attribute(aws_retry=True, Attribute='Description ', ImageId=image_id, Description=dict(Value=description)) - changed = True - except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: - module.fail_json_aws(e, msg="Error setting description for image %s" % image_id) - - if changed: - if module.check_mode: - module.exit_json(changed=True, msg='Would have updated AMI if not in check mode.') - module.exit_json(msg="AMI updated.", changed=True, - **get_ami_info(get_image_by_id(module, connection, image_id))) - else: - module.exit_json(msg="AMI not updated.", changed=False, - **get_ami_info(get_image_by_id(module, connection, image_id))) - - -def get_image_by_id(module, connection, image_id): - try: - try: - images_response = connection.describe_images(aws_retry=True, ImageIds=[image_id]) - except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: - module.fail_json_aws(e, msg="Error retrieving image %s" % image_id) - images = images_response.get('Images') - no_images = len(images) - if no_images == 0: - return None - if no_images == 1: - result = images[0] - try: - result['LaunchPermissions'] = connection.describe_image_attribute(aws_retry=True, Attribute='launchPermission', - ImageId=image_id)['LaunchPermissions'] - result['ProductCodes'] = connection.describe_image_attribute(aws_retry=True, Attribute='productCodes', - ImageId=image_id)['ProductCodes'] - except is_boto3_error_code('InvalidAMIID.Unavailable'): - pass - except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: # pylint: disable=duplicate-except - module.fail_json_aws(e, msg="Error retrieving image attributes for image %s" % image_id) - return result - module.fail_json(msg="Invalid number of instances (%s) found for image_id: %s." % (str(len(images)), image_id)) - except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: - module.fail_json_aws(e, msg="Error retrieving image by image_id") - - -def rename_item_if_exists(dict_object, attribute, new_attribute, child_node=None, attribute_type=None): - new_item = dict_object.get(attribute) - if new_item is not None: - if attribute_type is not None: - new_item = attribute_type(new_item) - if child_node is None: - dict_object[new_attribute] = new_item - else: - dict_object[child_node][new_attribute] = new_item - dict_object.pop(attribute) - return dict_object + module.exit_json(msg="AMI creation operation complete.", changed=True, + **get_ami_info(get_image_by_id(connection, image_id))) def main(): @@ -757,14 +842,14 @@ def main(): volume_size=dict(type='int', aliases=['size']), ) argument_spec = dict( - instance_id=dict(), - image_id=dict(), + instance_id={}, + image_id={}, architecture=dict(default='x86_64'), - kernel_id=dict(), + kernel_id={}, virtualization_type=dict(default='hvm'), - root_device_name=dict(), + root_device_name={}, delete_snapshot=dict(default=False, type='bool'), - name=dict(), + name={}, wait=dict(type='bool', default=False), wait_timeout=dict(default=1200, type='int'), description=dict(default=''), @@ -772,11 +857,11 @@ def main(): state=dict(default='present', choices=['present', 'absent']), device_mapping=dict(type='list', elements='dict', options=mapping_options), launch_permissions=dict(type='dict'), - image_location=dict(), + image_location={}, enhanced_networking=dict(type='bool'), billing_products=dict(type='list', elements='str',), - ramdisk_id=dict(), - sriov_net_support=dict(), + ramdisk_id={}, + sriov_net_support={}, tags=dict(type='dict', aliases=['resource_tags']), purge_tags=dict(type='bool', default=True), boot_mode=dict(type='str', choices=['legacy-bios', 'uefi']), @@ -795,21 +880,34 @@ def main(): # Using a required_one_of=[['name', 'image_id']] overrides the message that should be provided by # the required_if for state=absent, so check manually instead if not any([module.params['image_id'], module.params['name']]): - module.fail_json(msg="one of the following is required: name, image_id") + module.fail_json("one of the following is required: name, image_id") if any([module.params['tpm_support'], module.params['uefi_data']]): module.require_botocore_at_least('1.26.0', reason='required for ec2.register_image with tpm_support or uefi_data') connection = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff()) - if module.params.get('state') == 'absent': - deregister_image(module, connection) - elif module.params.get('state') == 'present': - if module.params.get('image_id'): - update_image(module, connection, module.params.get('image_id')) - if not module.params.get('instance_id') and not module.params.get('device_mapping'): - module.fail_json(msg="The parameters instance_id or device_mapping (register from EBS snapshot) are required for a new image.") - create_image(module, connection) + try: + if module.params.get('state') == 'absent': + if module.check_mode: + DeregisterImage.do_check_mode(module, connection) + else: + DeregisterImage.do(module, connection) + elif module.params.get('image_id') and module.params.get('state') == 'present': + UpdateImage.do(module, connection, module.params.get('image_id')) + elif module.params.get('state') == 'present': + if not module.params.get('instance_id') and not module.params.get('device_mapping'): + Ec2AmiFailure("The parameters instance_id or device_mapping " + "(register from EBS snapshot) are required for a new image.") + if module.check_mode: + CreateImage.do_check_mode(module, connection) + else: + CreateImage.do(module, connection) + except Ec2AmiFailure as e: + if e.original_e: + module.fail_json_aws(e.original_e, e.message) + else: + module.fail_json(e.message) if __name__ == '__main__': diff --git a/tests/unit/plugins/modules/test_ec2_ami.py b/tests/unit/plugins/modules/test_ec2_ami.py index 5e8140d4a46..c68ce6abfc3 100644 --- a/tests/unit/plugins/modules/test_ec2_ami.py +++ b/tests/unit/plugins/modules/test_ec2_ami.py @@ -28,7 +28,7 @@ def test_create_image_uefi_data(m_get_image_by_id): "uefi_data": "QU1aTlVFRkk9xcN0AAAAAHj5a7fZ9+3aT2gcVRgA8Ek3NipiPST0pCiCIlTJtj20FzENCcQa", } - ec2_ami.create_image(module, connection) + ec2_ami.CreateImage.do(module, connection) assert connection.register_image.call_count == 1 connection.register_image.assert_has_calls( [ @@ -42,3 +42,80 @@ def test_create_image_uefi_data(m_get_image_by_id): ) ] ) + + +def test_get_block_device_mapping_virtual_name(): + image = { + "block_device_mappings": [ + {"device_name": "/dev/sdc", "virtual_name": "ephemeral0"} + ] + } + block_device = ec2_ami.get_block_device_mapping(image) + assert block_device == {"/dev/sdc": {"virtual_name": "ephemeral0"}} + + +def test_get_image_by_id_found(): + connection = MagicMock() + + connection.describe_images.return_value = { + "Images": [{"ImageId": "ami-0c7a795306730b288"}] + } + + image = ec2_ami.get_image_by_id(connection, "ami-0c7a795306730b288") + assert image["ImageId"] == "ami-0c7a795306730b288" + assert connection.describe_images.call_count == 1 + assert connection.describe_image_attribute.call_count == 2 + connection.describe_images.assert_has_calls( + [ + call( + aws_retry=True, + ImageIds=["ami-0c7a795306730b288"], + ) + ] + ) + + +def test_get_image_missing(): + connection = MagicMock() + + connection.describe_images.return_value = {"Images": []} + + image = ec2_ami.get_image_by_id(connection, "ami-0c7a795306730b288") + assert image is None + assert connection.describe_images.call_count == 1 + connection.describe_images.assert_has_calls( + [ + call( + aws_retry=True, + ImageIds=["ami-0c7a795306730b288"], + ) + ] + ) + + +@patch( + module_name + ".get_image_by_id", +) +def test_create_image_minimal(m_get_image_by_id): + module = MagicMock() + connection = MagicMock() + + m_get_image_by_id.return_value = {"ImageId": "ami-0c7a795306730b288"} + module.params = { + "name": "my-image", + "instance_id": "i-123456789", + "image_id": "ami-0c7a795306730b288", + } + ec2_ami.CreateImage.do(module, connection) + assert connection.create_image.call_count == 1 + connection.create_image.assert_has_calls( + [ + call( + aws_retry=True, + Description=None, + InstanceId="i-123456789", + Name="my-image", + NoReboot=None, + ) + ] + )