diff --git a/README.md b/README.md index 2570fb5..0cf3733 100644 --- a/README.md +++ b/README.md @@ -6,16 +6,24 @@ stored in the `~/.aws/credentials` file for re-use. ## Configuration You will need to configure your roles and IAM User credentials in the same places as you are used to. So in your -`~/.aws/credentials` file you will need to have the following: +`~/.aws/credentials` file. To make this process as easy as possible you could use the following command: + +```bash +aws-iam-login my-profile init +``` + +This command will fetch the ARN of the caller identity. Based on this identity we will determin the `username` and +`mfa_serial` of the IAM User. These will then be stored in the `~/.aws/credentials` file. For example: ```ini [my-profile] aws_access_key_id = XXXXXXX aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXX mfa_serial = arn:aws:iam::111122223333:mfa/my-iam-user +username = my-iam-user ``` -The only addition is the `mfa_serial` field. +The only addition is the `username` and `mfa_serial` fields. ### AWS Least privileged diff --git a/aws_iam_login/__init__.py b/aws_iam_login/__init__.py index 83ccff7..7381b80 100644 --- a/aws_iam_login/__init__.py +++ b/aws_iam_login/__init__.py @@ -3,9 +3,10 @@ from botocore.exceptions import ParamValidationError, ClientError from click import Context +from aws_iam_login.actions.initialize_configuration import InitializeConfiguration from aws_iam_login.observer import Observer from .aws_config import AWSConfig -from .rotate_access_keys import RotateAccessKeys +from aws_iam_login.actions.rotate_access_keys import RotateAccessKeys from .credentials import Credentials from .application_context import ApplicationContext, ApplicationMessages @@ -46,19 +47,35 @@ def credentials(ctx: ApplicationContext) -> None: @main.command() -@click.argument("username") @click.pass_obj -def rotate(ctx: ApplicationContext, username: str) -> None: +def init(ctx: ApplicationContext) -> None: + """ + Initialize your `.aws/configuration` + """ + click.echo(f"Looking up additional required data...") + + action = InitializeConfiguration(profile=ctx.profile) + action.subscribe_subject(ctx.subject) + + if not action.execute(): + click.echo(f"Failed to initialize the {ctx.profile} profile") + exit(1) + + click.echo(f"The {ctx.profile} profile has been successfully initialized!") + + +@main.command() +@click.pass_obj +def rotate(ctx: ApplicationContext) -> None: """ Rotate your IAM User credentials """ click.echo(f"Key rotation process in progress") - action = RotateAccessKeys( - profile=ctx.profile, username=username, subject=ctx.subject - ) + action = RotateAccessKeys(profile=ctx.profile) + action.subscribe_subject(ctx.subject) - if not action.rotate(): + if not action.execute(): click.echo(f"Failed to rotate the access keys for the {ctx.profile} profile") exit(1) diff --git a/aws_iam_login/access_key.py b/aws_iam_login/access_key.py index ebc87b7..4aea757 100644 --- a/aws_iam_login/access_key.py +++ b/aws_iam_login/access_key.py @@ -14,14 +14,6 @@ def __iter__(self): yield "aws_access_key_id", self.access_key yield "aws_secret_access_key", self.secret_access_key - @property - def profile(self) -> str: - return str(self.__raw_data.get("Profile", "")) - - @profile.setter - def profile(self, value: str) -> None: - self.__raw_data["Profile"] = value - @property def username(self) -> str: return str(self.__raw_data.get("UserName", "")) diff --git a/aws_iam_login/actions/__init__.py b/aws_iam_login/actions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aws_iam_login/actions/action.py b/aws_iam_login/actions/action.py new file mode 100644 index 0000000..188f2b7 --- /dev/null +++ b/aws_iam_login/actions/action.py @@ -0,0 +1,36 @@ +from typing import Optional +from abc import ABC, abstractmethod +from aws_iam_login.application_context import ApplicationMessages + + +class Action(ABC): + """ + Understands generic actions that are executed + """ + + __subject: Optional[ApplicationMessages] = None + + def subscribe_subject(self, subject: ApplicationMessages): + self.__subject = subject + + def info(self, message: str) -> None: + if self.__subject: + self.__subject.send(message_type="INFO", message=message) + + def warning(self, message: str) -> None: + if self.__subject: + self.__subject.send(message_type="WARN", message=message) + + def error(self, message: str) -> None: + if self.__subject: + self.__subject.send(message_type="ERROR", message=message) + + def exception(self, message: str, exception: Exception) -> None: + if self.__subject: + self.__subject.send( + message_type="ERROR", message=message, exception=exception + ) + + @abstractmethod + def execute(self) -> bool: + pass diff --git a/aws_iam_login/actions/initialize_configuration.py b/aws_iam_login/actions/initialize_configuration.py new file mode 100644 index 0000000..cbe5ae3 --- /dev/null +++ b/aws_iam_login/actions/initialize_configuration.py @@ -0,0 +1,41 @@ +from typing import Optional +import boto3 + +from aws_iam_login.actions.action import Action +from aws_iam_login.aws_config import AWSConfig + + +class InitializeConfiguration(Action): + """ + Understands how to configure the AWS profiles to be used by aws-iam-login. + """ + + __cached_client: Optional[boto3.Session.client] = None + + def __init__(self, profile: str) -> None: + self.__profile = profile + + self.__config = AWSConfig(profile) + self.__client = boto3.Session(profile_name=profile).client("sts") + + def execute(self) -> bool: + if self.__config.mfa_serial and self.__config.username: + self.info(f"The {self.__profile} profile is already initialized.") + return True + + try: + data = self.__client.get_caller_identity() + mfa_serial = data["Arn"].replace(":user/", ":mfa/") + username = mfa_serial.split("/")[-1] + self.info(f"Determined MFA ARN: {mfa_serial}") + self.info(f"Determined IAM Username: {username}") + self.__config.initialize(username=username, mfa_serial=mfa_serial) + + return True + except Exception as exc: + message = ( + f"Failed to get the caller identity for the {self.__profile} profile." + ) + self.exception(message=message, exception=exc) + + return False diff --git a/aws_iam_login/actions/rotate_access_keys.py b/aws_iam_login/actions/rotate_access_keys.py new file mode 100644 index 0000000..f4af066 --- /dev/null +++ b/aws_iam_login/actions/rotate_access_keys.py @@ -0,0 +1,102 @@ +from typing import Optional, List +import time +import boto3 +from aws_iam_login import AWSConfig +from aws_iam_login.actions.action import Action +from aws_iam_login.access_key import AccessKey +from functools import lru_cache + + +class RotateAccessKeys(Action): + """ + Understands how to rotate access keys + """ + + __cached_client: Optional[boto3.Session.client] = None + + def __init__(self, profile: str) -> None: + self.__profile = profile + self.__config = AWSConfig(profile) + + @lru_cache() + def new_key(self) -> Optional[AccessKey]: + try: + response = self.__client.create_access_key(UserName=self.__config.username) + self.info( + message="Sleep for 10 seconds to make sure the credentials are active." + ) + time.sleep(10) + return AccessKey(response["AccessKey"]) + except Exception as exc: + message = f"Failed to create a new access key for the user: {self.__config.username}" + self.exception(message=message, exception=exc) + + return None + + def execute(self) -> bool: + if not self.__rotation_possible(): + return False + + new_key = self.new_key() + + if self.__config.key and new_key: + try: + self.__flush_client() + self.__disable_key(self.__config.key) + self.__config.write(f"{self.__profile}", new_key) + self.__delete_key(self.__config.key) + + return True + except Exception as exc: + message = f"Failed to rotate the credentials for the user: {self.__config.username}" + self.exception(message=message, exception=exc) + + return False + + def __flush_client(self) -> None: + self.__cached_client = None + + @property + def __client(self) -> boto3.Session.client: + if not self.__cached_client: + session = boto3.Session(profile_name=self.__profile) + self.__cached_client = session.client("iam") + + return self.__cached_client + + def __get_current_keys(self) -> List[dict]: + try: + response = self.__client.list_access_keys(UserName=self.__config.username) + return response["AccessKeyMetadata"] + except Exception as exc: + message = ( + f"Failed to list access keys for the user: {self.__config.username}" + ) + self.exception(message=message, exception=exc) + + return [] + + def __rotation_possible(self) -> bool: + if not self.__config.valid: + self.warning( + f"The configuration for the {self.__profile} is invalid, please try `aws-iam-login {self.__profile} init` first!" + ) + return False + + if len(self.__get_current_keys()) != 1: + self.error(f"There needs to be only 1 AccessKey present!") + return False + + return True + + def __disable_key(self, key: AccessKey) -> None: + self.__client.update_access_key( + UserName=self.__config.username, + AccessKeyId=key.access_key, + Status="Inactive", + ) + + def __delete_key(self, key: AccessKey) -> None: + self.__client.delete_access_key( + UserName=self.__config.username, AccessKeyId=key.access_key + ) diff --git a/aws_iam_login/aws_config.py b/aws_iam_login/aws_config.py index 7e4751c..d978344 100644 --- a/aws_iam_login/aws_config.py +++ b/aws_iam_login/aws_config.py @@ -21,12 +21,12 @@ def __init__(self, profile: str) -> None: @property def valid(self) -> bool: - valid = bool( + return bool( self.__profile_name and self.__profile_name in self.__config and self.mfa_serial + and self.username ) - return valid @property def __config(self) -> configparser.ConfigParser: @@ -41,8 +41,12 @@ def __profile(self) -> configparser.SectionProxy: return self.__config[self.__profile_name] @property - def mfa_serial(self) -> Optional[str]: - return self.__profile.get("mfa_serial", None) + def mfa_serial(self) -> str: + return self.__profile.get("mfa_serial", "") + + @property + def username(self) -> str: + return self.__profile.get("username", "") @property def key(self) -> AccessKey: @@ -58,18 +62,25 @@ def key(self) -> AccessKey: return self.__cached_key + def initialize(self, username: str, mfa_serial: str) -> None: + self.__profile["username"] = username + self.__profile["mfa_serial"] = mfa_serial + self.__save_configuration() + + def __save_configuration(self): + with open(self.__credential_file, "w") as fh: + self.__config.write(fh) + def write( self, profile: str, credentials: Union[None, Credentials, AccessKey] ) -> None: if not credentials: return - data = dict(credentials) - - if self.mfa_serial: - data["mfa_serial"] = self.mfa_serial + for key, value in dict(credentials).items(): + if profile not in self.__config: + self.__config[profile] = {} - self.__config[profile] = data + self.__config[profile][key] = value - with open(self.__credential_file, "w") as fh: - self.__config.write(fh) + self.__save_configuration() diff --git a/aws_iam_login/rotate_access_keys.py b/aws_iam_login/rotate_access_keys.py deleted file mode 100644 index f0fdf1c..0000000 --- a/aws_iam_login/rotate_access_keys.py +++ /dev/null @@ -1,122 +0,0 @@ -from typing import Optional -import time -import boto3 -from aws_iam_login import AWSConfig -from aws_iam_login.application_context import ApplicationMessages -from aws_iam_login.access_key import AccessKey -from functools import lru_cache - - -class RotateAccessKeys: - """ - Understands how to rotate access keys - """ - - __cached_client: Optional[boto3.Session.client] = None - - def __init__( - self, profile: str, username: str, subject: ApplicationMessages - ) -> None: - super().__init__() - self.__subject = subject - self.__profile = profile - self.__config = AWSConfig(profile) - self.__username = username - - def __message( - self, message_type: str, message: str, exception: Optional[Exception] = None - ) -> None: - self.__subject.send( - message_type=message_type, message=message, exception=exception - ) - - @lru_cache() - def current_key(self) -> Optional[AccessKey]: - try: - response = self.__client.list_access_keys(UserName=self.__username) - - if len(response["AccessKeyMetadata"]) != 1: - raise Exception( - f"There are {len(response['AccessKeyMetadata'])} keys available, 1 is expected!" - ) - - key = AccessKey(response["AccessKeyMetadata"][0]) - key.secret_access_key = self.__config.key.secret_access_key - key.profile = self.__config.key.profile - return key - - except Exception as exc: - self.__message( - message_type="exception", - message=f"Failed to list access keys for the user: {self.__username}", - exception=exc, - ) - - return None - - @lru_cache() - def new_key(self) -> Optional[AccessKey]: - try: - response = self.__client.create_access_key(UserName=self.__username) - self.__message( - message_type="info", - message="Sleep for 10 seconds to make sure the credentials are active.", - ) - time.sleep(10) - return AccessKey(response["AccessKey"]) - except Exception as exc: - self.__message( - message_type="exception", - message=f"Failed to create a new access key for the user: {self.__username}", - exception=exc, - ) - - return None - - def rotate(self) -> bool: - if not self.__rotation_possible(): - return False - - current_key = self.current_key() - new_key = self.new_key() - - if current_key and new_key: - try: - self.__flush_client() - self.__disable_key(current_key) - self.__config.write(f"{self.__profile}", new_key) - self.__delete_key(current_key) - - return True - except Exception as exc: - self.__message( - message_type="exception", - message=f"Failed to rotate the credentials for the user: {self.__username}", - exception=exc, - ) - - return False - - def __flush_client(self) -> None: - self.__cached_client = None - - @property - def __client(self) -> boto3.Session.client: - if not self.__cached_client: - session = boto3.Session(profile_name=self.__profile) - self.__cached_client = session.client("iam") - - return self.__cached_client - - def __rotation_possible(self) -> bool: - return bool(self.__config.valid and self.current_key()) - - def __disable_key(self, key: AccessKey) -> None: - self.__client.update_access_key( - UserName=self.__username, AccessKeyId=key.access_key, Status="Inactive" - ) - - def __delete_key(self, key: AccessKey) -> None: - self.__client.delete_access_key( - UserName=self.__username, AccessKeyId=key.access_key - ) diff --git a/poetry.lock b/poetry.lock index d84d64c..0b236e7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,6 +1,6 @@ [[package]] name = "atomicwrites" -version = "1.4.0" +version = "1.4.1" description = "Atomic file writes." category = "dev" optional = false @@ -60,14 +60,14 @@ dev = ["build (==0.8.0)", "flake8 (==4.0.1)", "hashin (==0.17.0)", "pip-tools (= [[package]] name = "boto3" -version = "1.24.11" +version = "1.24.34" description = "The AWS SDK for Python" category = "main" optional = false python-versions = ">= 3.7" [package.dependencies] -botocore = ">=1.27.11,<1.28.0" +botocore = ">=1.27.34,<1.28.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.6.0,<0.7.0" @@ -76,7 +76,7 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.27.11" +version = "1.27.34" description = "Low-level, data-driven core of boto 3." category = "main" optional = false @@ -152,7 +152,7 @@ test = ["flake8 (==3.7.8)", "hypothesis (==3.55.3)"] [[package]] name = "coverage" -version = "6.4.1" +version = "6.4.2" description = "Code coverage measurement for Python" category = "dev" optional = false @@ -245,7 +245,7 @@ trio = ["trio", "async-generator"] [[package]] name = "jmespath" -version = "1.0.0" +version = "1.0.1" description = "JSON Matching Expressions" category = "main" optional = false @@ -253,7 +253,7 @@ python-versions = ">=3.7" [[package]] name = "keyring" -version = "23.6.0" +version = "23.7.0" description = "Store and access your passwords safely." category = "dev" optional = false @@ -267,7 +267,7 @@ SecretStorage = {version = ">=3.2", markers = "sys_platform == \"linux\""} [package.extras] docs = ["sphinx", "jaraco.packaging (>=9)", "rst.linker (>=1.9)", "jaraco.tidelift (>=1.4)"] -testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "pytest-black (>=0.3.7)", "pytest-mypy (>=0.9.1)"] +testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-black (>=0.3.7)", "pytest-mypy (>=0.9.1)"] [[package]] name = "mando" @@ -532,7 +532,7 @@ idna2008 = ["idna"] [[package]] name = "rich" -version = "12.4.4" +version = "12.5.1" description = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal" category = "dev" optional = false @@ -617,11 +617,11 @@ python-versions = ">=3.7" [[package]] name = "urllib3" -version = "1.26.9" +version = "1.26.10" description = "HTTP library with thread-safe connection pooling, file post, and more." category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, <4" [package.extras] brotli = ["brotlicffi (>=0.8.0)", "brotli (>=1.0.9)", "brotlipy (>=0.6.0)"] @@ -651,15 +651,15 @@ requests = ">=2.0,<3.0" [[package]] name = "zipp" -version = "3.8.0" +version = "3.8.1" description = "Backport of pathlib-compatible object wrapper for zip files" category = "dev" optional = false python-versions = ">=3.7" [package.extras] -docs = ["sphinx", "jaraco.packaging (>=9)", "rst.linker (>=1.9)"] -testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "jaraco.itertools", "func-timeout", "pytest-black (>=0.3.7)", "pytest-mypy (>=0.9.1)"] +docs = ["sphinx", "jaraco.packaging (>=9)", "rst.linker (>=1.9)", "jaraco.tidelift (>=1.4)"] +testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.3)", "jaraco.itertools", "func-timeout", "pytest-black (>=0.3.7)", "pytest-mypy (>=0.9.1)"] [metadata] lock-version = "1.1" @@ -667,10 +667,7 @@ python-versions = "^3.8" content-hash = "c261a3107e9edf65515ebe49595fc058f0b92e51ec723a7f6d4f7d24a395f12f" [metadata.files] -atomicwrites = [ - {file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"}, - {file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"}, -] +atomicwrites = [] attrs = [ {file = "attrs-21.4.0-py2.py3-none-any.whl", hash = "sha256:2d27e3784d7a565d36ab851fe94887c5eccd6a463168875832a1be79c82828b4"}, {file = "attrs-21.4.0.tar.gz", hash = "sha256:626ba8234211db98e869df76230a137c4c40a12d72445c45d5f5b716f076e2fd"}, @@ -704,14 +701,8 @@ bleach = [ {file = "bleach-5.0.1-py3-none-any.whl", hash = "sha256:085f7f33c15bd408dd9b17a4ad77c577db66d76203e5984b1bd59baeee948b2a"}, {file = "bleach-5.0.1.tar.gz", hash = "sha256:0d03255c47eb9bd2f26aa9bb7f2107732e7e8fe195ca2f64709fcf3b0a4a085c"}, ] -boto3 = [ - {file = "boto3-1.24.11-py3-none-any.whl", hash = "sha256:19d6fb2b5e51f10e7b5d551a111cf9c64b9a5144b2838493ac41be0706e590cf"}, - {file = "boto3-1.24.11.tar.gz", hash = "sha256:79fc9699006af26de4413105e458af5f1626ba32d1f00fa0b3e8b94c2b16e2dc"}, -] -botocore = [ - {file = "botocore-1.27.11-py3-none-any.whl", hash = "sha256:8efab7f85156705cbe532aeb17b065b67ba32addc3270d9000964b98c07bb20a"}, - {file = "botocore-1.27.11.tar.gz", hash = "sha256:92f099a36df832d7f151682e1efa8e1d47d23a5cedde8692adcaa6420bcb18aa"}, -] +boto3 = [] +botocore = [] certifi = [ {file = "certifi-2022.6.15-py3-none-any.whl", hash = "sha256:fe86415d55e84719d75f8b69414f6438ac3547d2078ab91b67e779ef69378412"}, {file = "certifi-2022.6.15.tar.gz", hash = "sha256:84c85a9078b11105f04f3036a9482ae10e4621616db313fe045dd24743a0820d"}, @@ -798,49 +789,7 @@ commonmark = [ {file = "commonmark-0.9.1-py2.py3-none-any.whl", hash = "sha256:da2f38c92590f83de410ba1a3cbceafbc74fee9def35f9251ba9a971d6d66fd9"}, {file = "commonmark-0.9.1.tar.gz", hash = "sha256:452f9dc859be7f06631ddcb328b6919c67984aca654e5fefb3914d54691aed60"}, ] -coverage = [ - {file = "coverage-6.4.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f1d5aa2703e1dab4ae6cf416eb0095304f49d004c39e9db1d86f57924f43006b"}, - {file = "coverage-6.4.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4ce1b258493cbf8aec43e9b50d89982346b98e9ffdfaae8ae5793bc112fb0068"}, - {file = "coverage-6.4.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:83c4e737f60c6936460c5be330d296dd5b48b3963f48634c53b3f7deb0f34ec4"}, - {file = "coverage-6.4.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:84e65ef149028516c6d64461b95a8dbcfce95cfd5b9eb634320596173332ea84"}, - {file = "coverage-6.4.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f69718750eaae75efe506406c490d6fc5a6161d047206cc63ce25527e8a3adad"}, - {file = "coverage-6.4.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:e57816f8ffe46b1df8f12e1b348f06d164fd5219beba7d9433ba79608ef011cc"}, - {file = "coverage-6.4.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:01c5615d13f3dd3aa8543afc069e5319cfa0c7d712f6e04b920431e5c564a749"}, - {file = "coverage-6.4.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:75ab269400706fab15981fd4bd5080c56bd5cc07c3bccb86aab5e1d5a88dc8f4"}, - {file = "coverage-6.4.1-cp310-cp310-win32.whl", hash = "sha256:a7f3049243783df2e6cc6deafc49ea123522b59f464831476d3d1448e30d72df"}, - {file = "coverage-6.4.1-cp310-cp310-win_amd64.whl", hash = "sha256:ee2ddcac99b2d2aec413e36d7a429ae9ebcadf912946b13ffa88e7d4c9b712d6"}, - {file = "coverage-6.4.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:fb73e0011b8793c053bfa85e53129ba5f0250fdc0392c1591fd35d915ec75c46"}, - {file = "coverage-6.4.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:106c16dfe494de3193ec55cac9640dd039b66e196e4641fa8ac396181578b982"}, - {file = "coverage-6.4.1-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:87f4f3df85aa39da00fd3ec4b5abeb7407e82b68c7c5ad181308b0e2526da5d4"}, - {file = "coverage-6.4.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:961e2fb0680b4f5ad63234e0bf55dfb90d302740ae9c7ed0120677a94a1590cb"}, - {file = "coverage-6.4.1-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:cec3a0f75c8f1031825e19cd86ee787e87cf03e4fd2865c79c057092e69e3a3b"}, - {file = "coverage-6.4.1-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:129cd05ba6f0d08a766d942a9ed4b29283aff7b2cccf5b7ce279d50796860bb3"}, - {file = "coverage-6.4.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:bf5601c33213d3cb19d17a796f8a14a9eaa5e87629a53979a5981e3e3ae166f6"}, - {file = "coverage-6.4.1-cp37-cp37m-win32.whl", hash = "sha256:269eaa2c20a13a5bf17558d4dc91a8d078c4fa1872f25303dddcbba3a813085e"}, - {file = "coverage-6.4.1-cp37-cp37m-win_amd64.whl", hash = "sha256:f02cbbf8119db68455b9d763f2f8737bb7db7e43720afa07d8eb1604e5c5ae28"}, - {file = "coverage-6.4.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ffa9297c3a453fba4717d06df579af42ab9a28022444cae7fa605af4df612d54"}, - {file = "coverage-6.4.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:145f296d00441ca703a659e8f3eb48ae39fb083baba2d7ce4482fb2723e050d9"}, - {file = "coverage-6.4.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d67d44996140af8b84284e5e7d398e589574b376fb4de8ccd28d82ad8e3bea13"}, - {file = "coverage-6.4.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2bd9a6fc18aab8d2e18f89b7ff91c0f34ff4d5e0ba0b33e989b3cd4194c81fd9"}, - {file = "coverage-6.4.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3384f2a3652cef289e38100f2d037956194a837221edd520a7ee5b42d00cc605"}, - {file = "coverage-6.4.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:9b3e07152b4563722be523e8cd0b209e0d1a373022cfbde395ebb6575bf6790d"}, - {file = "coverage-6.4.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:1480ff858b4113db2718848d7b2d1b75bc79895a9c22e76a221b9d8d62496428"}, - {file = "coverage-6.4.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:865d69ae811a392f4d06bde506d531f6a28a00af36f5c8649684a9e5e4a85c83"}, - {file = "coverage-6.4.1-cp38-cp38-win32.whl", hash = "sha256:664a47ce62fe4bef9e2d2c430306e1428ecea207ffd68649e3b942fa8ea83b0b"}, - {file = "coverage-6.4.1-cp38-cp38-win_amd64.whl", hash = "sha256:26dff09fb0d82693ba9e6231248641d60ba606150d02ed45110f9ec26404ed1c"}, - {file = "coverage-6.4.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:d9c80df769f5ec05ad21ea34be7458d1dc51ff1fb4b2219e77fe24edf462d6df"}, - {file = "coverage-6.4.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:39ee53946bf009788108b4dd2894bf1349b4e0ca18c2016ffa7d26ce46b8f10d"}, - {file = "coverage-6.4.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f5b66caa62922531059bc5ac04f836860412f7f88d38a476eda0a6f11d4724f4"}, - {file = "coverage-6.4.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fd180ed867e289964404051a958f7cccabdeed423f91a899829264bb7974d3d3"}, - {file = "coverage-6.4.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:84631e81dd053e8a0d4967cedab6db94345f1c36107c71698f746cb2636c63e3"}, - {file = "coverage-6.4.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:8c08da0bd238f2970230c2a0d28ff0e99961598cb2e810245d7fc5afcf1254e8"}, - {file = "coverage-6.4.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:d42c549a8f41dc103a8004b9f0c433e2086add8a719da00e246e17cbe4056f72"}, - {file = "coverage-6.4.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:309ce4a522ed5fca432af4ebe0f32b21d6d7ccbb0f5fcc99290e71feba67c264"}, - {file = "coverage-6.4.1-cp39-cp39-win32.whl", hash = "sha256:fdb6f7bd51c2d1714cea40718f6149ad9be6a2ee7d93b19e9f00934c0f2a74d9"}, - {file = "coverage-6.4.1-cp39-cp39-win_amd64.whl", hash = "sha256:342d4aefd1c3e7f620a13f4fe563154d808b69cccef415415aece4c786665397"}, - {file = "coverage-6.4.1-pp36.pp37.pp38-none-any.whl", hash = "sha256:4803e7ccf93230accb928f3a68f00ffa80a88213af98ed338a57ad021ef06815"}, - {file = "coverage-6.4.1.tar.gz", hash = "sha256:4321f075095a096e70aff1d002030ee612b65a205a0a0f5b815280d5dc58100c"}, -] +coverage = [] cryptography = [ {file = "cryptography-37.0.4-cp36-abi3-macosx_10_10_universal2.whl", hash = "sha256:549153378611c0cca1042f20fd9c5030d37a72f634c9326e225c9f666d472884"}, {file = "cryptography-37.0.4-cp36-abi3-macosx_10_10_x86_64.whl", hash = "sha256:a958c52505c8adf0d3822703078580d2c0456dd1d27fabfb6f76fe63d2971cd6"}, @@ -888,14 +837,8 @@ jeepney = [ {file = "jeepney-0.8.0-py3-none-any.whl", hash = "sha256:c0a454ad016ca575060802ee4d590dd912e35c122fa04e70306de3d076cce755"}, {file = "jeepney-0.8.0.tar.gz", hash = "sha256:5efe48d255973902f6badc3ce55e2aa6c5c3b3bc642059ef3a91247bcfcc5806"}, ] -jmespath = [ - {file = "jmespath-1.0.0-py3-none-any.whl", hash = "sha256:e8dcd576ed616f14ec02eed0005c85973b5890083313860136657e24784e4c04"}, - {file = "jmespath-1.0.0.tar.gz", hash = "sha256:a490e280edd1f57d6de88636992d05b71e97d69a26a19f058ecf7d304474bf5e"}, -] -keyring = [ - {file = "keyring-23.6.0-py3-none-any.whl", hash = "sha256:372ff2fc43ab779e3f87911c26e6c7acc8bb440cbd82683e383ca37594cb0617"}, - {file = "keyring-23.6.0.tar.gz", hash = "sha256:3ac00c26e4c93739e19103091a9986a9f79665a78cf15a4df1dba7ea9ac8da2f"}, -] +jmespath = [] +keyring = [] mando = [ {file = "mando-0.6.4-py2.py3-none-any.whl", hash = "sha256:4ce09faec7e5192ffc3c57830e26acba0fd6cd11e1ee81af0d4df0657463bd1c"}, {file = "mando-0.6.4.tar.gz", hash = "sha256:79feb19dc0f097daa64a1243db578e7674909b75f88ac2220f1c065c10a0d960"}, @@ -1036,10 +979,7 @@ rfc3986 = [ {file = "rfc3986-2.0.0-py2.py3-none-any.whl", hash = "sha256:50b1502b60e289cb37883f3dfd34532b8873c7de9f49bb546641ce9cbd256ebd"}, {file = "rfc3986-2.0.0.tar.gz", hash = "sha256:97aacf9dbd4bfd829baad6e6309fa6573aaf1be3f6fa735c8ab05e46cecb261c"}, ] -rich = [ - {file = "rich-12.4.4-py3-none-any.whl", hash = "sha256:d2bbd99c320a2532ac71ff6a3164867884357da3e3301f0240090c5d2fdac7ec"}, - {file = "rich-12.4.4.tar.gz", hash = "sha256:4c586de507202505346f3e32d1363eb9ed6932f0c2f63184dea88983ff4971e2"}, -] +rich = [] s3transfer = [ {file = "s3transfer-0.6.0-py3-none-any.whl", hash = "sha256:06176b74f3a15f61f1b4f25a1fc29a4429040b7647133a463da8fa5bd28d5ecd"}, {file = "s3transfer-0.6.0.tar.gz", hash = "sha256:2ed07d3866f523cc561bf4a00fc5535827981b117dd7876f036b0c1aca42c947"}, @@ -1064,10 +1004,7 @@ typing-extensions = [ {file = "typing_extensions-4.3.0-py3-none-any.whl", hash = "sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02"}, {file = "typing_extensions-4.3.0.tar.gz", hash = "sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"}, ] -urllib3 = [ - {file = "urllib3-1.26.9-py2.py3-none-any.whl", hash = "sha256:44ece4d53fb1706f667c9bd1c648f5469a2ec925fcf3a776667042d645472c14"}, - {file = "urllib3-1.26.9.tar.gz", hash = "sha256:aabaf16477806a5e1dd19aa41f8c2b7950dd3c746362d7e3223dbe6de6ac448e"}, -] +urllib3 = [] webencodings = [ {file = "webencodings-0.5.1-py2.py3-none-any.whl", hash = "sha256:a0af1213f3c2226497a97e2b3aa01a7e4bee4f403f95be16fc9acd2947514a78"}, {file = "webencodings-0.5.1.tar.gz", hash = "sha256:b36a1c245f2d304965eb4e0a82848379241dc04b865afcc4aab16748587e1923"}, @@ -1076,7 +1013,4 @@ xenon = [ {file = "xenon-0.9.0-py2.py3-none-any.whl", hash = "sha256:994c80c7f1c6d40596b600b93734d85a5739208f31895ef99f1e4d362caf9e35"}, {file = "xenon-0.9.0.tar.gz", hash = "sha256:d2b9cb6c6260f771a432c1e588e51fddb17858f88f73ef641e7532f7a5f58fb8"}, ] -zipp = [ - {file = "zipp-3.8.0-py3-none-any.whl", hash = "sha256:c4f6e5bbf48e74f7a38e7cc5b0480ff42b0ae5178957d564d18932525d5cf099"}, - {file = "zipp-3.8.0.tar.gz", hash = "sha256:56bf8aadb83c24db6c4b577e13de374ccfb67da2078beba1d037c17980bf43ad"}, -] +zipp = [] diff --git a/tests/.DS_Store b/tests/.DS_Store new file mode 100644 index 0000000..cafa12a Binary files /dev/null and b/tests/.DS_Store differ diff --git a/tests/actions/.DS_Store b/tests/actions/.DS_Store new file mode 100644 index 0000000..71c059d Binary files /dev/null and b/tests/actions/.DS_Store differ diff --git a/tests/actions/__init__.py b/tests/actions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/actions/test_action.py b/tests/actions/test_action.py new file mode 100644 index 0000000..2cf6bc5 --- /dev/null +++ b/tests/actions/test_action.py @@ -0,0 +1,118 @@ +from typing import Callable + +import pytest + +from aws_iam_login import ApplicationMessages +from aws_iam_login.actions.action import Action +from aws_iam_login.observer import Observer, Subject + + +class CustomAction(Action): + def execute(self) -> bool: + return True + + +class CustomObserver(Observer): + def __init__(self, callback: Callable[[ApplicationMessages], None]) -> None: + self.__callback = callback + + def update(self, subject: Subject) -> None: + if isinstance(subject, ApplicationMessages): + self.__callback(subject) + + +@pytest.mark.parametrize( + "message", + [ + "My info message", + "My other info message", + ], +) +def test_info_messages(message: str) -> None: + def validate(subject: ApplicationMessages) -> None: + assert subject.type == "INFO" + assert subject.message == message + + observer = CustomObserver(callback=validate) + subject = ApplicationMessages() + subject.attach(observer) + + action = CustomAction() + action.info("Uncaught message") + action.subscribe_subject(subject) + action.info(message) + subject.detach(observer) + action.info("Uncaught message") + + +@pytest.mark.parametrize( + "message", + [ + "My warning message", + "My other warning message", + ], +) +def test_warning_messages(message: str) -> None: + def validate(subject: ApplicationMessages) -> None: + assert subject.type == "WARN" + assert subject.message == message + + observer = CustomObserver(callback=validate) + subject = ApplicationMessages() + subject.attach(observer) + + action = CustomAction() + action.warning("Uncaught message") + action.subscribe_subject(subject) + action.warning(message) + subject.detach(observer) + action.warning("Uncaught message") + + +@pytest.mark.parametrize( + "message", + [ + "My error message", + "My other error message", + ], +) +def test_error_messages(message: str) -> None: + def validate(subject: ApplicationMessages) -> None: + assert subject.type == "ERROR" + assert subject.message == message + + observer = CustomObserver(callback=validate) + subject = ApplicationMessages() + subject.attach(observer) + + action = CustomAction() + action.error("Uncaught message") + action.subscribe_subject(subject) + action.error(message) + subject.detach(observer) + action.error("Uncaught message") + + +@pytest.mark.parametrize( + "message, exception", + [ + ("Something went wrong", Exception("My Exception message")), + ("Something else went wrong", Exception("My Exception message")), + ], +) +def test_exception_messages(message: str, exception: Exception) -> None: + def validate(subject: ApplicationMessages) -> None: + assert subject.type == "ERROR" + assert subject.message == message + assert subject.exception == exception + + observer = CustomObserver(callback=validate) + subject = ApplicationMessages() + subject.attach(observer) + + action = CustomAction() + action.exception("Uncaught message", exception) + action.subscribe_subject(subject) + action.exception(message, exception) + subject.detach(observer) + action.exception("Uncaught message", exception) diff --git a/tests/actions/test_initialize_configuration.py b/tests/actions/test_initialize_configuration.py new file mode 100644 index 0000000..709ad25 --- /dev/null +++ b/tests/actions/test_initialize_configuration.py @@ -0,0 +1,99 @@ +import boto3 +import botocore +import pytest + +from unittest.mock import patch, MagicMock +from botocore.stub import Stubber +from aws_iam_login.aws_config import AWSConfig +from aws_iam_login.actions.initialize_configuration import InitializeConfiguration + + +@patch.object(boto3.Session, "client") +@patch.object(AWSConfig, "_AWSConfig__save_configuration") +@pytest.mark.parametrize( + "profile, username", + [ + ("my-profile-no-mfa-serial", "no-mfa-serial"), + ("my-profile-no-username", "no-username"), + ("my-profile-no-username-and-mfa-serial", "no-username-and-mfa-serial"), + ], +) +def test_successful_initialize( + mock_config: MagicMock, + mock_client: MagicMock, + profile: str, + username: str, +) -> None: + mock_client.return_value = botocore.session.get_session().create_client("sts") + + with Stubber(mock_client.return_value) as stubber: + stubber.add_response( + method="get_caller_identity", + service_response={ + "UserId": "AKIAI44QH8DHBEXAMPLE", + "Account": "111122223333", + "Arn": f"arn:aws:iam::111122223333:mfa/{username}", + }, + expected_params={}, + ) + + action = InitializeConfiguration(profile=profile) + assert action.execute() is True + assert mock_config.called is True + stubber.assert_no_pending_responses() + + +@patch.object(boto3.Session, "client") +@patch.object(AWSConfig, "_AWSConfig__save_configuration") +@pytest.mark.parametrize( + "profile, username", + [ + ("my-profile", "johndoe"), + ("other-profile", "janedoe"), + ], +) +def test_initialize_already_initialized( + mock_config: MagicMock, + mock_client: MagicMock, + profile: str, + username: str, +) -> None: + mock_client.return_value = botocore.session.get_session().create_client("sts") + + with Stubber(mock_client.return_value) as stubber: + action = InitializeConfiguration(profile=profile) + assert action.execute() is True + assert mock_config.called is False + stubber.assert_no_pending_responses() + + +@patch.object(boto3.Session, "client") +@patch.object(AWSConfig, "_AWSConfig__save_configuration") +@pytest.mark.parametrize( + "profile, username", + [ + ("my-profile-no-mfa-serial", "no-mfa-serial"), + ("my-profile-no-username", "no-username"), + ("my-profile-no-username-and-mfa-serial", "no-username-and-mfa-serial"), + ], +) +def test_initialize_get_caller_identity_failure( + mock_config: MagicMock, + mock_client: MagicMock, + profile: str, + username: str, +) -> None: + mock_client.return_value = botocore.session.get_session().create_client("sts") + + with Stubber(mock_client.return_value) as stubber: + stubber.add_client_error( + method="get_caller_identity", + service_error_code="AccessDenied", + service_message="Forbidden", + http_status_code=403, + ) + + action = InitializeConfiguration(profile=profile) + assert action.execute() is False + assert mock_config.called is False + stubber.assert_no_pending_responses() diff --git a/tests/actions/test_rotate_access_keys.py b/tests/actions/test_rotate_access_keys.py new file mode 100644 index 0000000..298419b --- /dev/null +++ b/tests/actions/test_rotate_access_keys.py @@ -0,0 +1,321 @@ +import time +from typing import List +import boto3 +import botocore +import pytest +from unittest.mock import patch, MagicMock +from botocore.stub import Stubber +from aws_iam_login.aws_config import AWSConfig +from aws_iam_login.actions.rotate_access_keys import RotateAccessKeys +from tests.test_key import generate_key_data + +TEST_SET_FIELDS = ",".join( + ["profile", "username", "current_access_key", "new_access_key"] +) +TEST_SET_DATA = [ + ("my-profile", "johndoe", "XXXXXXXXXXXXXXX1", "XXXXXXXXXXXXXXX2"), + ("other-profile", "janedoe", "XXXXXXXXXXXXXXX2", "XXXXXXXXXXXXXXX3"), +] + + +def catch_list_access_keys(username: str, keys: List[dict]) -> dict: + return { + "method": "list_access_keys", + "service_response": {"AccessKeyMetadata": keys}, + "expected_params": {"UserName": username}, + } + + +def catch_create_access_key(username: str, key: dict) -> dict: + return { + "method": "create_access_key", + "service_response": {"AccessKey": key}, + "expected_params": {"UserName": username}, + } + + +def catch_update_access_key(username: str, key_id: str) -> dict: + return { + "method": "update_access_key", + "service_response": {}, + "expected_params": { + "UserName": username, + "AccessKeyId": key_id, + "Status": "Inactive", + }, + } + + +def catch_delete_access_key(username: str, key_id: str) -> dict: + return { + "method": "delete_access_key", + "service_response": {}, + "expected_params": {"UserName": username, "AccessKeyId": key_id}, + } + + +@patch.object(boto3.Session, "client") +@patch.object(AWSConfig, "write") +@patch.object(time, "sleep") +@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) +def test_successful_rotate( + mock_sleep: MagicMock, + mock_config: MagicMock, + mock_client: MagicMock, + profile: str, + username: str, + current_access_key: str, + new_access_key: str, +) -> None: + stubbed_client = botocore.session.get_session().create_client("iam") + mock_client.return_value = stubbed_client + + with Stubber(stubbed_client) as stubber: + stubber.add_response( + **catch_list_access_keys( + username=username, + keys=[ + generate_key_data(access_key=current_access_key, username=username) + ], + ) + ) + stubber.add_response( + **catch_create_access_key( + username=username, + key=generate_key_data( + access_key=new_access_key, username=username, include_secret=True + ), + ) + ) + stubber.add_response( + **catch_update_access_key(username=username, key_id=current_access_key) + ) + stubber.add_response( + **catch_delete_access_key(username=username, key_id=current_access_key) + ) + + action = RotateAccessKeys(profile=profile) + assert action.execute() is True + stubber.assert_no_pending_responses() + + +@patch.object(boto3.Session, "client") +@patch.object(AWSConfig, "write") +@patch.object(time, "sleep") +@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) +def test_rotate_while_2_access_keys_are_available( + mock_sleep: MagicMock, + mock_config: MagicMock, + mock_client: MagicMock, + profile: str, + username: str, + current_access_key: str, + new_access_key: str, +) -> None: + stubbed_client = botocore.session.get_session().create_client("iam") + mock_client.return_value = stubbed_client + + with Stubber(stubbed_client) as stubber: + stubber.add_response( + **catch_list_access_keys( + username=username, + keys=[ + generate_key_data(access_key=current_access_key, username=username), + generate_key_data(access_key=new_access_key, username=username), + ], + ) + ) + + action = RotateAccessKeys(profile=profile) + assert action.execute() is False + stubber.assert_no_pending_responses() + + +@patch.object(boto3.Session, "client") +@patch.object(AWSConfig, "write") +@patch.object(time, "sleep") +@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) +def test_rotate_new_key_creation_failed( + mock_sleep: MagicMock, + mock_config: MagicMock, + mock_client: MagicMock, + profile: str, + username: str, + current_access_key: str, + new_access_key: str, +) -> None: + stubbed_client = botocore.session.get_session().create_client("iam") + mock_client.return_value = stubbed_client + + with Stubber(stubbed_client) as stubber: + # Prepare client for API calls + stubber.add_response( + **catch_list_access_keys( + username=username, + keys=[ + generate_key_data(access_key=current_access_key, username=username) + ], + ) + ) + + stubber.add_client_error( + method="create_access_key", + service_error_code="AccessDenied", + service_message="Forbidden", + http_status_code=403, + ) + + action = RotateAccessKeys(profile=profile) + assert action.execute() is False + stubber.assert_no_pending_responses() + + +@patch.object(boto3.Session, "client") +@patch.object(AWSConfig, "write") +@patch.object(time, "sleep") +@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) +def test_rotate_no_keys_available( + mock_sleep: MagicMock, + mock_config: MagicMock, + mock_client: MagicMock, + profile: str, + username: str, + current_access_key: str, + new_access_key: str, +) -> None: + stubbed_client = botocore.session.get_session().create_client("iam") + mock_client.return_value = stubbed_client + + with Stubber(stubbed_client) as stubber: + stubber.add_response( + **catch_list_access_keys( + username=username, + keys=[], + ) + ) + + action = RotateAccessKeys(profile=profile) + assert action.execute() is False + stubber.assert_no_pending_responses() + + +@patch.object(boto3.Session, "client") +@patch.object(AWSConfig, "write") +@patch.object(time, "sleep") +@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) +def test_rotate_list_access_keys_fails( + mock_sleep: MagicMock, + mock_config: MagicMock, + mock_client: MagicMock, + profile: str, + username: str, + current_access_key: str, + new_access_key: str, +) -> None: + stubbed_client = botocore.session.get_session().create_client("iam") + mock_client.return_value = stubbed_client + + with Stubber(stubbed_client) as stubber: + stubber.add_client_error( + method="list_access_keys", + service_error_code="AccessDenied", + service_message="Forbidden", + http_status_code=403, + ) + + action = RotateAccessKeys(profile=profile) + assert action.execute() is False + stubber.assert_no_pending_responses() + + +@patch.object(boto3.Session, "client") +@patch.object(AWSConfig, "write") +@patch.object(time, "sleep") +@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) +def test_deactivation_fails( + mock_sleep: MagicMock, + mock_config: MagicMock, + mock_client: MagicMock, + profile: str, + username: str, + current_access_key: str, + new_access_key: str, +) -> None: + stubbed_client = botocore.session.get_session().create_client("iam") + mock_client.return_value = stubbed_client + + with Stubber(stubbed_client) as stubber: + stubber.add_response( + **catch_list_access_keys( + username=username, + keys=[ + generate_key_data(access_key=current_access_key, username=username) + ], + ) + ) + stubber.add_response( + **catch_create_access_key( + username=username, + key=generate_key_data( + access_key=new_access_key, username=username, include_secret=True + ), + ) + ) + stubber.add_client_error( + method="update_access_key", + service_error_code="AccessDenied", + service_message="Forbidden", + http_status_code=403, + ) + + action = RotateAccessKeys(profile=profile) + assert action.execute() is False + stubber.assert_no_pending_responses() + + +@patch.object(boto3.Session, "client") +@patch.object(AWSConfig, "write") +@patch.object(time, "sleep") +@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) +def test_deletion_fails( + mock_sleep: MagicMock, + mock_config: MagicMock, + mock_client: MagicMock, + profile: str, + username: str, + current_access_key: str, + new_access_key: str, +) -> None: + stubbed_client = botocore.session.get_session().create_client("iam") + mock_client.return_value = stubbed_client + + with Stubber(stubbed_client) as stubber: + stubber.add_response( + **catch_list_access_keys( + username=username, + keys=[ + generate_key_data(access_key=current_access_key, username=username) + ], + ) + ) + stubber.add_response( + **catch_create_access_key( + username=username, + key=generate_key_data( + access_key=new_access_key, username=username, include_secret=True + ), + ) + ) + stubber.add_response( + **catch_update_access_key(username=username, key_id=current_access_key) + ) + stubber.add_client_error( + method="delete_access_key", + service_error_code="AccessDenied", + service_message="Forbidden", + http_status_code=403, + ) + + action = RotateAccessKeys(profile=profile) + assert action.execute() is False + stubber.assert_no_pending_responses() diff --git a/tests/credentials b/tests/credentials index aeb4ee2..d1f8451 100644 --- a/tests/credentials +++ b/tests/credentials @@ -2,12 +2,24 @@ aws_access_key_id = XXXXXXXXXXXXXXX1 aws_secret_access_key = XXXXXXXXXXXXXXXX mfa_serial = arn:aws:iam::111122223333:mfa/johndoe +username = johndoe [other-profile] aws_access_key_id = XXXXXXXXXXXXXXX2 aws_secret_access_key = XXXXXXXXXXXXXXXX mfa_serial = arn:aws:iam::111122223333:mfa/janedoe +username = janedoe [my-profile-no-mfa-serial] aws_access_key_id = XXXXXXXXXXXXXXX3 aws_secret_access_key = XXXXXXXXXXXXXXXX +username = no-mfa-serial + +[my-profile-no-username] +aws_access_key_id = XXXXXXXXXXXXXXX4 +aws_secret_access_key = XXXXXXXXXXXXXXXX +mfa_serial = arn:aws:iam::111122223333:mfa/no-username + +[my-profile-no-username-and-mfa-serial] +aws_access_key_id = XXXXXXXXXXXXXXX5 +aws_secret_access_key = XXXXXXXXXXXXXXXX diff --git a/tests/test_aws_config.py b/tests/test_aws_config.py index 5affd66..f3f6be5 100644 --- a/tests/test_aws_config.py +++ b/tests/test_aws_config.py @@ -90,3 +90,26 @@ def test_write_credentials( config.write(f"{profile}-sts", credentials) assert mock_file.called is True assert call().__enter__().write(f"[{profile}-sts]\n") in mock_file.mock_calls + + +@pytest.mark.parametrize( + "profile, username, mfa_serial", + [ + ("my-profile", "johndoe", "arn:aws:iam::111122223333:mfa/johndoe"), + ("other-profile", "janedoe", "arn:aws:iam::111122223333:mfa/janedoe"), + ], +) +def test_initialize(profile: str, username: str, mfa_serial: str) -> None: + config = AWSConfig(profile=profile) + + with patch("aws_iam_login.aws_config.open") as mock_file: + config.initialize(username=username, mfa_serial=mfa_serial) + assert mock_file.called is True + assert call().__enter__().write(f"[{profile}]\n") in mock_file.mock_calls + assert ( + call().__enter__().write(f"username = {username}\n") in mock_file.mock_calls + ) + assert ( + call().__enter__().write(f"mfa_serial = {mfa_serial}\n") + in mock_file.mock_calls + ) diff --git a/tests/test_cli.py b/tests/test_cli.py index 118bfee..ba1d5e2 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -3,7 +3,7 @@ from unittest.mock import patch, MagicMock from botocore.exceptions import ParamValidationError, ClientError from click.testing import CliRunner -from aws_iam_login import main, RotateAccessKeys +from aws_iam_login import main, RotateAccessKeys, InitializeConfiguration @patch("aws_iam_login.AWSConfig") @@ -46,21 +46,35 @@ def test_exceptions(mock_session, mock_input, mock_config, exception) -> None: assert result.exit_code == 1 -def test_rotate_no_username() -> None: +def test_rotate_no_username_and_mfa_device() -> None: runner = CliRunner() - result = runner.invoke(main, ["my-profile", "rotate"]) - assert result.exit_code == 2 + result = runner.invoke(main, ["my-profile-no-mfa-serial", "rotate"]) + assert result.exit_code == 1 -@patch.object(RotateAccessKeys, "rotate", return_value=False) +@patch.object(RotateAccessKeys, "execute", return_value=False) def test_rotate_failure(mock_rotate: MagicMock) -> None: runner = CliRunner() - result = runner.invoke(main, ["my-profile", "rotate", "johndoe"]) + result = runner.invoke(main, ["my-profile", "rotate"]) assert result.exit_code == 1 -@patch.object(RotateAccessKeys, "rotate", return_value=True) +@patch.object(RotateAccessKeys, "execute", return_value=True) def test_rotate_success(mock_rotate: MagicMock) -> None: runner = CliRunner() - result = runner.invoke(main, ["my-profile", "rotate", "johndoe"]) + result = runner.invoke(main, ["my-profile", "rotate"]) + assert result.exit_code == 0 + + +@patch.object(InitializeConfiguration, "execute", return_value=False) +def test_init_failure(mock_rotate: MagicMock) -> None: + runner = CliRunner() + result = runner.invoke(main, ["my-profile", "init"]) + assert result.exit_code == 1 + + +@patch.object(InitializeConfiguration, "execute", return_value=True) +def test_init_success(mock_rotate: MagicMock) -> None: + runner = CliRunner() + result = runner.invoke(main, ["my-profile", "init"]) assert result.exit_code == 0 diff --git a/tests/test_rotate_access_keys.py b/tests/test_rotate_access_keys.py deleted file mode 100644 index 285820f..0000000 --- a/tests/test_rotate_access_keys.py +++ /dev/null @@ -1,237 +0,0 @@ -import time -import boto3 -import botocore -import pytest - -from unittest.mock import patch, MagicMock - -from botocore.stub import Stubber - -from aws_iam_login import RotateAccessKeys, AWSConfig -from tests.test_key import generate_key_data - -TEST_SET_FIELDS = ",".join( - ["profile", "username", "current_access_key", "new_access_key"] -) -TEST_SET_DATA = [ - ("my-profile", "johndoe", "XXXXXXXXXXXXXXX1", "XXXXXXXXXXXXXXX2"), - ("other-profile", "janedoe", "XXXXXXXXXXXXXXX2", "XXXXXXXXXXXXXXX3"), -] - - -@patch.object(boto3.Session, "client") -@patch.object(AWSConfig, "write") -@patch.object(time, "sleep") -@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) -def test_successful_rotate( - mock_sleep: MagicMock, - mock_config: MagicMock, - mock_client: MagicMock, - profile: str, - username: str, - current_access_key: str, - new_access_key: str, -) -> None: - stubbed_client = botocore.session.get_session().create_client("iam") - mock_client.return_value = stubbed_client - - with Stubber(stubbed_client) as stubber: - # Prepare client for API calls - stubber.add_response( - "list_access_keys", - { - "AccessKeyMetadata": [ - generate_key_data(access_key=current_access_key, username=username) - ] - }, - {"UserName": username}, - ) - stubber.add_response( - "create_access_key", - { - "AccessKey": generate_key_data( - access_key=new_access_key, username=username, include_secret=True - ) - }, - {"UserName": username}, - ) - stubber.add_response( - "update_access_key", - {}, - { - "UserName": username, - "AccessKeyId": current_access_key, - "Status": "Inactive", - }, - ) - stubber.add_response( - "delete_access_key", - {}, - {"UserName": username, "AccessKeyId": current_access_key}, - ) - - action = RotateAccessKeys( - profile=profile, username=username, subject=MagicMock() - ) - assert action.rotate() is True - stubber.assert_no_pending_responses() - - -@patch.object(boto3.Session, "client") -@patch.object(AWSConfig, "write") -@patch.object(time, "sleep") -@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) -def test_rotate_while_2_access_keys_are_available( - mock_sleep: MagicMock, - mock_config: MagicMock, - mock_client: MagicMock, - profile: str, - username: str, - current_access_key: str, - new_access_key: str, -) -> None: - stubbed_client = botocore.session.get_session().create_client("iam") - mock_client.return_value = stubbed_client - - with Stubber(stubbed_client) as stubber: - # Prepare client for API calls - stubber.add_response( - method="list_access_keys", - service_response={ - "AccessKeyMetadata": [ - generate_key_data(access_key=current_access_key, username=username), - generate_key_data(access_key=new_access_key, username=username), - ] - }, - expected_params={"UserName": username}, - ) - - action = RotateAccessKeys( - profile=profile, username=username, subject=MagicMock() - ) - assert action.rotate() is False - stubber.assert_no_pending_responses() - - -@patch.object(boto3.Session, "client") -@patch.object(AWSConfig, "write") -@patch.object(time, "sleep") -@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) -def test_rotate_new_key_creation_failed( - mock_sleep: MagicMock, - mock_config: MagicMock, - mock_client: MagicMock, - profile: str, - username: str, - current_access_key: str, - new_access_key: str, -) -> None: - stubbed_client = botocore.session.get_session().create_client("iam") - mock_client.return_value = stubbed_client - - with Stubber(stubbed_client) as stubber: - # Prepare client for API calls - stubber.add_response( - method="list_access_keys", - service_response={ - "AccessKeyMetadata": [ - generate_key_data(access_key=current_access_key, username=username) - ] - }, - expected_params={"UserName": username}, - ) - - stubber.add_client_error( - method="create_access_key", - service_error_code="AccessDenied", - service_message="Forbidden", - http_status_code=403, - ) - - action = RotateAccessKeys( - profile=profile, username=username, subject=MagicMock() - ) - assert action.rotate() is False - stubber.assert_no_pending_responses() - - -@patch.object(boto3.Session, "client") -@patch.object(AWSConfig, "write") -@patch.object(time, "sleep") -@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) -def test_rotate_no_keys_available( - mock_sleep: MagicMock, - mock_config: MagicMock, - mock_client: MagicMock, - profile: str, - username: str, - current_access_key: str, - new_access_key: str, -) -> None: - stubbed_client = botocore.session.get_session().create_client("iam") - mock_client.return_value = stubbed_client - - with Stubber(stubbed_client) as stubber: - # Prepare client for API calls - stubber.add_response( - method="list_access_keys", - service_response={"AccessKeyMetadata": []}, - expected_params={"UserName": username}, - ) - - action = RotateAccessKeys( - profile=profile, username=username, subject=MagicMock() - ) - assert action.rotate() is False - stubber.assert_no_pending_responses() - - -@patch.object(boto3.Session, "client") -@patch.object(AWSConfig, "write") -@patch.object(time, "sleep") -@pytest.mark.parametrize(TEST_SET_FIELDS, TEST_SET_DATA) -def test_deactivation_fails( - mock_sleep: MagicMock, - mock_config: MagicMock, - mock_client: MagicMock, - profile: str, - username: str, - current_access_key: str, - new_access_key: str, -) -> None: - stubbed_client = botocore.session.get_session().create_client("iam") - mock_client.return_value = stubbed_client - - with Stubber(stubbed_client) as stubber: - # Prepare client for API calls - stubber.add_response( - "list_access_keys", - { - "AccessKeyMetadata": [ - generate_key_data(access_key=current_access_key, username=username) - ] - }, - {"UserName": username}, - ) - stubber.add_response( - "create_access_key", - { - "AccessKey": generate_key_data( - access_key=new_access_key, username=username, include_secret=True - ) - }, - {"UserName": username}, - ) - - stubber.add_client_error( - method="update_access_key", - service_error_code="AccessDenied", - service_message="Forbidden", - http_status_code=403, - ) - - action = RotateAccessKeys( - profile=profile, username=username, subject=MagicMock() - ) - assert action.rotate() is False - stubber.assert_no_pending_responses()