Skip to content

Commit

Permalink
refactor iam
Browse files Browse the repository at this point in the history
  • Loading branch information
tremble committed Mar 22, 2024
1 parent 2d79ffd commit 7d9bcf8
Showing 1 changed file with 86 additions and 101 deletions.
187 changes: 86 additions & 101 deletions plugins/module_utils/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
from .arn import parse_aws_arn
from .arn import validate_aws_arn
from .botocore import is_boto3_error_code
from .botocore import normalize_boto3_result
from .errors import AWSErrorHandler
from .exceptions import AnsibleAWSError
from .retries import AWSRetry
from .tagging import ansible_dict_to_boto3_tag_list
from .tagging import boto3_tag_list_to_ansible_dict
from .transformation import AnsibleAWSResource
from .transformation import AnsibleAWSResourceList
from .transformation import BotoResource
from .transformation import BotoResourceList
from .transformation import boto3_resource_list_to_ansible_dict
from .transformation import boto3_resource_to_ansible_dict


class AnsibleIAMError(AnsibleAWSError):
Expand Down Expand Up @@ -198,66 +203,6 @@ def get_iam_managed_policy_version(client, arn, version):
return client.get_policy_version(PolicyArn=arn, VersionId=version)["PolicyVersion"]


def normalize_iam_mfa_device(device):
"""Converts IAM MFA Device from the CamelCase boto3 format to the snake_case Ansible format"""
if not device:
return device
camel_device = camel_dict_to_snake_dict(device)
camel_device["tags"] = boto3_tag_list_to_ansible_dict(device.pop("Tags", []))
return camel_device


def normalize_iam_mfa_devices(devices):
"""Converts a list of IAM MFA Devices from the CamelCase boto3 format to the snake_case Ansible format"""
if not devices:
return []
devices = [normalize_iam_mfa_device(d) for d in devices]
return devices


def normalize_iam_user(user):
"""Converts IAM users from the CamelCase boto3 format to the snake_case Ansible format"""
if not user:
return user
camel_user = camel_dict_to_snake_dict(user)
camel_user["tags"] = boto3_tag_list_to_ansible_dict(user.pop("Tags", []))
return camel_user


def normalize_iam_policy(policy):
"""Converts IAM policies from the CamelCase boto3 format to the snake_case Ansible format"""
if not policy:
return policy
camel_policy = camel_dict_to_snake_dict(policy)
camel_policy["tags"] = boto3_tag_list_to_ansible_dict(policy.get("Tags", []))
return camel_policy


def normalize_iam_group(group):
"""Converts IAM Groups from the CamelCase boto3 format to the snake_case Ansible format"""
if not group:
return group
camel_group = camel_dict_to_snake_dict(normalize_boto3_result(group))
return camel_group


def normalize_iam_access_key(access_key):
"""Converts IAM access keys from the CamelCase boto3 format to the snake_case Ansible format"""
if not access_key:
return access_key
camel_key = camel_dict_to_snake_dict(normalize_boto3_result(access_key))
return camel_key


def normalize_iam_access_keys(access_keys):
"""Converts a list of IAM access keys from the CamelCase boto3 format to the snake_case Ansible format"""
if not access_keys:
return []
access_keys = [normalize_iam_access_key(k) for k in access_keys]
sorted_keys = sorted(access_keys, key=lambda d: d.get("create_date", None))
return sorted_keys


def convert_managed_policy_names_to_arns(client, policy_names):
if all(validate_aws_arn(policy, service="iam") for policy in policy_names if policy is not None):
return policy_names
Expand Down Expand Up @@ -386,46 +331,6 @@ def list_iam_instance_profiles(client, name=None, prefix=None, role=None):
return _list_iam_instance_profiles(client)


def normalize_iam_instance_profile(profile, _v7_compat=False):
"""
Converts a boto3 format IAM instance profile into "Ansible" format
_v7_compat is deprecated and will be removed in release after 2025-05-01 DO NOT USE.
"""

new_profile = camel_dict_to_snake_dict(deepcopy(profile))
if profile.get("Roles"):
new_profile["roles"] = [normalize_iam_role(role, _v7_compat=_v7_compat) for role in profile.get("Roles")]
if profile.get("Tags"):
new_profile["tags"] = boto3_tag_list_to_ansible_dict(profile.get("Tags"))
else:
new_profile["tags"] = {}
return new_profile


def normalize_iam_role(role, _v7_compat=False):
"""
Converts a boto3 format IAM instance role into "Ansible" format
_v7_compat is deprecated and will be removed in release after 2025-05-01 DO NOT USE.
"""

new_role = camel_dict_to_snake_dict(deepcopy(role))
if role.get("InstanceProfiles"):
new_role["instance_profiles"] = [
normalize_iam_instance_profile(profile, _v7_compat=_v7_compat) for profile in role.get("InstanceProfiles")
]
if role.get("AssumeRolePolicyDocument"):
if _v7_compat:
# new_role["assume_role_policy_document"] = role.get("AssumeRolePolicyDocument")
new_role["assume_role_policy_document_raw"] = role.get("AssumeRolePolicyDocument")
else:
new_role["assume_role_policy_document"] = role.get("AssumeRolePolicyDocument")

new_role["tags"] = boto3_tag_list_to_ansible_dict(role.get("Tags", []))
return new_role


@IAMErrorHandler.common_error_handler("tag instance profile")
@AWSRetry.jittered_backoff()
def tag_iam_instance_profile(client, name, tags):
Expand Down Expand Up @@ -496,3 +401,83 @@ def validate_iam_identifiers(resource_type, name=None, path=None):
return path_problem

return None


def normalize_iam_mfa_device(device: BotoResource) -> AnsibleAWSResource:
"""Converts IAM MFA Device from the CamelCase boto3 format to the snake_case Ansible format"""
# MFA Devices don't support Tags (as of 1.34.52)
return boto3_resource_to_ansible_dict(device)


def normalize_iam_mfa_devices(devices: BotoResourceList) -> AnsibleAWSResourceList:
"""Converts a list of IAM MFA Devices from the CamelCase boto3 format to the snake_case Ansible format"""
# MFA Devices don't support Tags (as of 1.34.52)
return boto3_resource_list_to_ansible_dict(devices)


def normalize_iam_user(user: BotoResource) -> AnsibleAWSResource:
"""Converts IAM users from the CamelCase boto3 format to the snake_case Ansible format"""
return boto3_resource_to_ansible_dict(user)


def normalize_iam_policy(policy: BotoResource) -> AnsibleAWSResource:
"""Converts IAM policies from the CamelCase boto3 format to the snake_case Ansible format"""
return boto3_resource_to_ansible_dict(policy)


def normalize_iam_group(group: BotoResource) -> AnsibleAWSResource:
"""Converts IAM Groups from the CamelCase boto3 format to the snake_case Ansible format"""
# Groups don't support Tags (as of 1.34.52)
return boto3_resource_to_ansible_dict(group, force_tags=False)


def normalize_iam_access_key(access_key: BotoResource) -> AnsibleAWSResource:
"""Converts IAM access keys from the CamelCase boto3 format to the snake_case Ansible format"""
# Access Keys don't support Tags (as of 1.34.52)
return boto3_resource_to_ansible_dict(access_key, force_tags=False)


def normalize_iam_access_keys(access_keys: BotoResourceList) -> AnsibleAWSResourceList:
"""Converts a list of IAM access keys from the CamelCase boto3 format to the snake_case Ansible format"""
# Access Keys don't support Tags (as of 1.34.52)
if not access_keys:
return access_keys
access_keys = boto3_resource_list_to_ansible_dict(access_keys, force_tags=False)
return sorted(access_keys, key=lambda d: d.get("create_date", None))


def normalize_iam_instance_profile(profile: BotoResource) -> AnsibleAWSResource:
"""
Converts a boto3 format IAM instance profile into "Ansible" format
_v7_compat is deprecated and will be removed in release after 2025-05-01 DO NOT USE.
"""
transforms = {"Roles": _normalize_iam_roles}
transformed_profile = boto3_resource_to_ansible_dict(profile, nested_transforms=transforms)
return transformed_profile


def normalize_iam_role(role: BotoResource, _v7_compat: bool = False) -> AnsibleAWSResource:
"""
Converts a boto3 format IAM instance role into "Ansible" format
_v7_compat is deprecated and will be removed in release after 2025-05-01 DO NOT USE.
"""
transforms = {"InstanceProfiles": _normalize_iam_instance_profiles}
ignore_list = [] if _v7_compat else ["AssumeRolePolicyDocument"]
transformed_role = boto3_resource_to_ansible_dict(role, nested_transforms=transforms, ignore_list=ignore_list)
if _v7_compat and role.get("AssumeRolePolicyDocument"):
transformed_role["assume_role_policy_document_raw"] = role["AssumeRolePolicyDocument"]
return transformed_role


def _normalize_iam_instance_profiles(profiles: BotoResourceList) -> AnsibleAWSResourceList:
if not profiles:
return profiles
return [normalize_iam_instance_profile(p) for p in profiles]


def _normalize_iam_roles(roles: BotoResourceList) -> AnsibleAWSResourceList:
if not roles:
return roles
return [normalize_iam_role(r) for r in roles]

0 comments on commit 7d9bcf8

Please sign in to comment.