-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #4 from Nr18/feat/initialize
feat: support initialization of configuration
- Loading branch information
Showing
20 changed files
with
853 additions
and
484 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from typing import Optional | ||
from abc import ABC, abstractmethod | ||
from aws_iam_login.application_context import ApplicationMessages | ||
|
||
|
||
class Action(ABC): | ||
""" | ||
Understands generic actions that are executed | ||
""" | ||
|
||
__subject: Optional[ApplicationMessages] = None | ||
|
||
def subscribe_subject(self, subject: ApplicationMessages): | ||
self.__subject = subject | ||
|
||
def info(self, message: str) -> None: | ||
if self.__subject: | ||
self.__subject.send(message_type="INFO", message=message) | ||
|
||
def warning(self, message: str) -> None: | ||
if self.__subject: | ||
self.__subject.send(message_type="WARN", message=message) | ||
|
||
def error(self, message: str) -> None: | ||
if self.__subject: | ||
self.__subject.send(message_type="ERROR", message=message) | ||
|
||
def exception(self, message: str, exception: Exception) -> None: | ||
if self.__subject: | ||
self.__subject.send( | ||
message_type="ERROR", message=message, exception=exception | ||
) | ||
|
||
@abstractmethod | ||
def execute(self) -> bool: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
from typing import Optional | ||
import boto3 | ||
|
||
from aws_iam_login.actions.action import Action | ||
from aws_iam_login.aws_config import AWSConfig | ||
|
||
|
||
class InitializeConfiguration(Action): | ||
""" | ||
Understands how to configure the AWS profiles to be used by aws-iam-login. | ||
""" | ||
|
||
__cached_client: Optional[boto3.Session.client] = None | ||
|
||
def __init__(self, profile: str) -> None: | ||
self.__profile = profile | ||
|
||
self.__config = AWSConfig(profile) | ||
self.__client = boto3.Session(profile_name=profile).client("sts") | ||
|
||
def execute(self) -> bool: | ||
if self.__config.mfa_serial and self.__config.username: | ||
self.info(f"The {self.__profile} profile is already initialized.") | ||
return True | ||
|
||
try: | ||
data = self.__client.get_caller_identity() | ||
mfa_serial = data["Arn"].replace(":user/", ":mfa/") | ||
username = mfa_serial.split("/")[-1] | ||
self.info(f"Determined MFA ARN: {mfa_serial}") | ||
self.info(f"Determined IAM Username: {username}") | ||
self.__config.initialize(username=username, mfa_serial=mfa_serial) | ||
|
||
return True | ||
except Exception as exc: | ||
message = ( | ||
f"Failed to get the caller identity for the {self.__profile} profile." | ||
) | ||
self.exception(message=message, exception=exc) | ||
|
||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
from typing import Optional, List | ||
import time | ||
import boto3 | ||
from aws_iam_login import AWSConfig | ||
from aws_iam_login.actions.action import Action | ||
from aws_iam_login.access_key import AccessKey | ||
from functools import lru_cache | ||
|
||
|
||
class RotateAccessKeys(Action): | ||
""" | ||
Understands how to rotate access keys | ||
""" | ||
|
||
__cached_client: Optional[boto3.Session.client] = None | ||
|
||
def __init__(self, profile: str) -> None: | ||
self.__profile = profile | ||
self.__config = AWSConfig(profile) | ||
|
||
@lru_cache() | ||
def new_key(self) -> Optional[AccessKey]: | ||
try: | ||
response = self.__client.create_access_key(UserName=self.__config.username) | ||
self.info( | ||
message="Sleep for 10 seconds to make sure the credentials are active." | ||
) | ||
time.sleep(10) | ||
return AccessKey(response["AccessKey"]) | ||
except Exception as exc: | ||
message = f"Failed to create a new access key for the user: {self.__config.username}" | ||
self.exception(message=message, exception=exc) | ||
|
||
return None | ||
|
||
def execute(self) -> bool: | ||
if not self.__rotation_possible(): | ||
return False | ||
|
||
new_key = self.new_key() | ||
|
||
if self.__config.key and new_key: | ||
try: | ||
self.__flush_client() | ||
self.__disable_key(self.__config.key) | ||
self.__config.write(f"{self.__profile}", new_key) | ||
self.__delete_key(self.__config.key) | ||
|
||
return True | ||
except Exception as exc: | ||
message = f"Failed to rotate the credentials for the user: {self.__config.username}" | ||
self.exception(message=message, exception=exc) | ||
|
||
return False | ||
|
||
def __flush_client(self) -> None: | ||
self.__cached_client = None | ||
|
||
@property | ||
def __client(self) -> boto3.Session.client: | ||
if not self.__cached_client: | ||
session = boto3.Session(profile_name=self.__profile) | ||
self.__cached_client = session.client("iam") | ||
|
||
return self.__cached_client | ||
|
||
def __get_current_keys(self) -> List[dict]: | ||
try: | ||
response = self.__client.list_access_keys(UserName=self.__config.username) | ||
return response["AccessKeyMetadata"] | ||
except Exception as exc: | ||
message = ( | ||
f"Failed to list access keys for the user: {self.__config.username}" | ||
) | ||
self.exception(message=message, exception=exc) | ||
|
||
return [] | ||
|
||
def __rotation_possible(self) -> bool: | ||
if not self.__config.valid: | ||
self.warning( | ||
f"The configuration for the {self.__profile} is invalid, please try `aws-iam-login {self.__profile} init` first!" | ||
) | ||
return False | ||
|
||
if len(self.__get_current_keys()) != 1: | ||
self.error(f"There needs to be only 1 AccessKey present!") | ||
return False | ||
|
||
return True | ||
|
||
def __disable_key(self, key: AccessKey) -> None: | ||
self.__client.update_access_key( | ||
UserName=self.__config.username, | ||
AccessKeyId=key.access_key, | ||
Status="Inactive", | ||
) | ||
|
||
def __delete_key(self, key: AccessKey) -> None: | ||
self.__client.delete_access_key( | ||
UserName=self.__config.username, AccessKeyId=key.access_key | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.