From 971d3b059d68d2e7dc647b16ec1fdf3c7ba5a1b3 Mon Sep 17 00:00:00 2001 From: Mark Chappell Date: Mon, 27 Sep 2021 08:57:02 +0200 Subject: [PATCH] Migrate iam_server_certificate to boto3 This commit was initially merged in https://github.com/ansible-collections/community.aws See: https://github.com/ansible-collections/community.aws/commit/09f67c5727c1e2bd52498cfd0efcf7cdc33d6512 --- plugins/modules/iam_server_certificate.py | 375 +++++++++++------- .../iam_server_certificate/tasks/main.yml | 97 ++--- 2 files changed, 293 insertions(+), 179 deletions(-) diff --git a/plugins/modules/iam_server_certificate.py b/plugins/modules/iam_server_certificate.py index 14cc5cef077..1a5df57465c 100644 --- a/plugins/modules/iam_server_certificate.py +++ b/plugins/modules/iam_server_certificate.py @@ -126,114 +126,196 @@ import os try: - import boto - import boto.iam - import boto.ec2 + import botocore except ImportError: pass # Handled by HAS_BOTO +from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict + from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import get_aws_connection_info -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import connect_to_aws -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import HAS_BOTO - - -def cert_meta(iam, name): - certificate = iam.get_server_certificate(name).get_server_certificate_result.server_certificate - ocert = certificate.certificate_body - opath = certificate.server_certificate_metadata.path - ocert_id = certificate.server_certificate_metadata.server_certificate_id - upload_date = certificate.server_certificate_metadata.upload_date - exp = certificate.server_certificate_metadata.expiration - arn = certificate.server_certificate_metadata.arn - return opath, ocert, ocert_id, upload_date, exp, arn - - -def dup_check(module, iam, name, new_name, cert, orig_cert_names, orig_cert_bodies, dup_ok): - update = False - - # IAM cert names are case insensitive - names_lower = [n.lower() for n in [name, new_name] if n is not None] - orig_cert_names_lower = [ocn.lower() for ocn in orig_cert_names] - - if any(ct in orig_cert_names_lower for ct in names_lower): - for i_name in names_lower: - if cert is not None: - try: - c_index = orig_cert_names_lower.index(i_name) - except NameError: - continue - else: - # NOTE: remove the carriage return to strictly compare the cert bodies. - slug_cert = cert.replace('\r', '') - slug_orig_cert_bodies = orig_cert_bodies[c_index].replace('\r', '') - if slug_orig_cert_bodies == slug_cert: - update = True - break - elif slug_cert.startswith(slug_orig_cert_bodies): - update = True - break - else: - module.fail_json(changed=False, msg='A cert with the name %s already exists and' - ' has a different certificate body associated' - ' with it. Certificates cannot have the same name' % orig_cert_names[c_index]) - else: - update = True - break - elif cert in orig_cert_bodies and not dup_ok: - for crt_name, crt_body in zip(orig_cert_names, orig_cert_bodies): - if crt_body == cert: - module.fail_json(changed=False, msg='This certificate already' - ' exists under the name %s' % crt_name) - - return update - - -def cert_action(module, iam, name, cpath, new_name, new_path, state, - cert, key, cert_chain, orig_cert_names, orig_cert_bodies, dup_ok): - if state == 'present': - update = dup_check(module, iam, name, new_name, cert, orig_cert_names, - orig_cert_bodies, dup_ok) - if update: - opath, ocert, ocert_id, upload_date, exp, arn = cert_meta(iam, name) - changed = True - if new_name and new_path: - iam.update_server_cert(name, new_cert_name=new_name, new_path=new_path) - module.exit_json(changed=changed, original_name=name, new_name=new_name, - original_path=opath, new_path=new_path, cert_body=ocert, - upload_date=upload_date, expiration_date=exp, arn=arn) - elif new_name and not new_path: - iam.update_server_cert(name, new_cert_name=new_name) - module.exit_json(changed=changed, original_name=name, new_name=new_name, - cert_path=opath, cert_body=ocert, - upload_date=upload_date, expiration_date=exp, arn=arn) - elif not new_name and new_path: - iam.update_server_cert(name, new_path=new_path) - module.exit_json(changed=changed, name=new_name, - original_path=opath, new_path=new_path, cert_body=ocert, - upload_date=upload_date, expiration_date=exp, arn=arn) - else: - changed = False - module.exit_json(changed=changed, name=name, cert_path=opath, cert_body=ocert, - upload_date=upload_date, expiration_date=exp, arn=arn, - msg='No new path or name specified. No changes made') - else: - changed = True - iam.upload_server_cert(name, cert, key, cert_chain=cert_chain, path=cpath) - opath, ocert, ocert_id, upload_date, exp, arn = cert_meta(iam, name) - module.exit_json(changed=changed, name=name, cert_path=opath, cert_body=ocert, - upload_date=upload_date, expiration_date=exp, arn=arn) - elif state == 'absent': - if name in orig_cert_names: - changed = True - iam.delete_server_cert(name) - module.exit_json(changed=changed, deleted_cert=name) - else: - changed = False - module.exit_json(changed=changed, msg='Certificate with the name %s already absent' % name) +from ansible_collections.amazon.aws.plugins.module_utils.core import is_boto3_error_code +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import AWSRetry + + +@AWSRetry.jittered_backoff() +def _list_server_certficates(): + paginator = client.get_paginator('list_server_certificates') + return paginator.paginate().build_full_result()['ServerCertificateMetadataList'] + + +def check_duplicate_cert(new_cert): + orig_cert_names = list(c['ServerCertificateName'] for c in _list_server_certficates()) + for cert_name in orig_cert_names: + cert = get_server_certificate(cert_name) + if not cert: + continue + cert_body = cert.get('certificate_body', None) + if not _compare_cert(new_cert, cert_body): + continue + module.fail_json( + changed=False, + msg='This certificate already exists under the name {0} and dup_ok=False'.format(cert_name), + duplicate_cert=cert, + ) + + +def _compare_cert(cert_a, cert_b): + if not cert_a and not cert_b: + return True + if not cert_a or not cert_b: + return False + # Trim out the whitespace before comparing the certs. While this could mean + # an invalid cert 'matches' a valid cert, that's better than some stray + # whitespace breaking things + cert_a.replace('\r', '') + cert_a.replace('\n', '') + cert_a.replace(' ', '') + cert_b.replace('\r', '') + cert_b.replace('\n', '') + cert_b.replace(' ', '') + + return cert_a == cert_b + + +def update_server_certificate(current_cert): + changed = False + + cert, key, cert_chain = load_data() + + if not _compare_cert(cert, current_cert.get('certificate_body', None)): + module.fail_json(msg='Modifying the certificate body is not supported by AWS') + if not _compare_cert(cert_chain, current_cert.get('certificate_chain', None)): + module.fail_json(msg='Modifying the chaining certificate is not supported by AWS') + # We can't compare keys. + + if module.check_mode: + return changed + + # For now we can't make any changes. Updates to tagging would go here and + # update 'changed' + + return changed + + +def create_server_certificate(): + cert, key, cert_chain = load_data() + + if not module.params.get('dup_ok'): + check_duplicate_cert(cert) + + path = module.params.get('path') + name = module.params.get('name') + + params = dict( + ServerCertificateName=name, + CertificateBody=cert, + PrivateKey=key, + ) + + if cert_chain: + params['CertificateChain'] = cert_chain + if path: + params['Path'] = path + + if module.check_mode: + return True + + try: + client.upload_server_certificate( + aws_retry=True, + **params + ) + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, msg='Failed to update server certificate {0}'.format(name)) + + return True + + +def rename_server_certificate(current_cert): + name = module.params.get('name') + new_name = module.params.get('new_name') + new_path = module.params.get('new_path') + + changes = dict() + + # Try to be nice, if we've already been renamed exit quietly. + if not current_cert: + current_cert = get_server_certificate(new_name) + else: + if new_name: + changes['NewServerCertificateName'] = new_name + + cert_metadata = current_cert.get('server_certificate_metadata', {}) + + if not current_cert: + module.fail_json(msg='Unable to find certificate {0}'.format(name)) + current_path = cert_metadata.get('path', None) + if new_path and current_path != new_path: + changes['NewPath'] = new_path + + if not changes: + return False + + if module.check_mode: + return True + + try: + client.update_server_certificate( + aws_retry=True, + ServerCertificateName=name, + **changes + ) + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, msg='Failed to update server certificate {0}'.format(name), + changes=changes) + + return True + + +def delete_server_certificate(current_cert): + if not current_cert: + return False + + if module.check_mode: + return True + + name = module.params.get('name') + + try: + result = client.delete_server_certificate( + aws_retry=True, + ServerCertificateName=name, + ) + except is_boto3_error_code('NoSuchEntity'): + return None + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: # pylint: disable=duplicate-except + module.fail_json_aws(e, msg='Failed to delete server certificate {0}'.format(name)) + + return True + + +def get_server_certificate(name): + if not name: + return None + try: + result = client.get_server_certificate( + aws_retry=True, + ServerCertificateName=name, + ) + except is_boto3_error_code('NoSuchEntity'): + return None + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: # pylint: disable=duplicate-except + module.fail_json_aws(e, msg='Failed to get server certificate {0}'.format(name)) + cert = dict(camel_dict_to_snake_dict(result.get('ServerCertificate'))) + return cert + + +def load_data(): + cert = module.params.get('cert') + key = module.params.get('key') + cert_chain = module.params.get('cert_chain') -def load_data(cert, key, cert_chain): # if paths are provided rather than lookups read the files and return the contents if cert and os.path.isfile(cert): with open(cert, 'r') as cert_fh: @@ -259,9 +341,36 @@ def load_data(cert, key, cert_chain): return cert, key, cert_chain +def compatability_results(current_cert): + compat_results = dict() + + if not current_cert: + return compat_results + + metadata = current_cert.get('server_certificate_metadata', {}) + + if current_cert.get('certificate_body', None): + compat_results['cert_body'] = current_cert.get('certificate_body') + if current_cert.get('certificate_chain', None): + compat_results['chain_cert_body'] = current_cert.get('certificate_chain') + if metadata.get('arn', None): + compat_results['arn'] = metadata.get('arn') + if metadata.get('expiration', None): + compat_results['expiration_date'] = metadata.get('expiration') + if metadata.get('path', None): + compat_results['cert_path'] = metadata.get('path') + if metadata.get('server_certificate_name', None): + compat_results['name'] = metadata.get('server_certificate_name') + if metadata.get('upload_date', None): + compat_results['upload_date'] = metadata.get('upload_date') + + return compat_results + + def main(): global module + global client argument_spec = dict( state=dict(required=True, choices=['present', 'absent']), @@ -285,21 +394,9 @@ def main(): ['new_name', 'cert'], ['new_name', 'cert_chain'], ], - check_boto3=False, ) - if not HAS_BOTO: - module.fail_json(msg="Boto is required for this module") - - region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module) - - try: - if region: - iam = connect_to_aws(boto.iam, region, **aws_connect_kwargs) - else: - iam = boto.iam.connection.IAMConnection(**aws_connect_kwargs) - except boto.exception.NoAuthHandlerFound as e: - module.fail_json(msg=str(e)) + client = module.client('iam', retry_decorator=AWSRetry.jittered_backoff()) state = module.params.get('state') name = module.params.get('name') @@ -313,28 +410,38 @@ def main(): 'The dup_ok module currently defaults to false, this will change in ' 'release 4.0.0 to true.', version='4.0.0', collection_name='community.aws') - if state == 'present' and not new_name and not new_path: - cert, key, cert_chain = load_data(cert=module.params.get('cert'), - key=module.params.get('key'), - cert_chain=module.params.get('cert_chain')) + current_cert = get_server_certificate(name) + + results = dict() + if state == 'absent': + changed = delete_server_certificate(current_cert) + if changed: + results['deleted_cert'] = name + else: + msg = 'Certificate with the name {0} already absent'.format(name) + results['msg'] = msg else: - cert = key = cert_chain = None + if new_name or new_path: + changed = rename_server_certificate(current_cert) + if new_name: + name = new_name + updated_cert = get_server_certificate(name) + elif current_cert: + changed = update_server_certificate(current_cert) + updated_cert = get_server_certificate(name) + else: + changed = create_server_certificate() + updated_cert = get_server_certificate(name) - orig_cert_names = [ctb['server_certificate_name'] for ctb in - iam.get_all_server_certs().list_server_certificates_result.server_certificate_metadata_list] - orig_cert_bodies = [iam.get_server_certificate(thing).get_server_certificate_result.certificate_body - for thing in orig_cert_names] - if new_name == name: - new_name = None - if new_path == path: - new_path = None + results['server_certificate'] = updated_cert + compat_results = compatability_results(updated_cert) + if compat_results: + results.update(compat_results) - changed = False - try: - cert_action(module, iam, name, path, new_name, new_path, state, - cert, key, cert_chain, orig_cert_names, orig_cert_bodies, dup_ok) - except boto.exception.BotoServerError as err: - module.fail_json(changed=changed, msg=str(err), debug=[cert, key]) + module.exit_json( + changed=changed, + **results + ) if __name__ == '__main__': diff --git a/tests/integration/targets/iam_server_certificate/tasks/main.yml b/tests/integration/targets/iam_server_certificate/tasks/main.yml index dcc07e595e9..10895fcbbe3 100644 --- a/tests/integration/targets/iam_server_certificate/tasks/main.yml +++ b/tests/integration/targets/iam_server_certificate/tasks/main.yml @@ -112,18 +112,25 @@ assert: that: - update_cert is failed - - '"already exists" in update_cert.msg' - - ## AWS APIs provide no mechanism for accessing - ## any information about the key, and as such - ## the module can't tell if a key was updated. - # - name: Update Certificate - # iam_server_certificate: - # name: '{{ cert_name }}' - # state: present - # key: '{{ lookup("file", path_intermediate_key) }}' - # register: update_cert - # ignore_errors: True + - '"not supported" in update_cert.msg' + + - name: Update Chaining Certificate + iam_server_certificate: + name: '{{ cert_name }}' + state: present + cert_chain: '{{ chain_cert_data }}' + register: update_cert + ignore_errors: True + + - name: check result - Update Chaining Certificate + assert: + that: + - update_cert is failed + - '"not supported" in update_cert.msg' + + # AWS APIs provide no mechanism for accessing + # any information about the key, and as such + # the module can't tell if a key was updated. ################################################ @@ -285,30 +292,30 @@ - update_path is changed - '"arn" in update_path' - '"cert_body" in update_path' - # - '"cert_path" in update_path' + - '"cert_path" in update_path' - '"expiration_date" in update_path' - '"name" in update_path' - '"upload_date" in update_path' - update_path.arn.startswith('arn:aws') - update_path.arn.endswith(cert_name) - # - update_path.name == cert_name - # - update_path.cert_path == '/path/' + - update_path.name == cert_name + - update_path.cert_path == '/path/' - update_path.cert_body == cert_a_data - # - name: Update certificate path - idempotency - # iam_server_certificate: - # name: '{{ cert_name }}' - # state: present - # path: '/example/' - # new_path: '/path/' - # register: update_path - # ignore_errors: True - - # - name: check result - Update certificate path - idempotency - # assert: - # that: - # - update_path is successful - # - update_path is not changed + - name: Update certificate path - idempotency + iam_server_certificate: + name: '{{ cert_name }}' + state: present + path: '/example/' + new_path: '/path/' + register: update_path + ignore_errors: True + + - name: check result - Update certificate path - idempotency + assert: + that: + - update_path is successful + - update_path is not changed ################################################ @@ -329,27 +336,27 @@ - '"cert_body" in update_name' - '"cert_path" in update_name' - '"expiration_date" in update_name' - # - '"name" in update_name' + - '"name" in update_name' - '"upload_date" in update_name' - update_name.arn.startswith('arn:aws') - # - update_name.arn.endswith('-renamed') - # - update_name.name.endswith('renamed') + - update_name.arn.endswith('-renamed') + - update_name.name.endswith('renamed') - update_name.cert_path == '/path/' - update_name.cert_body == cert_a_data - # - name: Update certificate name - idempotency - # iam_server_certificate: - # name: '{{ cert_name }}' - # new_name: '{{ cert_name }}-renamed' - # state: present - # register: update_name - # ignore_errors: True - - # - name: check result - Update certificate name - idempotency - # assert: - # that: - # - update_name is successful - # - update_name is not changed + - name: Update certificate name - idempotency + iam_server_certificate: + name: '{{ cert_name }}' + new_name: '{{ cert_name }}-renamed' + state: present + register: update_name + ignore_errors: True + + - name: check result - Update certificate name - idempotency + assert: + that: + - update_name is successful + - update_name is not changed always: