From 2757479e0906c73d7fdb19ecb08d2dbbdb1c6435 Mon Sep 17 00:00:00 2001 From: Sebastien Rosset Date: Thu, 31 Mar 2022 03:47:24 -0700 Subject: [PATCH] Add support for tagging certificates. Fix deprecated tasks in aws_acm integration tests (#870) Add support for tagging certificates. Fix deprecated tasks in aws_acm integration tests SUMMARY This PR adds support for configuring arbitrary tags when importing a certificate using the aws_acm module. Previously, it was only possible to set the 'Name' tag. Additionally, this PR fixes issues with the aws_acm integration tests. The integration tests were using deprecated tasks or attributes, such as openssl_certificate. ISSUE TYPE Bugfix Pull Request COMPONENT NAME aws_acm ADDITIONAL INFORMATION Changes to the aws_acm.py module: Add new tags and purge_tags attributes. The certificate_arn attribute is now allowed when state='present'. A playbook should be allowed to modify an existing certificate entry by providing the ARN. For example, a play may want to add, modify, remove tags on an existing certificate. The aws_acm module returns the updated tags. See example below. Refactor aws_acm.py to improve code reuse and make it possible to set arbitrary tags. This should also help to 1) improve readability. 2) prepare for #869 which I am planning to work on next. Backwards-compatibility is retained, even though it might make sense to normalize some of the attributes. Example return value: "certificate": { "arn": "arn:aws:acm:us-west-1:account:certificate/f85abf9d-4bda-4dcc-98c3-770664a68243", "domain_name": "acm1.949058644.ansible.com", "tags": { "Application": "search", "Environment": "development", "Name": "ansible-test-78006277-398b5796f999_949058644_1" } } Integration tests: The openssl_certificate task is deprecated. Migrate to x509_certificate. The signature_algorithms attribute is no longer supported by the new x509_certificate task. Using selfsigned_digest instead. The integration tests for the aws_acm module pass locally. I see ansible/ansible#67788 has been closed, but tests/integration/targets/aws_acm/aliases still has unstable. I am not sure what to do about it. I was able to run the tests in my local workspace after making the above changes. Reviewed-by: Markus Bergholz Reviewed-by: Sebastien Rosset Reviewed-by: Mark Woolley Reviewed-by: Alina Buzachis --- aws_acm.py | 343 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 251 insertions(+), 92 deletions(-) diff --git a/aws_acm.py b/aws_acm.py index d28301e9160..1125ead5036 100644 --- a/aws_acm.py +++ b/aws_acm.py @@ -79,15 +79,17 @@ certificate: description: - The body of the PEM encoded public certificate. - - Required when I(state) is not C(absent). + - Required when I(state) is not C(absent) and the certificate does not exist. - > If your certificate is in a file, use C(lookup('file', 'path/to/cert.pem')). type: str certificate_arn: description: - - The ARN of a certificate in ACM to delete - - Ignored when I(state=present). + - The ARN of a certificate in ACM to modify or delete. + - > + If I(state=present), the certificate with the specified ARN can be updated. + For example, this can be used to add/remove tags to an existing certificate. - > If I(state=absent), you must provide one of I(certificate_arn), I(domain_name) or I(name_tag). @@ -131,6 +133,9 @@ This is to ensure Ansible can treat certificates idempotently, even though the ACM API allows duplicate certificates. - If I(state=preset), this must be specified. + - > + If I(state=absent) and I(name_tag) is specified, + this task will delete all ACM certificates with this Name tag. - > If I(state=absent), you must provide exactly one of I(certificate_arn), I(domain_name) or I(name_tag). @@ -139,7 +144,7 @@ private_key: description: - The body of the PEM encoded private key. - - Required when I(state=present). + - Required when I(state=present) and the certificate does not exist. - Ignored when I(state=absent). - > If your private key is in a file, @@ -157,6 +162,26 @@ choices: [present, absent] default: present type: str + + tags: + description: + - Tags to apply to certificates imported in ACM. + - > + If both I(name_tag) and the 'Name' tag in I(tags) are set, + the values must be the same. + - > + If the 'Name' tag in I(tags) is not set and I(name_tag) is set, + the I(name_tag) value is copied to I(tags). + type: dict + version_added: 3.2.0 + + purge_tags: + description: + - whether to remove tags not present in the C(tags) parameter. + default: false + type: bool + version_added: 3.2.0 + author: - Matthew Davis (@matt-telstra) on behalf of Telstra Corporation Limited extends_documentation_fragment: @@ -206,6 +231,14 @@ state: absent region: ap-southeast-2 +- name: add tags to an existing certificate with a particular ARN + community.aws.aws_acm: + certificate_arn: "arn:aws:acm:ap-southeast-2:123456789012:certificate/01234567-abcd-abcd-abcd-012345678901" + tags: + Name: my_certificate + Application: search + Environment: development + purge_tags: true ''' RETURN = ''' @@ -234,11 +267,65 @@ ''' +import base64 +from copy import deepcopy +import re # regex library + +try: + import botocore +except ImportError: + pass # handled by AnsibleAWSModule + from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule from ansible_collections.amazon.aws.plugins.module_utils.acm import ACMServiceManager +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import compare_aws_tags +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ( + boto3_tag_list_to_ansible_dict, + ansible_dict_to_boto3_tag_list, +) from ansible.module_utils._text import to_text -import base64 -import re # regex library + + +def ensure_tags(client, module, resource_arn, existing_tags, tags, purge_tags): + if tags is None: + return (False, existing_tags) + + tags_to_add, tags_to_remove = compare_aws_tags(existing_tags, tags, purge_tags) + changed = bool(tags_to_add or tags_to_remove) + if tags_to_add and not module.check_mode: + try: + client.add_tags_to_certificate( + CertificateArn=resource_arn, + Tags=ansible_dict_to_boto3_tag_list(tags_to_add), + ) + except ( + botocore.exceptions.ClientError, + botocore.exceptions.BotoCoreError, + ) as e: + module.fail_json_aws( + e, "Couldn't add tags to certificate {0}".format(resource_arn) + ) + if tags_to_remove and not module.check_mode: + # remove_tags_from_certificate wants a list of key, value pairs, not a list of keys. + tags_list = [{'Key': key, 'Value': existing_tags.get(key)} for key in tags_to_remove] + try: + client.remove_tags_from_certificate( + CertificateArn=resource_arn, + Tags=tags_list, + ) + except ( + botocore.exceptions.ClientError, + botocore.exceptions.BotoCoreError, + ) as e: + module.fail_json_aws( + e, "Couldn't remove tags from certificate {0}".format(resource_arn) + ) + new_tags = deepcopy(existing_tags) + for key, value in tags_to_add.items(): + new_tags[key] = value + for key in tags_to_remove: + new_tags.pop(key, None) + return (changed, new_tags) # Takes in two text arguments @@ -293,6 +380,122 @@ def pem_chain_split(module, pem): return pem_arr +def update_imported_certificate(client, module, acm, old_cert, desired_tags): + """ + Update the existing certificate that was previously imported in ACM. + """ + module.debug("Existing certificate found in ACM") + if ('tags' not in old_cert) or ('Name' not in old_cert['tags']): + # shouldn't happen + module.fail_json(msg="Internal error, unsure which certificate to update", certificate=old_cert) + if module.params.get('name_tag') is not None and (old_cert['tags']['Name'] != module.params.get('name_tag')): + # This could happen if the user identified the certificate using 'certificate_arn' or 'domain_name', + # and the 'Name' tag in the AWS API does not match the ansible 'name_tag'. + module.fail_json(msg="Internal error, Name tag does not match", certificate=old_cert) + if 'certificate' not in old_cert: + # shouldn't happen + module.fail_json(msg="Internal error, unsure what the existing cert in ACM is", certificate=old_cert) + + cert_arn = None + # Are the existing certificate in ACM and the local certificate the same? + same = True + if module.params.get('certificate') is not None: + same &= chain_compare(module, old_cert['certificate'], module.params['certificate']) + if module.params['certificate_chain']: + # Need to test this + # not sure if Amazon appends the cert itself to the chain when self-signed + same &= chain_compare(module, old_cert['certificate_chain'], module.params['certificate_chain']) + else: + # When there is no chain with a cert + # it seems Amazon returns the cert itself as the chain + same &= chain_compare(module, old_cert['certificate_chain'], module.params['certificate']) + + if same: + module.debug("Existing certificate in ACM is the same") + cert_arn = old_cert['certificate_arn'] + changed = False + else: + absent_args = ['certificate', 'name_tag', 'private_key'] + if sum([(module.params[a] is not None) for a in absent_args]) < 3: + module.fail_json(msg="When importing a certificate, all of 'name_tag', 'certificate' and 'private_key' must be specified") + module.debug("Existing certificate in ACM is different, overwriting") + changed = True + if module.check_mode: + cert_arn = old_cert['certificate_arn'] + # note: returned domain will be the domain of the previous cert + else: + # update cert in ACM + cert_arn = acm.import_certificate( + client, + module, + certificate=module.params['certificate'], + private_key=module.params['private_key'], + certificate_chain=module.params['certificate_chain'], + arn=old_cert['certificate_arn'], + tags=desired_tags, + ) + return (changed, cert_arn) + + +def import_certificate(client, module, acm, desired_tags): + """ + Import a certificate to ACM. + """ + # Validate argument requirements + absent_args = ['certificate', 'name_tag', 'private_key'] + cert_arn = None + if sum([(module.params[a] is not None) for a in absent_args]) < 3: + module.fail_json(msg="When importing a new certificate, all of 'name_tag', 'certificate' and 'private_key' must be specified") + module.debug("No certificate in ACM. Creating new one.") + changed = True + if module.check_mode: + domain = 'example.com' + module.exit_json(certificate=dict(domain_name=domain), changed=True) + else: + cert_arn = acm.import_certificate( + client, + module, + certificate=module.params['certificate'], + private_key=module.params['private_key'], + certificate_chain=module.params['certificate_chain'], + tags=desired_tags, + ) + return (changed, cert_arn) + + +def ensure_certificates_present(client, module, acm, certificates, desired_tags, filter_tags): + cert_arn = None + changed = False + if len(certificates) > 1: + msg = "More than one certificate with Name=%s exists in ACM in this region" % module.params['name_tag'] + module.fail_json(msg=msg, certificates=certificates) + elif len(certificates) == 1: + # Update existing certificate that was previously imported to ACM. + (changed, cert_arn) = update_imported_certificate(client, module, acm, certificates[0], desired_tags) + else: # len(certificates) == 0 + # Import new certificate to ACM. + (changed, cert_arn) = import_certificate(client, module, acm, desired_tags) + + # Add/remove tags to/from certificate + try: + existing_tags = boto3_tag_list_to_ansible_dict(client.list_tags_for_certificate(CertificateArn=cert_arn)['Tags']) + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, "Couldn't get tags for certificate") + + purge_tags = module.params.get('purge_tags') + (c, new_tags) = ensure_tags(client, module, cert_arn, existing_tags, desired_tags, purge_tags) + changed |= c + domain = acm.get_domain_of_cert(client=client, module=module, arn=cert_arn) + module.exit_json(certificate=dict(domain_name=domain, arn=cert_arn, tags=new_tags), changed=changed) + + +def ensure_certificates_absent(client, module, acm, certificates): + for cert in certificates: + if not module.check_mode: + acm.delete_certificate(client, module, cert['certificate_arn']) + module.exit_json(arns=[cert['certificate_arn'] for cert in certificates], changed=(len(certificates) > 0)) + + def main(): argument_spec = dict( certificate=dict(), @@ -301,112 +504,68 @@ def main(): domain_name=dict(aliases=['domain']), name_tag=dict(aliases=['name']), private_key=dict(no_log=True), - state=dict(default='present', choices=['present', 'absent']) + tags=dict(type='dict'), + purge_tags=dict(type='bool', default=False), + state=dict(default='present', choices=['present', 'absent']), + ) + module = AnsibleAWSModule( + argument_spec=argument_spec, + supports_check_mode=True, ) - required_if = [ - ['state', 'present', ['certificate', 'name_tag', 'private_key']], - ] - module = AnsibleAWSModule(argument_spec=argument_spec, supports_check_mode=True, required_if=required_if) acm = ACMServiceManager(module) # Check argument requirements if module.params['state'] == 'present': - if module.params['certificate_arn']: - module.fail_json(msg="Parameter 'certificate_arn' is only valid if parameter 'state' is specified as 'absent'") + # at least one of these should be specified. + absent_args = ['certificate_arn', 'domain_name', 'name_tag'] + if sum([(module.params[a] is not None) for a in absent_args]) < 1: + for a in absent_args: + module.debug("%s is %s" % (a, module.params[a])) + module.fail_json(msg="If 'state' is specified as 'present' then at least one of 'name_tag', 'certificate_arn' or 'domain_name' must be specified") else: # absent # exactly one of these should be specified absent_args = ['certificate_arn', 'domain_name', 'name_tag'] if sum([(module.params[a] is not None) for a in absent_args]) != 1: for a in absent_args: module.debug("%s is %s" % (a, module.params[a])) - module.fail_json(msg="If 'state' is specified as 'absent' then exactly one of 'name_tag', certificate_arn' or 'domain_name' must be specified") - - if module.params['name_tag']: - tags = dict(Name=module.params['name_tag']) - else: - tags = None + module.fail_json(msg="If 'state' is specified as 'absent' then exactly one of 'name_tag', 'certificate_arn' or 'domain_name' must be specified") + + filter_tags = None + desired_tags = None + if module.params.get('tags') is not None: + desired_tags = module.params['tags'] + if module.params.get('name_tag') is not None: + # The module was originally implemented to filter certificates based on the 'Name' tag. + # Other tags are not used to filter certificates. + # It would make sense to replace the existing name_tag, domain, certificate_arn attributes + # with a 'filter' attribute, but that would break backwards-compatibility. + filter_tags = dict(Name=module.params['name_tag']) + if desired_tags is not None: + if 'Name' in desired_tags: + if desired_tags['Name'] != module.params['name_tag']: + module.fail_json(msg="Value of 'name_tag' conflicts with value of 'tags.Name'") + else: + desired_tags['Name'] = module.params['name_tag'] + else: + desired_tags = deepcopy(filter_tags) client = module.client('acm') # fetch the list of certificates currently in ACM - certificates = acm.get_certificates(client=client, - module=module, - domain_name=module.params['domain_name'], - arn=module.params['certificate_arn'], - only_tags=tags) + certificates = acm.get_certificates( + client=client, + module=module, + domain_name=module.params['domain_name'], + arn=module.params['certificate_arn'], + only_tags=filter_tags, + ) module.debug("Found %d corresponding certificates in ACM" % len(certificates)) - if module.params['state'] == 'present': - if len(certificates) > 1: - msg = "More than one certificate with Name=%s exists in ACM in this region" % module.params['name_tag'] - module.fail_json(msg=msg, certificates=certificates) - elif len(certificates) == 1: - # update the existing certificate - module.debug("Existing certificate found in ACM") - old_cert = certificates[0] # existing cert in ACM - if ('tags' not in old_cert) or ('Name' not in old_cert['tags']) or (old_cert['tags']['Name'] != module.params['name_tag']): - # shouldn't happen - module.fail_json(msg="Internal error, unsure which certificate to update", certificate=old_cert) - - if 'certificate' not in old_cert: - # shouldn't happen - module.fail_json(msg="Internal error, unsure what the existing cert in ACM is", certificate=old_cert) - - # Are the existing certificate in ACM and the local certificate the same? - same = True - same &= chain_compare(module, old_cert['certificate'], module.params['certificate']) - if module.params['certificate_chain']: - # Need to test this - # not sure if Amazon appends the cert itself to the chain when self-signed - same &= chain_compare(module, old_cert['certificate_chain'], module.params['certificate_chain']) - else: - # When there is no chain with a cert - # it seems Amazon returns the cert itself as the chain - same &= chain_compare(module, old_cert['certificate_chain'], module.params['certificate']) - - if same: - module.debug("Existing certificate in ACM is the same, doing nothing") - domain = acm.get_domain_of_cert(client=client, module=module, arn=old_cert['certificate_arn']) - module.exit_json(certificate=dict(domain_name=domain, arn=old_cert['certificate_arn']), changed=False) - else: - module.debug("Existing certificate in ACM is different, overwriting") - - if module.check_mode: - arn = old_cert['certificate_arn'] - # note: returned domain will be the domain of the previous cert - else: - # update cert in ACM - arn = acm.import_certificate(client, module, - certificate=module.params['certificate'], - private_key=module.params['private_key'], - certificate_chain=module.params['certificate_chain'], - arn=old_cert['certificate_arn'], - tags=tags) - domain = acm.get_domain_of_cert(client=client, module=module, arn=arn) - module.exit_json(certificate=dict(domain_name=domain, arn=arn), changed=True) - else: # len(certificates) == 0 - module.debug("No certificate in ACM. Creating new one.") - if module.check_mode: - domain = 'example.com' - module.exit_json(certificate=dict(domain_name=domain), changed=True) - else: - arn = acm.import_certificate(client=client, - module=module, - certificate=module.params['certificate'], - private_key=module.params['private_key'], - certificate_chain=module.params['certificate_chain'], - tags=tags) - domain = acm.get_domain_of_cert(client=client, module=module, arn=arn) - - module.exit_json(certificate=dict(domain_name=domain, arn=arn), changed=True) + ensure_certificates_present(client, module, acm, certificates, desired_tags, filter_tags) else: # state == absent - for cert in certificates: - if not module.check_mode: - acm.delete_certificate(client, module, cert['certificate_arn']) - module.exit_json(arns=[cert['certificate_arn'] for cert in certificates], - changed=(len(certificates) > 0)) + ensure_certificates_absent(client, module, acm, certificates) if __name__ == '__main__':