From e197fa8b92125ed0081ad1e74e2518612c4852ae Mon Sep 17 00:00:00 2001 From: gruebel Date: Wed, 24 Jul 2024 21:02:05 +0200 Subject: [PATCH] leverage new type syntax --- cloudsplaining/__init__.py | 8 +-- cloudsplaining/command/download.py | 8 +-- cloudsplaining/command/scan.py | 18 +++--- cloudsplaining/command/scan_multi_account.py | 30 +++++----- cloudsplaining/command/scan_policy_file.py | 12 ++-- cloudsplaining/output/policy_finding.py | 20 +++---- cloudsplaining/output/report.py | 6 +- .../scan/assume_role_policy_document.py | 10 ++-- cloudsplaining/scan/authorization_details.py | 14 ++--- cloudsplaining/scan/group_details.py | 52 ++++++++--------- cloudsplaining/scan/inline_policy.py | 22 ++++---- cloudsplaining/scan/managed_policy_detail.py | 40 ++++++------- cloudsplaining/scan/policy_document.py | 38 +++++++------ cloudsplaining/scan/role_details.py | 50 ++++++++--------- cloudsplaining/scan/statement_detail.py | 32 ++++++----- cloudsplaining/scan/user_details.py | 56 +++++++++---------- cloudsplaining/shared/aws_login.py | 14 ++--- cloudsplaining/shared/exclusions.py | 23 ++++---- cloudsplaining/shared/utils.py | 16 +++--- cloudsplaining/shared/validation.py | 10 ++-- pyproject.toml | 3 +- 21 files changed, 247 insertions(+), 235 deletions(-) diff --git a/cloudsplaining/__init__.py b/cloudsplaining/__init__.py index e65836f4..f67989bf 100644 --- a/cloudsplaining/__init__.py +++ b/cloudsplaining/__init__.py @@ -1,4 +1,6 @@ # pylint: disable=missing-module-docstring +from __future__ import annotations + import logging import sys @@ -8,8 +10,6 @@ # Uncomment to get the full debug logs. # 2020-10-06 10:04:17,200 - root - DEBUG - Leveraging the bundled IAM Definition. # Need to figure out how to get click_log to do this for me. -from typing import Optional, Union - logger = logging.getLogger(__name__) logger.setLevel(logging.WARNING) handler = logging.StreamHandler(sys.stdout) @@ -22,7 +22,7 @@ name = "cloudsplaining" # pylint: disable=invalid-name -def change_log_level(log_level: Union[int, str]) -> None: +def change_log_level(log_level: int | str) -> None: """ "Change log level of module logger""" logger.setLevel(log_level) @@ -31,7 +31,7 @@ def change_log_level(log_level: Union[int, str]) -> None: def set_stream_logger( name: str = "cloudsplaining", level: int = logging.DEBUG, - format_string: Optional[str] = None, + format_string: str | None = None, ) -> None: """ Add a stream handler for the given name and level to the logging module. diff --git a/cloudsplaining/command/download.py b/cloudsplaining/command/download.py index f1ceee82..9daf6561 100644 --- a/cloudsplaining/command/download.py +++ b/cloudsplaining/command/download.py @@ -11,7 +11,7 @@ import json import logging import os -from typing import TYPE_CHECKING, Any, Dict, List +from typing import TYPE_CHECKING, Any import boto3 import click @@ -76,14 +76,14 @@ def download(profile: str, output: str, include_non_default_policy_versions: boo def get_account_authorization_details( - session_data: Dict[str, str], include_non_default_policy_versions: bool -) -> Dict[str, List[Any]]: + session_data: dict[str, str], include_non_default_policy_versions: bool +) -> dict[str, list[Any]]: """Runs aws-iam-get-account-authorization-details""" session = boto3.Session(**session_data) # type:ignore[arg-type] config = Config(connect_timeout=5, retries={"max_attempts": 10}) iam_client: IAMClient = session.client("iam", config=config) - results: Dict[str, List[Any]] = { + results: dict[str, list[Any]] = { "UserDetailList": [], "GroupDetailList": [], "RoleDetailList": [], diff --git a/cloudsplaining/command/scan.py b/cloudsplaining/command/scan.py index 0807bde8..e11f17f2 100644 --- a/cloudsplaining/command/scan.py +++ b/cloudsplaining/command/scan.py @@ -14,7 +14,7 @@ import os import webbrowser from pathlib import Path -from typing import Any, Dict, List, Literal, cast, overload +from typing import Any, Literal, cast, overload import click import yaml @@ -104,7 +104,7 @@ def scan( minimize: bool, flag_all_risky_actions: bool, verbosity: int, - severity: List[str], + severity: list[str], ) -> None: # pragma: no cover """ Given the path to account authorization details files and the exclusions config file, scan all inline and @@ -198,7 +198,7 @@ def scan( @overload def scan_account_authorization_details( - account_authorization_details_cfg: Dict[str, Any], + account_authorization_details_cfg: dict[str, Any], exclusions: Exclusions, account_name: str, output_directory: str, @@ -207,13 +207,13 @@ def scan_account_authorization_details( return_json_results: Literal[True], flag_conditional_statements: bool = ..., flag_resource_arn_statements: bool = ..., - severity: List[str] | None = ..., + severity: list[str] | None = ..., ) -> dict[str, Any]: ... @overload def scan_account_authorization_details( - account_authorization_details_cfg: Dict[str, Any], + account_authorization_details_cfg: dict[str, Any], exclusions: Exclusions, account_name: str = ..., output_directory: str = ..., @@ -222,12 +222,12 @@ def scan_account_authorization_details( return_json_results: Literal[False] = ..., flag_conditional_statements: bool = ..., flag_resource_arn_statements: bool = ..., - severity: List[str] | None = ..., + severity: list[str] | None = ..., ) -> str: ... def scan_account_authorization_details( - account_authorization_details_cfg: Dict[str, Any], + account_authorization_details_cfg: dict[str, Any], exclusions: Exclusions, account_name: str = "default", output_directory: str = os.getcwd(), @@ -236,7 +236,7 @@ def scan_account_authorization_details( return_json_results: bool = False, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> str | dict[str, Any]: # pragma: no cover """ Given the path to account authorization details files and the exclusions config file, scan all inline and @@ -294,7 +294,7 @@ def scan_account_authorization_details( def get_authorization_files_in_directory( directory: str, -) -> List[str]: # pragma: no cover +) -> list[str]: # pragma: no cover """Get a list of download-account-authorization-files in a directory""" file_list_with_full_path = [file.absolute() for file in Path(directory).glob("*.json")] diff --git a/cloudsplaining/command/scan_multi_account.py b/cloudsplaining/command/scan_multi_account.py index a7ae6321..0d7ebe7d 100644 --- a/cloudsplaining/command/scan_multi_account.py +++ b/cloudsplaining/command/scan_multi_account.py @@ -5,7 +5,7 @@ import json import logging import os -from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast +from typing import TYPE_CHECKING, Any, cast import click import yaml @@ -32,14 +32,14 @@ class MultiAccountConfig: """Handle the YAML file that parses the Multi-account config""" - def __init__(self, config: Dict[str, Any], role_name: str) -> None: + def __init__(self, config: dict[str, Any], role_name: str) -> None: # self.config_file = config_file self.config = config self.role_name = role_name self.accounts = self._accounts() - def _accounts(self) -> Dict[str, str]: - accounts: Dict[str, str] = self.config.get("accounts", None) + def _accounts(self) -> dict[str, str]: + accounts: dict[str, str] | None = self.config.get("accounts") if not accounts: raise Exception("Please supply a list of accounts in the multi-account config file") return accounts @@ -130,7 +130,7 @@ def scan_multi_account( write_data_file: bool, flag_all_risky_actions: bool, verbosity: int, - severity: List[str], + severity: list[str], ) -> None: """Scan multiple accounts via AssumeRole""" set_log_level(verbosity) @@ -168,10 +168,10 @@ def scan_accounts( exclusions: Exclusions, role_name: str, write_data_file: bool, - profile: Optional[str] = None, - output_directory: Optional[str] = None, - output_bucket: Optional[str] = None, - severity: List[str] | None = None, + profile: str | None = None, + output_directory: str | None = None, + output_bucket: str | None = None, + severity: list[str] | None = None, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, ) -> None: @@ -229,11 +229,11 @@ def scan_account( target_account_id: str, target_role_name: str, exclusions: Exclusions, - profile: Optional[str] = None, - severity: List[str] | None = None, + profile: str | None = None, + severity: list[str] | None = None, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, -) -> Dict[str, Dict[str, Any]]: +) -> dict[str, dict[str, Any]]: """Scan a target account in one shot""" account_authorization_details = download_account_authorization_details( target_account_id=target_account_id, @@ -253,8 +253,8 @@ def scan_account( def download_account_authorization_details( - target_account_id: str, target_role_name: str, profile: Optional[str] = None -) -> Dict[str, List[Dict[str, Any]]]: + target_account_id: str, target_role_name: str, profile: str | None = None +) -> dict[str, list[dict[str, Any]]]: """Download the account authorization details from a target account""" ( aws_access_key_id, @@ -276,7 +276,7 @@ def download_account_authorization_details( return authorization_details -def get_exclusions(exclusions_file: Optional[str] = None) -> Exclusions: +def get_exclusions(exclusions_file: str | None = None) -> Exclusions: """Get the exclusions configuration from a file""" # Get the exclusions configuration if exclusions_file: diff --git a/cloudsplaining/command/scan_policy_file.py b/cloudsplaining/command/scan_policy_file.py index 14519458..4aa7ebc2 100644 --- a/cloudsplaining/command/scan_policy_file.py +++ b/cloudsplaining/command/scan_policy_file.py @@ -12,7 +12,7 @@ import json import logging import sys -from typing import Any, Dict, List, cast +from typing import Any, cast import click import yaml @@ -70,7 +70,7 @@ def scan_policy_file( high_priority_only: bool, flag_all_risky_actions: bool, verbosity: int, - severity: List[str], + severity: list[str], ) -> None: # pragma: no cover """Scan a single policy file to identify missing resource constraints.""" set_log_level(verbosity) @@ -162,12 +162,12 @@ def scan_policy_file( def scan_policy( - policy_json: Dict[str, Any], - exclusions_config: Dict[str, List[str]] = DEFAULT_EXCLUSIONS_CONFIG, + policy_json: dict[str, Any], + exclusions_config: dict[str, list[str]] = DEFAULT_EXCLUSIONS_CONFIG, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, -) -> Dict[str, Dict[str, Any]]: + severity: list[str] | None = None, +) -> dict[str, dict[str, Any]]: """ Scan a policy document for missing resource constraints. diff --git a/cloudsplaining/output/policy_finding.py b/cloudsplaining/output/policy_finding.py index f273c77d..3a7f733d 100644 --- a/cloudsplaining/output/policy_finding.py +++ b/cloudsplaining/output/policy_finding.py @@ -8,7 +8,7 @@ from __future__ import annotations import logging -from typing import Any, Dict, List +from typing import Any from cloudsplaining.scan.policy_document import PolicyDocument from cloudsplaining.shared.constants import ( @@ -33,7 +33,7 @@ def __init__( self, policy_document: PolicyDocument, exclusions: Exclusions = DEFAULT_EXCLUSIONS, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> None: """ Supply a PolicyDocument object and Exclusions object to get a single policy finding @@ -47,7 +47,7 @@ def __init__( self.missing_resource_constraints_for_modify_actions = self._missing_resource_constraints_for_modify_actions() self.severity = [] if severity is None else severity - def _missing_resource_constraints_for_modify_actions(self) -> List[str]: + def _missing_resource_constraints_for_modify_actions(self) -> list[str]: """Find modify actions that lack resource ARN constraints""" actions_missing_resource_constraints = set() for statement in self.policy_document.statements: @@ -59,7 +59,7 @@ def _missing_resource_constraints_for_modify_actions(self) -> List[str]: return sorted(actions_missing_resource_constraints) @property - def services_affected(self) -> List[str]: + def services_affected(self) -> list[str]: """Return a list of AWS service prefixes affected by the policy in question.""" services_affected = set() for action in self.missing_resource_constraints_for_modify_actions: @@ -78,7 +78,7 @@ def services_affected(self) -> List[str]: return sorted(services_affected) @property - def resource_exposure(self) -> List[str]: + def resource_exposure(self) -> list[str]: """Return a list of actions that could cause resource exposure via actions at the 'Permissions management' access level, if applicable.""" if self.always_exclude_actions: @@ -92,12 +92,12 @@ def resource_exposure(self) -> List[str]: return self.policy_document.permissions_management_without_constraints @property - def privilege_escalation(self) -> List[Dict[str, Any]]: + def privilege_escalation(self) -> list[dict[str, Any]]: """Returns privilege escalation action combinations in the policy, if present""" return self.policy_document.allows_privilege_escalation @property - def data_exfiltration(self) -> List[str]: + def data_exfiltration(self) -> list[str]: """Returns data exfiltration actions in the policy, if present""" result = [ action @@ -109,12 +109,12 @@ def data_exfiltration(self) -> List[str]: return result @property - def service_wildcard(self) -> List[str]: + def service_wildcard(self) -> list[str]: """Determine if the policy gives access to all actions within a service - simple grepping""" return self.policy_document.service_wildcard @property - def credentials_exposure(self) -> List[str]: + def credentials_exposure(self) -> list[str]: """Determine if the action returns credentials""" # https://gist.github.com/kmcquade/33860a617e651104d243c324ddf7992a results = [ @@ -127,7 +127,7 @@ def credentials_exposure(self) -> List[str]: return results @property - def results(self) -> Dict[str, Any]: + def results(self) -> dict[str, Any]: """Return the results as JSON""" findings = dict( ServiceWildcard={ diff --git a/cloudsplaining/output/report.py b/cloudsplaining/output/report.py index ad55b41a..e358d033 100644 --- a/cloudsplaining/output/report.py +++ b/cloudsplaining/output/report.py @@ -1,10 +1,12 @@ """Creates the HTML Reports""" +from __future__ import annotations + import datetime import json import os.path from pathlib import Path -from typing import Any, Dict +from typing import Any from jinja2 import Environment, FileSystemLoader @@ -20,7 +22,7 @@ def __init__( self, account_id: str, account_name: str, - results: Dict[str, Dict[str, Any]], + results: dict[str, dict[str, Any]], minimize: bool = False, ) -> None: self.account_name = account_name diff --git a/cloudsplaining/scan/assume_role_policy_document.py b/cloudsplaining/scan/assume_role_policy_document.py index 5805c32c..2a2b08ec 100644 --- a/cloudsplaining/scan/assume_role_policy_document.py +++ b/cloudsplaining/scan/assume_role_policy_document.py @@ -8,7 +8,7 @@ from __future__ import annotations import logging -from typing import Any, Dict, List +from typing import Any from cloudsplaining.scan.resource_policy_document import ( ResourcePolicyDocument, @@ -38,7 +38,7 @@ def __init__(self, policy: dict[str, Any]) -> None: self.statements.append(AssumeRoleStatement(statement)) @property - def role_assumable_by_compute_services(self) -> List[str]: + def role_assumable_by_compute_services(self) -> list[str]: """Determines whether or not the role is assumed from a compute service, and if so which ones.""" assumable_by_compute_services = [] for statement in self.statements: @@ -52,7 +52,7 @@ class AssumeRoleStatement(ResourceStatement): Statements in an AssumeRole/Trust Policy document """ - def __init__(self, statement: Dict[str, Any]) -> None: + def __init__(self, statement: dict[str, Any]) -> None: super().__init__(statement=statement) # self.not_principal = statement.get("NotPrincipal") @@ -62,7 +62,7 @@ def __init__(self, statement: Dict[str, Any]) -> None: "This should NOT be used. We suggest reviewing it ASAP." ) - def _assume_role_actions(self) -> List[str]: + def _assume_role_actions(self) -> list[str]: """Verifies that this is limited to just sts:AssumeRole""" actions = self.statement.get("Action", []) if not actions: @@ -75,7 +75,7 @@ def _assume_role_actions(self) -> List[str]: return [actions] @property - def role_assumable_by_compute_services(self) -> List[str]: + def role_assumable_by_compute_services(self) -> list[str]: """Determines whether or not the role is assumed from a compute service, and if so which ones.""" # sts:AssumeRole must be there lowercase_actions = [x.lower() for x in self.actions] diff --git a/cloudsplaining/scan/authorization_details.py b/cloudsplaining/scan/authorization_details.py index c4fce938..663f988c 100644 --- a/cloudsplaining/scan/authorization_details.py +++ b/cloudsplaining/scan/authorization_details.py @@ -8,7 +8,7 @@ from __future__ import annotations import logging -from typing import Any, Dict, List +from typing import Any from policy_sentry.querying.actions import get_all_action_links from policy_sentry.querying.all import get_all_service_prefixes @@ -31,11 +31,11 @@ class AuthorizationDetails: def __init__( self, - auth_json: Dict[str, List[Dict[str, Any]]], + auth_json: dict[str, list[dict[str, Any]]], exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> None: """ Object to hold and analyze Account Authorization details. @@ -101,7 +101,7 @@ def __init__( self.role_detail_list.set_iam_data(iam_data) @property - def inline_policies(self) -> Dict[str, Dict[str, Any]]: + def inline_policies(self) -> dict[str, dict[str, Any]]: """Return inline policy details""" results = {} results.update(self.group_detail_list.inline_policies_json) @@ -110,7 +110,7 @@ def inline_policies(self) -> Dict[str, Dict[str, Any]]: return results @property - def links(self) -> Dict[str, str | None]: + def links(self) -> dict[str, str | None]: """Return a dictionary of the action names as keys and their API documentation links as values""" results = {} unique_action_names = set() @@ -133,9 +133,9 @@ def links(self) -> Dict[str, str | None]: return results @property - def results(self) -> Dict[str, Dict[str, Any]]: + def results(self) -> dict[str, dict[str, Any]]: """Get the new JSON format of the Principals data""" - results: Dict[str, Dict[str, Any]] = { + results: dict[str, dict[str, Any]] = { "groups": self.group_detail_list.json, "users": self.user_detail_list.json, "roles": self.role_detail_list.json, diff --git a/cloudsplaining/scan/group_details.py b/cloudsplaining/scan/group_details.py index 90dca4b1..ef377faf 100644 --- a/cloudsplaining/scan/group_details.py +++ b/cloudsplaining/scan/group_details.py @@ -3,7 +3,7 @@ from __future__ import annotations import json -from typing import Any, Dict, List, Optional +from typing import Any from cloudsplaining.scan.inline_policy import InlinePolicy from cloudsplaining.scan.managed_policy_detail import ManagedPolicyDetails @@ -24,12 +24,12 @@ class GroupDetailList: def __init__( self, - group_details: List[Dict[str, Any]], + group_details: list[dict[str, Any]], policy_details: ManagedPolicyDetails, exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> None: self.severity = [] if severity is None else severity if not isinstance(exclusions, Exclusions): @@ -52,32 +52,32 @@ def __init__( ) for group_detail in group_details ] - self.iam_data: Dict[str, Dict[Any, Any]] = { + self.iam_data: dict[str, dict[Any, Any]] = { "groups": {}, "users": {}, "roles": {}, } - def set_iam_data(self, iam_data: Dict[str, Dict[Any, Any]]) -> None: + def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: self.iam_data = iam_data for group in self.groups: group.set_iam_data(iam_data) - def get_group_detail(self, name: str) -> Optional["GroupDetail"]: + def get_group_detail(self, name: str) -> GroupDetail | None: """Get a GroupDetail object by providing the Name of the group. This is useful to UserDetail objects""" for group_detail in self.groups: if group_detail.group_name == name: return group_detail return None - def get_all_allowed_actions_for_group(self, name: str) -> Optional[List[str]]: + def get_all_allowed_actions_for_group(self, name: str) -> list[str] | None: """Returns a list of all allowed actions by the group across all its policies""" for group_detail in self.groups: if group_detail.group_name == name: return group_detail.all_allowed_actions return None - def get_all_iam_statements_for_group(self, name: str) -> Optional[List[StatementDetail]]: + def get_all_iam_statements_for_group(self, name: str) -> list[StatementDetail] | None: """Returns a list of all StatementDetail objects across all the policies assigned to the group""" for group_detail in self.groups: if group_detail.group_name == name: @@ -85,14 +85,14 @@ def get_all_iam_statements_for_group(self, name: str) -> Optional[List[Statement return None @property - def group_names(self) -> List[str]: + def group_names(self) -> list[str]: """Get a list of all group names in the account""" results = [group_detail.group_name for group_detail in self.groups] results.sort() return results @property - def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str]: + def all_infrastructure_modification_actions_by_inline_policies(self) -> list[str]: """Return a list of all infrastructure modification actions allowed by all inline policies in violation.""" result = set() for group in self.groups: @@ -101,7 +101,7 @@ def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str return sorted(result) @property - def inline_policies_json(self) -> Dict[str, Dict[str, Any]]: + def inline_policies_json(self) -> dict[str, dict[str, Any]]: """Return JSON representation of attached inline policies""" results = {} for group_detail in self.groups: @@ -109,7 +109,7 @@ def inline_policies_json(self) -> Dict[str, Dict[str, Any]]: return results @property - def json(self) -> Dict[str, Dict[str, Any]]: + def json(self) -> dict[str, dict[str, Any]]: """Get all JSON results""" result = {group.group_id: group.json for group in self.groups} return result @@ -121,12 +121,12 @@ class GroupDetail: def __init__( self, - group_detail: Dict[str, Any], + group_detail: dict[str, Any], policy_details: ManagedPolicyDetails, exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> None: """ Initialize the GroupDetail object. @@ -188,13 +188,13 @@ def __init__( except NotFoundException as e: utils.print_red(f"\tError in group {self.group_name}: {e}") - self.iam_data: Dict[str, Dict[Any, Any]] = { + self.iam_data: dict[str, dict[Any, Any]] = { "groups": {}, "users": {}, "roles": {}, } - def set_iam_data(self, iam_data: Dict[str, Dict[Any, Any]]) -> None: + def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: self.iam_data = iam_data for inline_policy in self.inline_policies: inline_policy.set_iam_data(iam_data) @@ -208,7 +208,7 @@ def _is_excluded(self, exclusions: Exclusions) -> bool: ) @property - def all_allowed_actions(self) -> List[str]: + def all_allowed_actions(self) -> list[str]: """Return a list of which actions are allowed by the principal""" actions = set() for managed_policy in self.attached_managed_policies: @@ -218,7 +218,7 @@ def all_allowed_actions(self) -> List[str]: return sorted(actions) @property - def all_iam_statements(self) -> List[StatementDetail]: + def all_iam_statements(self) -> list[StatementDetail]: """Return a list of which actions are allowed by the principal""" statements = set() for managed_policy in self.attached_managed_policies: @@ -228,19 +228,19 @@ def all_iam_statements(self) -> List[StatementDetail]: return list(statements) @property - def attached_managed_policies_json(self) -> Dict[str, Dict[str, Any]]: + def attached_managed_policies_json(self) -> dict[str, dict[str, Any]]: """Return JSON representation of attached managed policies""" policies = {policy.policy_id: policy.json_large for policy in self.attached_managed_policies} return policies @property - def attached_managed_policies_pointer_json(self) -> Dict[str, str]: + def attached_managed_policies_pointer_json(self) -> dict[str, str]: """Return metadata on attached managed policies so you can look it up in the policies section later.""" policies = {policy.policy_id: policy.policy_name for policy in self.attached_managed_policies} return policies @property - def attached_customer_managed_policies_pointer_json(self) -> Dict[str, str]: + def attached_customer_managed_policies_pointer_json(self) -> dict[str, str]: """Return metadata on attached managed policies so you can look it up in the policies section later.""" policies = { policy.policy_id: policy.policy_name @@ -250,7 +250,7 @@ def attached_customer_managed_policies_pointer_json(self) -> Dict[str, str]: return policies @property - def attached_aws_managed_policies_pointer_json(self) -> Dict[str, str]: + def attached_aws_managed_policies_pointer_json(self) -> dict[str, str]: """Return metadata on attached managed policies so you can look it up in the policies section later.""" policies = { policy.policy_id: policy.policy_name @@ -260,7 +260,7 @@ def attached_aws_managed_policies_pointer_json(self) -> Dict[str, str]: return policies @property - def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str]: + def all_infrastructure_modification_actions_by_inline_policies(self) -> list[str]: """Return a list of all infrastructure modification actions allowed by all inline policies in violation.""" result = set() for policy in self.inline_policies: @@ -268,19 +268,19 @@ def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str return sorted(result) @property - def inline_policies_json(self) -> Dict[str, Dict[str, Any]]: + def inline_policies_json(self) -> dict[str, dict[str, Any]]: """Return JSON representation of attached inline policies""" policies = {policy.policy_id: policy.json_large for policy in self.inline_policies} return policies @property - def inline_policies_pointer_json(self) -> Dict[str, str]: + def inline_policies_pointer_json(self) -> dict[str, str]: """Return metadata on attached inline policies so you can look it up in the policies section later.""" policies = {policy.policy_id: policy.policy_name for policy in self.inline_policies} return policies @property - def json(self) -> Dict[str, Any]: + def json(self) -> dict[str, Any]: """Return the JSON representation of the Group Detail""" this_group_detail = dict( arn=self.arn, diff --git a/cloudsplaining/scan/inline_policy.py b/cloudsplaining/scan/inline_policy.py index 6c9c7c83..7a8f743e 100644 --- a/cloudsplaining/scan/inline_policy.py +++ b/cloudsplaining/scan/inline_policy.py @@ -3,7 +3,7 @@ from __future__ import annotations import json -from typing import Any, Dict, List, cast +from typing import Any, cast from cloudsplaining.scan.policy_document import PolicyDocument from cloudsplaining.shared.constants import ISSUE_SEVERITY, RISK_DEFINITION @@ -18,11 +18,11 @@ class InlinePolicy: def __init__( self, - policy_detail: Dict[str, Any], + policy_detail: dict[str, Any], exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> None: """ Initialize the InlinePolicy object. @@ -40,7 +40,7 @@ def __init__( self.policy_name = policy_detail.get("PolicyName", "") self.policy_document = PolicyDocument( - cast(Dict[str, Any], policy_detail.get("PolicyDocument")), + cast("dict[str, Any]", policy_detail.get("PolicyDocument")), exclusions=exclusions, flag_conditional_statements=flag_conditional_statements, flag_resource_arn_statements=flag_resource_arn_statements, @@ -53,20 +53,20 @@ def __init__( self.exclusions = exclusions self.is_excluded = self._is_excluded(exclusions) self.severity = [] if severity is None else severity - self.iam_data: Dict[str, Dict[Any, Any]] = { + self.iam_data: dict[str, dict[Any, Any]] = { "groups": {}, "users": {}, "roles": {}, } - def set_iam_data(self, iam_data: Dict[str, Dict[Any, Any]]) -> None: + def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: self.iam_data = iam_data def _is_excluded(self, exclusions: Exclusions) -> bool: """Determine whether the policy name or policy ID is excluded""" return bool(exclusions.is_policy_excluded(self.policy_name) or exclusions.is_policy_excluded(self.policy_id)) - def getFindingLinks(self, findings: List[Dict[str, Any]]) -> Dict[str, str]: # noqa: N802 + def getFindingLinks(self, findings: list[dict[str, Any]]) -> dict[str, str]: # noqa: N802 links = {} for finding in findings: links[finding["type"]] = ( @@ -75,8 +75,8 @@ def getFindingLinks(self, findings: List[Dict[str, Any]]) -> Dict[str, str]: # return links @property - def getAttached(self) -> Dict[str, List[Any]]: # noqa: N802 - attached: Dict[str, List[Any]] = {"roles": [], "groups": [], "users": []} + def getAttached(self) -> dict[str, list[Any]]: # noqa: N802 + attached: dict[str, list[Any]] = {"roles": [], "groups": [], "users": []} for principal_type in ["roles", "groups", "users"]: principals = (self.iam_data[principal_type]).keys() for principal_id in principals: @@ -89,7 +89,7 @@ def getAttached(self) -> Dict[str, List[Any]]: # noqa: N802 return attached @property - def json(self) -> Dict[str, Any]: + def json(self) -> dict[str, Any]: """Return JSON output for high risk actions""" result = dict( PolicyName=self.policy_name, @@ -151,7 +151,7 @@ def json(self) -> Dict[str, Any]: return result @property - def json_large(self) -> Dict[str, Any]: + def json_large(self) -> dict[str, Any]: """Return JSON output - including Infra Modification actions, which can be large""" result = dict( PolicyName=self.policy_name, diff --git a/cloudsplaining/scan/managed_policy_detail.py b/cloudsplaining/scan/managed_policy_detail.py index 24153bf5..fb9593a3 100644 --- a/cloudsplaining/scan/managed_policy_detail.py +++ b/cloudsplaining/scan/managed_policy_detail.py @@ -8,7 +8,7 @@ from __future__ import annotations import logging -from typing import Any, Dict, List +from typing import Any from policy_sentry.util.arns import get_account_from_arn @@ -32,11 +32,11 @@ class ManagedPolicyDetails: def __init__( self, - policy_details: List[Dict[str, Any]], + policy_details: list[dict[str, Any]], exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[Any] | None = None, + severity: list[Any] | None = None, ) -> None: self.policy_details = [] if not isinstance(exclusions, Exclusions): @@ -47,7 +47,7 @@ def __init__( self.exclusions = exclusions self.flag_conditional_statements = flag_conditional_statements self.flag_resource_arn_statements = flag_resource_arn_statements - self.iam_data: Dict[str, Dict[Any, Any]] = { + self.iam_data: dict[str, dict[Any, Any]] = { "groups": {}, "users": {}, "roles": {}, @@ -92,7 +92,7 @@ def __init__( ) ) - def get_policy_detail(self, arn: str) -> "ManagedPolicy": + def get_policy_detail(self, arn: str) -> ManagedPolicy: """Get a ManagedPolicy object by providing the ARN. This is useful to PrincipalDetail objects""" for policy_detail in self.policy_details: if policy_detail.arn == arn: @@ -100,7 +100,7 @@ def get_policy_detail(self, arn: str) -> "ManagedPolicy": raise NotFoundException(f"Managed Policy ARN {arn} not found.") @property - def all_infrastructure_modification_actions(self) -> List[str]: + def all_infrastructure_modification_actions(self) -> list[str]: """Return a list of all infrastructure modification actions allowed by all managed policies in violation.""" result = set() for policy in self.policy_details: @@ -108,32 +108,32 @@ def all_infrastructure_modification_actions(self) -> List[str]: return sorted(result) @property - def json(self) -> Dict[str, Dict[str, Any]]: + def json(self) -> dict[str, dict[str, Any]]: """Get all JSON results""" result = {policy.policy_id: policy.json for policy in self.policy_details} return result @property - def json_large(self) -> Dict[str, Dict[str, Any]]: + def json_large(self) -> dict[str, dict[str, Any]]: """Get all JSON results""" result = {policy.policy_id: policy.json_large for policy in self.policy_details} return result @property - def json_large_aws_managed(self) -> Dict[str, Dict[str, Any]]: + def json_large_aws_managed(self) -> dict[str, dict[str, Any]]: """Get all JSON results""" result = {policy.policy_id: policy.json_large for policy in self.policy_details if policy.managed_by == "AWS"} return result @property - def json_large_customer_managed(self) -> Dict[str, Dict[str, Any]]: + def json_large_customer_managed(self) -> dict[str, dict[str, Any]]: """Get all JSON results""" result = { policy.policy_id: policy.json_large for policy in self.policy_details if policy.managed_by == "Customer" } return result - def set_iam_data(self, iam_data: Dict[str, Dict[Any, Any]]) -> None: + def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: self.iam_data = iam_data for policy_detail in self.policy_details: policy_detail.set_iam_data(iam_data) @@ -149,11 +149,11 @@ class ManagedPolicy: def __init__( self, - policy_detail: Dict[str, Any], + policy_detail: dict[str, Any], exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> None: # Store the Raw JSON data from this for safekeeping self.policy_detail = policy_detail @@ -172,7 +172,7 @@ def __init__( self.is_attachable = policy_detail.get("IsAttachable") self.create_date = policy_detail.get("CreateDate") self.update_date = policy_detail.get("UpdateDate") - self.iam_data: Dict[str, Dict[Any, Any]] = { + self.iam_data: dict[str, dict[Any, Any]] = { "groups": {}, "users": {}, "roles": {}, @@ -194,7 +194,7 @@ def __init__( self.severity = [] if severity is None else severity - def set_iam_data(self, iam_data: Dict[str, Dict[Any, Any]]) -> None: + def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: self.iam_data = iam_data def _is_excluded(self, exclusions: Exclusions) -> bool: @@ -240,7 +240,7 @@ def account_id(self) -> str: # pragma: no cover else: return get_account_from_arn(self.arn) - def getFindingLinks(self, findings: List[Dict[str, Any]]) -> Dict[Any, str]: # noqa: N802 + def getFindingLinks(self, findings: list[dict[str, Any]]) -> dict[Any, str]: # noqa: N802 links = {} for finding in findings: links[finding["type"]] = ( @@ -249,8 +249,8 @@ def getFindingLinks(self, findings: List[Dict[str, Any]]) -> Dict[Any, str]: # return links @property - def getAttached(self) -> Dict[str, Any]: # noqa: N802 - attached: Dict[str, Any] = {"roles": [], "groups": [], "users": []} + def getAttached(self) -> dict[str, Any]: # noqa: N802 + attached: dict[str, Any] = {"roles": [], "groups": [], "users": []} for principal_type in ("roles", "groups", "users"): principals = self.iam_data[principal_type].keys() for principal_id in principals: @@ -266,7 +266,7 @@ def getAttached(self) -> Dict[str, Any]: # noqa: N802 return attached @property - def json(self) -> Dict[str, Any]: + def json(self) -> dict[str, Any]: """Return JSON output for high risk actions""" result = dict( PolicyName=self.policy_name, @@ -335,7 +335,7 @@ def json(self) -> Dict[str, Any]: return result @property - def json_large(self) -> Dict[str, Any]: + def json_large(self) -> dict[str, Any]: """Return JSON output - including Infra Modification actions, which can be large""" result = dict( PolicyName=self.policy_name, diff --git a/cloudsplaining/scan/policy_document.py b/cloudsplaining/scan/policy_document.py index bd9e94b8..0d85ce9f 100644 --- a/cloudsplaining/scan/policy_document.py +++ b/cloudsplaining/scan/policy_document.py @@ -6,8 +6,10 @@ # Licensed under the BSD 3-Clause license. # For full license text, see the LICENSE file in the repo root # or https://opensource.org/licenses/BSD-3-Clause +from __future__ import annotations + import logging -from typing import Any, Dict, List, Set +from typing import Any from policy_sentry.querying.all import get_all_service_prefixes @@ -31,7 +33,7 @@ class PolicyDocument: def __init__( self, - policy: Dict[str, Any], + policy: dict[str, Any], exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, @@ -63,12 +65,12 @@ def __init__( ) @property - def json(self) -> Dict[str, Any]: + def json(self) -> dict[str, Any]: """Return the Policy in JSON""" return self.policy @property - def all_allowed_actions(self) -> List[str]: + def all_allowed_actions(self) -> list[str]: """Output all allowed IAM Actions, regardless of resource constraints""" allowed_actions = set() for statement in self.statements: @@ -78,7 +80,7 @@ def all_allowed_actions(self) -> List[str]: allowed_actions = self.filter_deny_statements(allowed_actions) return list(allowed_actions) - def filter_deny_statements(self, allowed_actions: Set[str]) -> Set[str]: + def filter_deny_statements(self, allowed_actions: set[str]) -> set[str]: """ filter all denied statements from actions """ @@ -88,7 +90,7 @@ def filter_deny_statements(self, allowed_actions: Set[str]) -> Set[str]: return allowed_actions @property - def all_allowed_unrestricted_actions(self) -> List[str]: + def all_allowed_unrestricted_actions(self) -> list[str]: """Output all IAM actions that do not practice resource constraints""" allowed_actions = set() for statement in self.statements: @@ -106,7 +108,7 @@ def all_allowed_unrestricted_actions(self) -> List[str]: return list(allowed_actions) @property - def all_allowed_unrestrictable_actions(self) -> List[str]: + def all_allowed_unrestrictable_actions(self) -> list[str]: """Output all IAM actions that cannot be restricted by resource constraints""" allowed_actions = set() for statement in self.statements: @@ -119,7 +121,7 @@ def all_allowed_unrestrictable_actions(self) -> List[str]: return list(allowed_actions) @property - def infrastructure_modification(self) -> List[str]: + def infrastructure_modification(self) -> list[str]: """Return a list of modify only missing resource constraints""" actions_missing_resource_constraints = [] for statement in self.statements: @@ -131,7 +133,7 @@ def infrastructure_modification(self) -> List[str]: return actions_missing_resource_constraints @property - def contains_statement_using_not_action(self) -> List[Dict[str, Any]]: + def contains_statement_using_not_action(self) -> list[dict[str, Any]]: """If NotAction is used, flag it so the assessor can triage manually""" not_action_statements = [] for statement in self.statements: @@ -145,7 +147,7 @@ def contains_statement_using_not_action(self) -> List[Dict[str, Any]]: return not_action_statements @property - def allows_privilege_escalation(self) -> List[Dict[str, Any]]: + def allows_privilege_escalation(self) -> list[dict[str, Any]]: """ Determines whether or not the policy allows privilege escalation action combinations published by Rhino Security Labs. @@ -162,7 +164,7 @@ def allows_privilege_escalation(self) -> List[Dict[str, Any]]: return escalations @property - def permissions_management_without_constraints(self) -> List[str]: + def permissions_management_without_constraints(self) -> list[str]: """Where applicable, returns a list of 'Permissions management' IAM actions in the statement that do not have resource constraints""" result = [] @@ -172,7 +174,7 @@ def permissions_management_without_constraints(self) -> List[str]: return result @property - def write_actions_without_constraints(self) -> List[str]: + def write_actions_without_constraints(self) -> list[str]: """Where applicable, returns a list of 'Write' level IAM actions in the statement that do not have resource constraints""" result = [] @@ -182,7 +184,7 @@ def write_actions_without_constraints(self) -> List[str]: return result @property - def tagging_actions_without_constraints(self) -> List[str]: + def tagging_actions_without_constraints(self) -> list[str]: """Where applicable, returns a list of 'Tagging' level IAM actions in the statement that do not have resource constraints""" result = [] @@ -191,9 +193,9 @@ def tagging_actions_without_constraints(self) -> List[str]: result.extend(statement.tagging_actions_without_constraints) return result - def allows_specific_actions_without_constraints(self, specific_actions: List[str]) -> List[str]: + def allows_specific_actions_without_constraints(self, specific_actions: list[str]) -> list[str]: """Determine whether or not a list of specific IAM Actions are allowed without resource constraints.""" - allowed: Set[str] = set() + allowed: set[str] = set() if not isinstance(specific_actions, list): raise Exception("Please supply a list of actions.") @@ -215,7 +217,7 @@ def allows_specific_actions_without_constraints(self, specific_actions: List[str return results @property - def allows_data_exfiltration_actions(self) -> List[str]: + def allows_data_exfiltration_actions(self) -> list[str]: """If any 'Data exfiltration' actions are allowed without resource constraints, return those actions.""" return [ action @@ -224,7 +226,7 @@ def allows_data_exfiltration_actions(self) -> List[str]: ] @property - def credentials_exposure(self) -> List[str]: + def credentials_exposure(self) -> list[str]: """Determine if the action returns credentials""" # https://gist.github.com/kmcquade/33860a617e651104d243c324ddf7992a return [ @@ -234,7 +236,7 @@ def credentials_exposure(self) -> List[str]: ] @property - def service_wildcard(self) -> List[str]: + def service_wildcard(self) -> list[str]: """Determine if the policy gives access to all actions within a service - simple grepping""" services = set() all_service_prefixes = get_all_service_prefixes() diff --git a/cloudsplaining/scan/role_details.py b/cloudsplaining/scan/role_details.py index 735fe2be..8235627e 100644 --- a/cloudsplaining/scan/role_details.py +++ b/cloudsplaining/scan/role_details.py @@ -4,7 +4,7 @@ import json import logging -from typing import Any, Dict, List, Optional +from typing import Any from cloudsplaining.scan.assume_role_policy_document import AssumeRolePolicyDocument from cloudsplaining.scan.inline_policy import InlinePolicy @@ -32,12 +32,12 @@ class RoleDetailList: def __init__( self, - role_details: List[Dict[str, Any]], + role_details: list[dict[str, Any]], policy_details: ManagedPolicyDetails, exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> None: self.severity = [] if severity is None else severity self.roles = [] @@ -48,7 +48,7 @@ def __init__( # Fix Issue #254 - Allow flagging risky actions even when there are resource constraints self.flag_conditional_statements = flag_conditional_statements self.flag_resource_arn_statements = flag_resource_arn_statements - self.iam_data: Dict[str, Dict[Any, Any]] = { + self.iam_data: dict[str, dict[Any, Any]] = { "groups": {}, "users": {}, "roles": {}, @@ -75,19 +75,19 @@ def __init__( ) ) - def set_iam_data(self, iam_data: Dict[str, Dict[Any, Any]]) -> None: + def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: self.iam_data = iam_data for role in self.roles: role.set_iam_data(iam_data) - def get_all_allowed_actions_for_role(self, name: str) -> Optional[List[str]]: + def get_all_allowed_actions_for_role(self, name: str) -> list[str] | None: """Returns a list of all allowed actions by the role across all its policies""" for role_detail in self.roles: if role_detail.role_name == name: return role_detail.all_allowed_actions return None - def get_all_iam_statements_for_role(self, name: str) -> Optional[List[StatementDetail]]: + def get_all_iam_statements_for_role(self, name: str) -> list[StatementDetail] | None: """Returns a list of all StatementDetail objects across all the policies assigned to the role""" for role_detail in self.roles: if role_detail.role_name == name: @@ -95,14 +95,14 @@ def get_all_iam_statements_for_role(self, name: str) -> Optional[List[StatementD return None @property - def role_names(self) -> List[str]: + def role_names(self) -> list[str]: """Get a list of all role names in the account""" results = [role_detail.role_name for role_detail in self.roles] results.sort() return results @property - def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str]: + def all_infrastructure_modification_actions_by_inline_policies(self) -> list[str]: """Return a list of all infrastructure modification actions allowed by all inline policies in violation.""" result = set() for role in self.roles: @@ -111,7 +111,7 @@ def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str return sorted(result) @property - def inline_policies_json(self) -> Dict[str, Dict[str, Any]]: + def inline_policies_json(self) -> dict[str, dict[str, Any]]: """Return JSON representation of attached inline policies""" results = {} for role_detail in self.roles: @@ -119,7 +119,7 @@ def inline_policies_json(self) -> Dict[str, Dict[str, Any]]: return results @property - def json(self) -> Dict[str, Dict[str, Any]]: + def json(self) -> dict[str, dict[str, Any]]: """Get all JSON results""" result = {role.role_id: role.json for role in self.roles} return result @@ -131,12 +131,12 @@ class RoleDetail: def __init__( self, - role_detail: Dict[str, Any], + role_detail: dict[str, Any], policy_details: ManagedPolicyDetails, exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> None: """ Initialize the RoleDetail object. @@ -164,7 +164,7 @@ def __init__( self.flag_conditional_statements = flag_conditional_statements self.flag_resource_arn_statements = flag_resource_arn_statements - self.iam_data: Dict[str, Dict[Any, Any]] = { + self.iam_data: dict[str, dict[Any, Any]] = { "groups": {}, "users": {}, "roles": {}, @@ -214,7 +214,7 @@ def __init__( except NotFoundException as e: utils.print_red(f"\tError in role {self.role_name}: {e}") - def set_iam_data(self, iam_data: Dict[str, Dict[Any, Any]]) -> None: + def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: self.iam_data = iam_data for inline_policy in self.inline_policies: inline_policy.set_iam_data(iam_data) @@ -229,7 +229,7 @@ def _is_excluded(self, exclusions: Exclusions) -> bool: ) @property - def all_allowed_actions(self) -> List[str]: + def all_allowed_actions(self) -> list[str]: """Return a list of which actions are allowed by the principal""" actions = set() for managed_policy in self.attached_managed_policies: @@ -239,7 +239,7 @@ def all_allowed_actions(self) -> List[str]: return sorted(actions) @property - def all_iam_statements(self) -> List[StatementDetail]: + def all_iam_statements(self) -> list[StatementDetail]: """Return a list of which actions are allowed by the principal""" statements = set() for managed_policy in self.attached_managed_policies: @@ -249,7 +249,7 @@ def all_iam_statements(self) -> List[StatementDetail]: return list(statements) @property - def attached_managed_policies_json(self) -> Dict[str, Dict[str, Any]]: + def attached_managed_policies_json(self) -> dict[str, dict[str, Any]]: """Return JSON representation of attached managed policies""" policies = {} for policy in self.attached_managed_policies: @@ -260,7 +260,7 @@ def attached_managed_policies_json(self) -> Dict[str, Dict[str, Any]]: return policies @property - def attached_managed_policies_pointer_json(self) -> Dict[str, str]: + def attached_managed_policies_pointer_json(self) -> dict[str, str]: """Return JSON representation of attached managed policies - but just with pointers to the Policy ID""" policies = {} for policy in self.attached_managed_policies: @@ -271,7 +271,7 @@ def attached_managed_policies_pointer_json(self) -> Dict[str, str]: return policies @property - def attached_customer_managed_policies_pointer_json(self) -> Dict[str, str]: + def attached_customer_managed_policies_pointer_json(self) -> dict[str, str]: """Return metadata on attached managed policies so you can look it up in the policies section later.""" policies = { policy.policy_id: policy.policy_name @@ -281,7 +281,7 @@ def attached_customer_managed_policies_pointer_json(self) -> Dict[str, str]: return policies @property - def attached_aws_managed_policies_pointer_json(self) -> Dict[str, str]: + def attached_aws_managed_policies_pointer_json(self) -> dict[str, str]: """Return metadata on attached managed policies so you can look it up in the policies section later.""" policies = { policy.policy_id: policy.policy_name @@ -291,7 +291,7 @@ def attached_aws_managed_policies_pointer_json(self) -> Dict[str, str]: return policies @property - def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str]: + def all_infrastructure_modification_actions_by_inline_policies(self) -> list[str]: """Return a list of all infrastructure modification actions allowed by all inline policies in violation.""" result = set() for policy in self.inline_policies: @@ -299,19 +299,19 @@ def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str return sorted(result) @property - def inline_policies_json(self) -> Dict[str, Dict[str, Any]]: + def inline_policies_json(self) -> dict[str, dict[str, Any]]: """Return JSON representation of attached inline policies""" policies = {policy.policy_id: policy.json_large for policy in self.inline_policies} return policies @property - def inline_policies_pointer_json(self) -> Dict[str, str]: + def inline_policies_pointer_json(self) -> dict[str, str]: """Return metadata on attached inline policies so you can look it up in the policies section later.""" policies = {policy.policy_id: policy.policy_name for policy in self.inline_policies} return policies @property - def json(self) -> Dict[str, Any]: + def json(self) -> dict[str, Any]: """Return the JSON representation of the Role Detail""" assume_role_json = self.assume_role_policy_document.json if self.assume_role_policy_document else {} this_role_detail = dict( diff --git a/cloudsplaining/scan/statement_detail.py b/cloudsplaining/scan/statement_detail.py index 785d5370..f8be8819 100644 --- a/cloudsplaining/scan/statement_detail.py +++ b/cloudsplaining/scan/statement_detail.py @@ -1,7 +1,9 @@ """Abstracts evaluation of IAM Policy statements.""" +from __future__ import annotations + import logging -from typing import Any, Dict, List, Optional +from typing import Any from cached_property import cached_property from policy_sentry.analysis.expand import determine_actions_to_expand @@ -36,7 +38,7 @@ class StatementDetail: def __init__( self, - statement: Dict[str, Any], + statement: dict[str, Any], flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, ) -> None: @@ -61,7 +63,7 @@ def __init__( self.unrestrictable_actions = list(set(self.expanded_actions or []) - set(self.restrictable_actions)) self.has_resource_constraints = self._has_resource_constraints() - def _actions(self) -> List[str]: + def _actions(self) -> list[str]: """Holds the actions in a statement""" actions = self.statement.get("Action") if not actions: @@ -70,7 +72,7 @@ def _actions(self) -> List[str]: return [actions] return actions - def _resources(self) -> List[str]: + def _resources(self) -> list[str]: """Holds the resource ARNs in a statement""" resources = self.statement.get("Resource") if not resources: @@ -80,7 +82,7 @@ def _resources(self) -> List[str]: return [resources] return resources - def _not_action(self) -> List[str]: + def _not_action(self) -> list[str]: """Holds the NotAction details. We won't do anything with it - but we will flag it as something for the assessor to triage. """ @@ -91,7 +93,7 @@ def _not_action(self) -> List[str]: return [not_action] return not_action - def _not_resource(self) -> List[str]: + def _not_resource(self) -> list[str]: """Holds the NotResource details. We won't do anything with it - but we will flag it as something for the assessor to triage. """ @@ -103,7 +105,7 @@ def _not_resource(self) -> List[str]: return not_resource # @property - def _not_action_effective_actions(self) -> Optional[List[str]]: + def _not_action_effective_actions(self) -> list[str] | None: """If NotAction is used, calculate the allowed actions - i.e., what it would be""" effective_actions = [] if not self.not_action: @@ -164,11 +166,11 @@ def has_not_resource_with_allow(self) -> bool: return False @cached_property - def expanded_actions(self) -> Optional[List[str]]: + def expanded_actions(self) -> list[str] | None: """Expands the full list of allowed actions from the Policy/""" if self.actions: - expanded: List[str] = determine_actions_to_expand(self.actions) + expanded: list[str] = determine_actions_to_expand(self.actions) expanded.sort() return expanded elif self.not_action: @@ -189,7 +191,7 @@ def effect_allow(self) -> bool: return bool(self.effect == "Allow") @property - def services_in_use(self) -> List[str]: + def services_in_use(self) -> list[str]: """Get a list of the services in use by the statement.""" service_prefixes = set() for action in self.expanded_actions: @@ -198,7 +200,7 @@ def services_in_use(self) -> List[str]: return sorted(service_prefixes) @property - def permissions_management_actions_without_constraints(self) -> List[str]: + def permissions_management_actions_without_constraints(self) -> list[str]: """Where applicable, returns a list of 'Permissions management' IAM actions in the statement that do not have resource constraints""" result = [] @@ -208,7 +210,7 @@ def permissions_management_actions_without_constraints(self) -> List[str]: return result @property - def write_actions_without_constraints(self) -> List[str]: + def write_actions_without_constraints(self) -> list[str]: """Where applicable, returns a list of 'Write' level IAM actions in the statement that do not have resource constraints""" result = [] @@ -218,7 +220,7 @@ def write_actions_without_constraints(self) -> List[str]: return result @property - def tagging_actions_without_constraints(self) -> List[str]: + def tagging_actions_without_constraints(self) -> list[str]: """Where applicable, returns a list of 'Tagging' level IAM actions in the statement that do not have resource constraints""" result = [] @@ -226,7 +228,7 @@ def tagging_actions_without_constraints(self) -> List[str]: result = remove_actions_not_matching_access_level(self.restrictable_actions, "Tagging") return result - def missing_resource_constraints(self, exclusions: Exclusions = DEFAULT_EXCLUSIONS) -> List[str]: + def missing_resource_constraints(self, exclusions: Exclusions = DEFAULT_EXCLUSIONS) -> list[str]: """Return a list of any actions - regardless of access level - allowed by the statement that do not leverage resource constraints.""" if not isinstance(exclusions, Exclusions): @@ -245,7 +247,7 @@ def missing_resource_constraints(self, exclusions: Exclusions = DEFAULT_EXCLUSIO result.sort() return result - def missing_resource_constraints_for_modify_actions(self, exclusions: Exclusions = DEFAULT_EXCLUSIONS) -> List[str]: + def missing_resource_constraints_for_modify_actions(self, exclusions: Exclusions = DEFAULT_EXCLUSIONS) -> list[str]: """ Determine whether or not any actions at the 'Write', 'Permissions management', or 'Tagging' access levels are allowed by the statement without resource constraints. diff --git a/cloudsplaining/scan/user_details.py b/cloudsplaining/scan/user_details.py index 8947abf4..fdd1aeee 100644 --- a/cloudsplaining/scan/user_details.py +++ b/cloudsplaining/scan/user_details.py @@ -3,7 +3,7 @@ from __future__ import annotations import json -from typing import Any, Dict, List, Optional +from typing import Any from cloudsplaining.scan.group_details import GroupDetail, GroupDetailList from cloudsplaining.scan.inline_policy import InlinePolicy @@ -25,13 +25,13 @@ class UserDetailList: def __init__( self, - user_details: List[Dict[str, Any]], + user_details: list[dict[str, Any]], policy_details: ManagedPolicyDetails, all_group_details: GroupDetailList, exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> None: self.severity = [] if severity is None else severity if not isinstance(exclusions, Exclusions): @@ -56,25 +56,25 @@ def __init__( for user_detail in user_details ] - self.iam_data: Dict[str, Dict[Any, Any]] = { + self.iam_data: dict[str, dict[Any, Any]] = { "groups": {}, "users": {}, "roles": {}, } - def set_iam_data(self, iam_data: Dict[str, Dict[Any, Any]]) -> None: + def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: self.iam_data = iam_data for user in self.users: user.set_iam_data(iam_data) - def get_all_allowed_actions_for_user(self, name: str) -> Optional[List[str]]: + def get_all_allowed_actions_for_user(self, name: str) -> list[str] | None: """Returns a list of all allowed actions by the user across all its policies""" for user_detail in self.users: if user_detail.user_name == name: return user_detail.all_allowed_actions return None - def get_all_iam_statements_for_user(self, name: str) -> Optional[List[StatementDetail]]: + def get_all_iam_statements_for_user(self, name: str) -> list[StatementDetail] | None: """Returns a list of all StatementDetail objects across all the policies assigned to the user""" for user_detail in self.users: if user_detail.user_name == name: @@ -82,14 +82,14 @@ def get_all_iam_statements_for_user(self, name: str) -> Optional[List[StatementD return None @property - def user_names(self) -> List[str]: + def user_names(self) -> list[str]: """Get a list of all user names in the account""" results = [user_detail.user_name for user_detail in self.users] results.sort() return results @property - def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str]: + def all_infrastructure_modification_actions_by_inline_policies(self) -> list[str]: """Return a list of all infrastructure modification actions allowed by all inline policies in violation.""" result = set() for user in self.users: @@ -98,7 +98,7 @@ def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str return sorted(result) @property - def inline_policies_json(self) -> Dict[str, Dict[str, Any]]: + def inline_policies_json(self) -> dict[str, dict[str, Any]]: """Return JSON representation of attached inline policies""" results = {} for user_detail in self.users: @@ -106,7 +106,7 @@ def inline_policies_json(self) -> Dict[str, Dict[str, Any]]: return results @property - def json(self) -> Dict[str, Dict[str, Any]]: + def json(self) -> dict[str, dict[str, Any]]: """Get all JSON results""" result = {user.user_id: user.json for user in self.users} return result @@ -118,13 +118,13 @@ class UserDetail: def __init__( self, - user_detail: Dict[str, Any], + user_detail: dict[str, Any], policy_details: ManagedPolicyDetails, all_group_details: GroupDetailList, exclusions: Exclusions = DEFAULT_EXCLUSIONS, flag_conditional_statements: bool = False, flag_resource_arn_statements: bool = False, - severity: List[str] | None = None, + severity: list[str] | None = None, ) -> None: """ Initialize the UserDetail object. @@ -141,7 +141,7 @@ def __init__( self.user_id = user_detail["UserId"] self.user_name = user_detail["UserName"] - self.iam_data: Dict[str, Dict[Any, Any]] = { + self.iam_data: dict[str, dict[Any, Any]] = { "groups": {}, "users": {}, "roles": {}, @@ -159,7 +159,7 @@ def __init__( self.flag_resource_arn_statements = flag_resource_arn_statements # Groups - self.groups: List[GroupDetail] = [] + self.groups: list[GroupDetail] = [] group_list = user_detail.get("GroupList") if group_list: self._add_group_details(group_list, all_group_details) @@ -201,7 +201,7 @@ def __init__( except NotFoundException as e: utils.print_red(f"\tError in user {self.user_name}: {e}") - def set_iam_data(self, iam_data: Dict[str, Dict[Any, Any]]) -> None: + def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: self.iam_data = iam_data for inline_policy in self.inline_policies: inline_policy.set_iam_data(iam_data) @@ -214,14 +214,14 @@ def _is_excluded(self, exclusions: Exclusions) -> bool: or exclusions.is_principal_excluded(self.path, "User") ) - def _add_group_details(self, group_list: List[str], all_group_details: GroupDetailList) -> None: + def _add_group_details(self, group_list: list[str], all_group_details: GroupDetailList) -> None: for group in group_list: this_group_detail = all_group_details.get_group_detail(group) if this_group_detail: self.groups.append(this_group_detail) @property - def all_allowed_actions(self) -> List[str]: + def all_allowed_actions(self) -> list[str]: """Return a list of which actions are allowed by the principal""" actions = set() for managed_policy in self.attached_managed_policies: @@ -233,7 +233,7 @@ def all_allowed_actions(self) -> List[str]: return sorted(actions) @property - def all_iam_statements(self) -> List[StatementDetail]: + def all_iam_statements(self) -> list[StatementDetail]: """Return a list of which actions are allowed by the principal""" statements = set() for managed_policy in self.attached_managed_policies: @@ -245,19 +245,19 @@ def all_iam_statements(self) -> List[StatementDetail]: return list(statements) @property - def attached_managed_policies_json(self) -> Dict[str, Dict[str, Any]]: + def attached_managed_policies_json(self) -> dict[str, dict[str, Any]]: """Return JSON representation of attached managed policies""" policies = {policy.policy_id: policy.json for policy in self.attached_managed_policies} return policies @property - def attached_managed_policies_pointer_json(self) -> Dict[str, str]: + def attached_managed_policies_pointer_json(self) -> dict[str, str]: """Return JSON representation of attached managed policies - but just with pointers to the Policy ID""" policies = {policy.policy_id: policy.policy_name for policy in self.attached_managed_policies} return policies @property - def attached_customer_managed_policies_pointer_json(self) -> Dict[str, str]: + def attached_customer_managed_policies_pointer_json(self) -> dict[str, str]: """Return metadata on attached managed policies so you can look it up in the policies section later.""" policies = { policy.policy_id: policy.policy_name @@ -267,7 +267,7 @@ def attached_customer_managed_policies_pointer_json(self) -> Dict[str, str]: return policies @property - def attached_aws_managed_policies_pointer_json(self) -> Dict[str, str]: + def attached_aws_managed_policies_pointer_json(self) -> dict[str, str]: """Return metadata on attached managed policies so you can look it up in the policies section later.""" policies = { policy.policy_id: policy.policy_name @@ -277,7 +277,7 @@ def attached_aws_managed_policies_pointer_json(self) -> Dict[str, str]: return policies @property - def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str]: + def all_infrastructure_modification_actions_by_inline_policies(self) -> list[str]: """Return a list of all infrastructure modification actions allowed by all inline policies in violation.""" result = set() for policy in self.inline_policies: @@ -285,19 +285,19 @@ def all_infrastructure_modification_actions_by_inline_policies(self) -> List[str return sorted(result) @property - def inline_policies_json(self) -> Dict[str, Dict[str, Any]]: + def inline_policies_json(self) -> dict[str, dict[str, Any]]: """Return JSON representation of attached inline policies""" policies = {policy.policy_id: policy.json_large for policy in self.inline_policies} return policies @property - def inline_policies_pointer_json(self) -> Dict[str, str]: + def inline_policies_pointer_json(self) -> dict[str, str]: """Return metadata on attached inline policies so you can look it up in the policies section later.""" policies = {policy.policy_id: policy.policy_name for policy in self.inline_policies} return policies @property - def groups_json(self) -> Dict[str, Dict[str, Any]]: + def groups_json(self) -> dict[str, dict[str, Any]]: """Return JSON representation of group object""" these_groups = { group.group_name: group.json # TODO: Change this to a group pointer? @@ -306,7 +306,7 @@ def groups_json(self) -> Dict[str, Dict[str, Any]]: return these_groups @property - def json(self) -> Dict[str, Any]: + def json(self) -> dict[str, Any]: """Return the JSON representation of the User Detail""" this_user_detail = dict( diff --git a/cloudsplaining/shared/aws_login.py b/cloudsplaining/shared/aws_login.py index 7ff5b455..90a17daf 100644 --- a/cloudsplaining/shared/aws_login.py +++ b/cloudsplaining/shared/aws_login.py @@ -4,7 +4,7 @@ import logging import os -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING import boto3 from botocore.config import Config @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -def get_boto3_client(service: str, profile: Optional[str] = None, region: str = "us-east-1") -> BaseClient: +def get_boto3_client(service: str, profile: str | None = None, region: str = "us-east-1") -> BaseClient: """Get a boto3 client for a given service""" logging.getLogger("botocore").setLevel(logging.CRITICAL) session_data = {"region_name": region} @@ -38,7 +38,7 @@ def get_boto3_client(service: str, profile: Optional[str] = None, region: str = return client -def get_boto3_resource(service: str, profile: Optional[str] = None, region: str = "us-east-1") -> ServiceResource: +def get_boto3_resource(service: str, profile: str | None = None, region: str = "us-east-1") -> ServiceResource: """Get a boto3 resource for a given service""" logging.getLogger("botocore").setLevel(logging.CRITICAL) session_data = {"region_name": region} @@ -58,9 +58,9 @@ def get_current_account_id(sts_client: STSClient) -> str: return current_account_id -def get_available_regions(service: str) -> List[str]: +def get_available_regions(service: str) -> list[str]: """AWS exposes their list of regions as an API. Gather the list.""" - regions: List[str] = boto3.session.Session().get_available_regions(service) + regions = boto3.session.Session().get_available_regions(service) logger.debug("The service %s does not have available regions. Returning us-east-1 as default") if not regions: regions = ["us-east-1"] @@ -71,8 +71,8 @@ def get_target_account_credentials( target_account_role_name: str, target_account_id: str, role_session_name: str = "Cloudsplaining", - profile: Optional[str] = None, -) -> Tuple[str, str, str]: + profile: str | None = None, +) -> tuple[str, str, str]: """ Get a boto3 client for a given AWS service diff --git a/cloudsplaining/shared/exclusions.py b/cloudsplaining/shared/exclusions.py index e6eb1792..fb9d2af3 100644 --- a/cloudsplaining/shared/exclusions.py +++ b/cloudsplaining/shared/exclusions.py @@ -5,8 +5,9 @@ # Licensed under the BSD 3-Clause license. # For full license text, see the LICENSE file in the repo root # or https://opensource.org/licenses/BSD-3-Clause +from __future__ import annotations + import logging -from typing import Dict, List, Union from cloudsplaining.shared import utils from cloudsplaining.shared.constants import DEFAULT_EXCLUSIONS_CONFIG @@ -18,7 +19,7 @@ class Exclusions: """Contains the exclusions configuration as an object""" - def __init__(self, exclusions_config: Dict[str, List[str]] = DEFAULT_EXCLUSIONS_CONFIG) -> None: + def __init__(self, exclusions_config: dict[str, list[str]] = DEFAULT_EXCLUSIONS_CONFIG) -> None: check_exclusions_schema(exclusions_config) self.config = exclusions_config self.include_actions = self._include_actions() @@ -28,43 +29,43 @@ def __init__(self, exclusions_config: Dict[str, List[str]] = DEFAULT_EXCLUSIONS_ self.groups = self._groups() self.policies = self._policies() - def _roles(self) -> List[str]: + def _roles(self) -> list[str]: provided_roles = self.config.get("roles", []) # Normalize for comparisons roles = [role.lower() for role in provided_roles] return roles - def _users(self) -> List[str]: + def _users(self) -> list[str]: provided_users = self.config.get("users", []) # Normalize for comparisons users = [user.lower() for user in provided_users] return users - def _groups(self) -> List[str]: + def _groups(self) -> list[str]: provided_groups = self.config.get("groups", []) # Normalize for comparisons groups = [group.lower() for group in provided_groups] return groups - def _policies(self) -> List[str]: + def _policies(self) -> list[str]: provided_policies = self.config.get("policies", []) # Normalize for comparisons policies = [policy.lower() for policy in provided_policies] return policies - def _include_actions(self) -> List[str]: + def _include_actions(self) -> list[str]: include_actions = self.config.get("include-actions", []) # Set to lowercase so subsequent evaluations are faster. always_include_actions = [x.lower() for x in include_actions] return always_include_actions - def _exclude_actions(self) -> List[str]: + def _exclude_actions(self) -> list[str]: exclude_actions = self.config.get("exclude-actions", []) # Set to lowercase so subsequent evaluations are faster. always_exclude_actions = [x.lower() for x in exclude_actions] return always_exclude_actions - def is_action_always_included(self, action_in_question: str) -> Union[bool, str]: + def is_action_always_included(self, action_in_question: str) -> bool | str: """ Supply an IAM action, and get a decision about whether or not it is excluded. @@ -112,7 +113,7 @@ def is_principal_excluded(self, principal: str, principal_type: str) -> bool: else: # pragma: no cover raise Exception("Please supply User, Group, or Role as the principal argument.") - def get_allowed_actions(self, requested_actions: List[str]) -> List[str]: + def get_allowed_actions(self, requested_actions: list[str]) -> list[str]: """Given a list of actions, it will evaluate those actions against the exclusions configuration and return a list of actions after filtering for exclusions.""" @@ -130,7 +131,7 @@ def get_allowed_actions(self, requested_actions: List[str]) -> List[str]: # pylint: disable=inconsistent-return-statements -def is_name_excluded(name: str, exclusions_list: Union[str, List[str]]) -> bool: +def is_name_excluded(name: str, exclusions_list: str | list[str]) -> bool: """ :param name: The name of the policy, role, user, or group :param exclusions_list: List of exclusions diff --git a/cloudsplaining/shared/utils.py b/cloudsplaining/shared/utils.py index 752f7610..01f47dc1 100644 --- a/cloudsplaining/shared/utils.py +++ b/cloudsplaining/shared/utils.py @@ -5,12 +5,14 @@ # Licensed under the BSD 3-Clause license. # For full license text, see the LICENSE file in the repo root # or https://opensource.org/licenses/BSD-3-Clause +from __future__ import annotations + import json import logging import os from hashlib import sha256 from pathlib import Path -from typing import Any, Dict, List +from typing import Any import yaml from policy_sentry.querying.actions import ( @@ -27,7 +29,7 @@ END = "\033[0m" -def remove_wildcard_only_actions(actions_list: List[str]) -> List[str]: +def remove_wildcard_only_actions(actions_list: list[str]) -> list[str]: """Given a list of actions, remove the ones that CANNOT be restricted to ARNs, leaving only the ones that CAN.""" try: actions_list_unique = set(actions_list) @@ -55,11 +57,11 @@ def remove_wildcard_only_actions(actions_list: List[str]) -> List[str]: return results -def remove_read_level_actions(actions_list: List[str]) -> List[str]: +def remove_read_level_actions(actions_list: list[str]) -> list[str]: """Given a set of actions, return that list of actions, but only with actions at the 'Write', 'Tagging', or 'Permissions management' levels """ - modify_actions: List[str] = remove_actions_not_matching_access_level(actions_list, "Write") + modify_actions: list[str] = remove_actions_not_matching_access_level(actions_list, "Write") modify_actions.extend(remove_actions_not_matching_access_level(actions_list, "Permissions management")) modify_actions.extend(remove_actions_not_matching_access_level(actions_list, "Tagging")) return modify_actions @@ -118,7 +120,7 @@ def is_aws_managed(arn: str) -> bool: # pragma: no cover -def write_results_data_file(results: Dict[str, Dict[str, Any]], raw_data_file: str) -> str: +def write_results_data_file(results: dict[str, dict[str, Any]], raw_data_file: str) -> str: """ Writes the raw data file containing all the results for an AWS account @@ -131,9 +133,9 @@ def write_results_data_file(results: Dict[str, Dict[str, Any]], raw_data_file: s return raw_data_file -def read_yaml_file(filename: str) -> Dict[str, Any]: +def read_yaml_file(filename: str) -> dict[str, Any]: """Reads a YAML file, safe loads, and returns the dictionary""" - cfg: Dict[str, Any] = yaml.safe_load(Path(filename).read_text(encoding="utf-8")) + cfg: dict[str, Any] = yaml.safe_load(Path(filename).read_text(encoding="utf-8")) return cfg diff --git a/cloudsplaining/shared/validation.py b/cloudsplaining/shared/validation.py index b0fcef9c..678c6c57 100644 --- a/cloudsplaining/shared/validation.py +++ b/cloudsplaining/shared/validation.py @@ -5,8 +5,10 @@ # Licensed under the BSD 3-Clause license. # For full license text, see the LICENSE file in the repo root # or https://opensource.org/licenses/BSD-3-Clause +from __future__ import annotations + import logging -from typing import Any, Dict, List +from typing import Any from schema import Optional, Schema, SchemaError @@ -34,7 +36,7 @@ # pragma: no cover -def check(conf_schema: Schema, conf: Dict[str, List[Any]]) -> bool: +def check(conf_schema: Schema, conf: dict[str, list[Any]]) -> bool: """ Validates a user-supplied JSON vs a defined schema. :param conf_schema: The Schema object that defines the required structure. @@ -55,7 +57,7 @@ def check(conf_schema: Schema, conf: Dict[str, List[Any]]) -> bool: return False -def check_exclusions_schema(cfg: Dict[str, List[str]]) -> bool: +def check_exclusions_schema(cfg: dict[str, list[str]]) -> bool: """Determine whether or not the exclusions file meets the required format""" result = check(EXCLUSIONS_TEMPLATE_SCHEMA, cfg) if result: @@ -64,7 +66,7 @@ def check_exclusions_schema(cfg: Dict[str, List[str]]) -> bool: raise Exception("The required format of the exclusions template is incorrect. Please try again.") -def check_authorization_details_schema(cfg: Dict[str, List[Any]]) -> bool: +def check_authorization_details_schema(cfg: dict[str, list[Any]]) -> bool: """Determine whether or not the file meets the required format of the authorizations file""" result = check(AUTHORIZATION_DETAILS_SCHEMA, cfg) return result diff --git a/pyproject.toml b/pyproject.toml index a3acdaeb..5293913d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ select = [ "B", "E", "F", + "FA", "FURB", "I", "ISC", @@ -37,7 +38,7 @@ select = [ "S", "SIM", "T10", -# "UP", + "UP", "W", "YTT", ]