Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support initialization of configuration #4

Merged
merged 1 commit into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,24 @@ stored in the `~/.aws/credentials` file for re-use.
## Configuration

You will need to configure your roles and IAM User credentials in the same places as you are used to. So in your
`~/.aws/credentials` file you will need to have the following:
`~/.aws/credentials` file. To make this process as easy as possible you could use the following command:

```bash
aws-iam-login my-profile init
```

This command will fetch the ARN of the caller identity. Based on this identity we will determin the `username` and
`mfa_serial` of the IAM User. These will then be stored in the `~/.aws/credentials` file. For example:

```ini
[my-profile]
aws_access_key_id = XXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXX
mfa_serial = arn:aws:iam::111122223333:mfa/my-iam-user
username = my-iam-user
```

The only addition is the `mfa_serial` field.
The only addition is the `username` and `mfa_serial` fields.

### AWS Least privileged

Expand Down
31 changes: 24 additions & 7 deletions aws_iam_login/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from botocore.exceptions import ParamValidationError, ClientError
from click import Context

from aws_iam_login.actions.initialize_configuration import InitializeConfiguration
from aws_iam_login.observer import Observer
from .aws_config import AWSConfig
from .rotate_access_keys import RotateAccessKeys
from aws_iam_login.actions.rotate_access_keys import RotateAccessKeys
from .credentials import Credentials
from .application_context import ApplicationContext, ApplicationMessages

Expand Down Expand Up @@ -46,19 +47,35 @@ def credentials(ctx: ApplicationContext) -> None:


@main.command()
@click.argument("username")
@click.pass_obj
def rotate(ctx: ApplicationContext, username: str) -> None:
def init(ctx: ApplicationContext) -> None:
"""
Initialize your `.aws/configuration`
"""
click.echo(f"Looking up additional required data...")

action = InitializeConfiguration(profile=ctx.profile)
action.subscribe_subject(ctx.subject)

if not action.execute():
click.echo(f"Failed to initialize the {ctx.profile} profile")
exit(1)

click.echo(f"The {ctx.profile} profile has been successfully initialized!")


@main.command()
@click.pass_obj
def rotate(ctx: ApplicationContext) -> None:
"""
Rotate your IAM User credentials
"""
click.echo(f"Key rotation process in progress")

action = RotateAccessKeys(
profile=ctx.profile, username=username, subject=ctx.subject
)
action = RotateAccessKeys(profile=ctx.profile)
action.subscribe_subject(ctx.subject)

if not action.rotate():
if not action.execute():
click.echo(f"Failed to rotate the access keys for the {ctx.profile} profile")
exit(1)

Expand Down
8 changes: 0 additions & 8 deletions aws_iam_login/access_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ def __iter__(self):
yield "aws_access_key_id", self.access_key
yield "aws_secret_access_key", self.secret_access_key

@property
def profile(self) -> str:
return str(self.__raw_data.get("Profile", ""))

@profile.setter
def profile(self, value: str) -> None:
self.__raw_data["Profile"] = value

@property
def username(self) -> str:
return str(self.__raw_data.get("UserName", ""))
Expand Down
Empty file.
36 changes: 36 additions & 0 deletions aws_iam_login/actions/action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Optional
from abc import ABC, abstractmethod
from aws_iam_login.application_context import ApplicationMessages


class Action(ABC):
"""
Understands generic actions that are executed
"""

__subject: Optional[ApplicationMessages] = None

def subscribe_subject(self, subject: ApplicationMessages):
self.__subject = subject

def info(self, message: str) -> None:
if self.__subject:
self.__subject.send(message_type="INFO", message=message)

def warning(self, message: str) -> None:
if self.__subject:
self.__subject.send(message_type="WARN", message=message)

def error(self, message: str) -> None:
if self.__subject:
self.__subject.send(message_type="ERROR", message=message)

def exception(self, message: str, exception: Exception) -> None:
if self.__subject:
self.__subject.send(
message_type="ERROR", message=message, exception=exception
)

@abstractmethod
def execute(self) -> bool:
pass
41 changes: 41 additions & 0 deletions aws_iam_login/actions/initialize_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from typing import Optional
import boto3

from aws_iam_login.actions.action import Action
from aws_iam_login.aws_config import AWSConfig


class InitializeConfiguration(Action):
"""
Understands how to configure the AWS profiles to be used by aws-iam-login.
"""

__cached_client: Optional[boto3.Session.client] = None

def __init__(self, profile: str) -> None:
self.__profile = profile

self.__config = AWSConfig(profile)
self.__client = boto3.Session(profile_name=profile).client("sts")

def execute(self) -> bool:
if self.__config.mfa_serial and self.__config.username:
self.info(f"The {self.__profile} profile is already initialized.")
return True

try:
data = self.__client.get_caller_identity()
mfa_serial = data["Arn"].replace(":user/", ":mfa/")
username = mfa_serial.split("/")[-1]
self.info(f"Determined MFA ARN: {mfa_serial}")
self.info(f"Determined IAM Username: {username}")
self.__config.initialize(username=username, mfa_serial=mfa_serial)

return True
except Exception as exc:
message = (
f"Failed to get the caller identity for the {self.__profile} profile."
)
self.exception(message=message, exception=exc)

return False
102 changes: 102 additions & 0 deletions aws_iam_login/actions/rotate_access_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from typing import Optional, List
import time
import boto3
from aws_iam_login import AWSConfig
from aws_iam_login.actions.action import Action
from aws_iam_login.access_key import AccessKey
from functools import lru_cache


class RotateAccessKeys(Action):
"""
Understands how to rotate access keys
"""

__cached_client: Optional[boto3.Session.client] = None

def __init__(self, profile: str) -> None:
self.__profile = profile
self.__config = AWSConfig(profile)

@lru_cache()
def new_key(self) -> Optional[AccessKey]:
try:
response = self.__client.create_access_key(UserName=self.__config.username)
self.info(
message="Sleep for 10 seconds to make sure the credentials are active."
)
time.sleep(10)
return AccessKey(response["AccessKey"])
except Exception as exc:
message = f"Failed to create a new access key for the user: {self.__config.username}"
self.exception(message=message, exception=exc)

return None

def execute(self) -> bool:
if not self.__rotation_possible():
return False

new_key = self.new_key()

if self.__config.key and new_key:
try:
self.__flush_client()
self.__disable_key(self.__config.key)
self.__config.write(f"{self.__profile}", new_key)
self.__delete_key(self.__config.key)

return True
except Exception as exc:
message = f"Failed to rotate the credentials for the user: {self.__config.username}"
self.exception(message=message, exception=exc)

return False

def __flush_client(self) -> None:
self.__cached_client = None

@property
def __client(self) -> boto3.Session.client:
if not self.__cached_client:
session = boto3.Session(profile_name=self.__profile)
self.__cached_client = session.client("iam")

return self.__cached_client

def __get_current_keys(self) -> List[dict]:
try:
response = self.__client.list_access_keys(UserName=self.__config.username)
return response["AccessKeyMetadata"]
except Exception as exc:
message = (
f"Failed to list access keys for the user: {self.__config.username}"
)
self.exception(message=message, exception=exc)

return []

def __rotation_possible(self) -> bool:
if not self.__config.valid:
self.warning(
f"The configuration for the {self.__profile} is invalid, please try `aws-iam-login {self.__profile} init` first!"
)
return False

if len(self.__get_current_keys()) != 1:
self.error(f"There needs to be only 1 AccessKey present!")
return False

return True

def __disable_key(self, key: AccessKey) -> None:
self.__client.update_access_key(
UserName=self.__config.username,
AccessKeyId=key.access_key,
Status="Inactive",
)

def __delete_key(self, key: AccessKey) -> None:
self.__client.delete_access_key(
UserName=self.__config.username, AccessKeyId=key.access_key
)
33 changes: 22 additions & 11 deletions aws_iam_login/aws_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ def __init__(self, profile: str) -> None:

@property
def valid(self) -> bool:
valid = bool(
return bool(
self.__profile_name
and self.__profile_name in self.__config
and self.mfa_serial
and self.username
)
return valid

@property
def __config(self) -> configparser.ConfigParser:
Expand All @@ -41,8 +41,12 @@ def __profile(self) -> configparser.SectionProxy:
return self.__config[self.__profile_name]

@property
def mfa_serial(self) -> Optional[str]:
return self.__profile.get("mfa_serial", None)
def mfa_serial(self) -> str:
return self.__profile.get("mfa_serial", "")

@property
def username(self) -> str:
return self.__profile.get("username", "")

@property
def key(self) -> AccessKey:
Expand All @@ -58,18 +62,25 @@ def key(self) -> AccessKey:

return self.__cached_key

def initialize(self, username: str, mfa_serial: str) -> None:
self.__profile["username"] = username
self.__profile["mfa_serial"] = mfa_serial
self.__save_configuration()

def __save_configuration(self):
with open(self.__credential_file, "w") as fh:
self.__config.write(fh)

def write(
self, profile: str, credentials: Union[None, Credentials, AccessKey]
) -> None:
if not credentials:
return

data = dict(credentials)

if self.mfa_serial:
data["mfa_serial"] = self.mfa_serial
for key, value in dict(credentials).items():
if profile not in self.__config:
self.__config[profile] = {}

self.__config[profile] = data
self.__config[profile][key] = value

with open(self.__credential_file, "w") as fh:
self.__config.write(fh)
self.__save_configuration()
Loading