diff --git a/aws_lambda_powertools/utilities/data_classes/active_mq_event.py b/aws_lambda_powertools/utilities/data_classes/active_mq_event.py index ec66918e2ce..8f5c952da3f 100644 --- a/aws_lambda_powertools/utilities/data_classes/active_mq_event.py +++ b/aws_lambda_powertools/utilities/data_classes/active_mq_event.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from functools import cached_property -from typing import Any, Dict, Iterator, Optional +from typing import Any, Iterator from aws_lambda_powertools.utilities.data_classes.common import DictWrapper from aws_lambda_powertools.utilities.data_classes.shared_functions import base64_decode @@ -62,32 +64,32 @@ def destination_physicalname(self) -> str: return self["destination"]["physicalName"] @property - def delivery_mode(self) -> Optional[int]: + def delivery_mode(self) -> int | None: """persistent or non-persistent delivery""" return self.get("deliveryMode") @property - def correlation_id(self) -> Optional[str]: + def correlation_id(self) -> str | None: """User defined correlation id""" return self.get("correlationID") @property - def reply_to(self) -> Optional[str]: + def reply_to(self) -> str | None: """User defined reply to""" return self.get("replyTo") @property - def get_type(self) -> Optional[str]: + def get_type(self) -> str | None: """User defined message type""" return self.get("type") @property - def expiration(self) -> Optional[int]: + def expiration(self) -> int | None: """Expiration attribute whose value is given in milliseconds""" return self.get("expiration") @property - def priority(self) -> Optional[int]: + def priority(self) -> int | None: """ JMS defines a ten-level priority value, with 0 as the lowest priority and 9 as the highest. In addition, clients should consider priorities 0-4 as @@ -110,9 +112,9 @@ class ActiveMQEvent(DictWrapper): - https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/ """ - def __init__(self, data: Dict[str, Any]): + def __init__(self, data: dict[str, Any]): super().__init__(data) - self._messages: Optional[Iterator[ActiveMQMessage]] = None + self._messages: Iterator[ActiveMQMessage] | None = None @property def event_source(self) -> str: diff --git a/aws_lambda_powertools/utilities/data_classes/alb_event.py b/aws_lambda_powertools/utilities/data_classes/alb_event.py index d5aa076c36a..1e403d6f692 100644 --- a/aws_lambda_powertools/utilities/data_classes/alb_event.py +++ b/aws_lambda_powertools/utilities/data_classes/alb_event.py @@ -1,4 +1,6 @@ -from typing import Any, Dict, List +from __future__ import annotations + +from typing import Any from aws_lambda_powertools.shared.headers_serializer import ( BaseHeadersSerializer, @@ -33,19 +35,19 @@ def request_context(self) -> ALBEventRequestContext: return ALBEventRequestContext(self._data) @property - def multi_value_query_string_parameters(self) -> Dict[str, List[str]]: + def multi_value_query_string_parameters(self) -> dict[str, list[str]]: return self.get("multiValueQueryStringParameters") or {} @property - def resolved_query_string_parameters(self) -> Dict[str, List[str]]: + def resolved_query_string_parameters(self) -> dict[str, list[str]]: return self.multi_value_query_string_parameters or super().resolved_query_string_parameters @property - def multi_value_headers(self) -> Dict[str, List[str]]: + def multi_value_headers(self) -> dict[str, list[str]]: return CaseInsensitiveDict(self.get("multiValueHeaders")) @property - def resolved_headers_field(self) -> Dict[str, Any]: + def resolved_headers_field(self) -> dict[str, Any]: return self.multi_value_headers or self.headers def header_serializer(self) -> BaseHeadersSerializer: diff --git a/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py b/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py index a9c0bf90e23..09d7b7ecb85 100644 --- a/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py +++ b/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import enum import re -from typing import Any, Dict, List, Optional +from typing import Any from aws_lambda_powertools.utilities.data_classes.common import ( BaseRequestContext, @@ -141,19 +143,19 @@ def http_method(self) -> str: return self["httpMethod"] @property - def headers(self) -> Dict[str, str]: + def headers(self) -> dict[str, str]: return CaseInsensitiveDict(self["headers"]) @property - def query_string_parameters(self) -> Dict[str, str]: + def query_string_parameters(self) -> dict[str, str]: return self["queryStringParameters"] @property - def path_parameters(self) -> Dict[str, str]: + def path_parameters(self) -> dict[str, str]: return self["pathParameters"] @property - def stage_variables(self) -> Dict[str, str]: + def stage_variables(self) -> dict[str, str]: return self["stageVariables"] @property @@ -193,7 +195,7 @@ def parsed_arn(self) -> APIGatewayRouteArn: return parse_api_gateway_arn(self.route_arn) @property - def identity_source(self) -> List[str]: + def identity_source(self) -> list[str]: """The identity source for which authorization is requested. For a REQUEST authorizer, this is optional. The value is a set of one or more mapping expressions of the @@ -217,17 +219,17 @@ def raw_query_string(self) -> str: return self["rawQueryString"] @property - def cookies(self) -> List[str]: + def cookies(self) -> list[str]: """Cookies""" return self["cookies"] @property - def headers(self) -> Dict[str, str]: + def headers(self) -> dict[str, str]: """Http headers""" return CaseInsensitiveDict(self["headers"]) @property - def query_string_parameters(self) -> Dict[str, str]: + def query_string_parameters(self) -> dict[str, str]: return self["queryStringParameters"] @property @@ -235,11 +237,11 @@ def request_context(self) -> BaseRequestContextV2: return BaseRequestContextV2(self._data) @property - def path_parameters(self) -> Dict[str, str]: + def path_parameters(self) -> dict[str, str]: return self.get("pathParameters") or {} @property - def stage_variables(self) -> Dict[str, str]: + def stage_variables(self) -> dict[str, str]: return self.get("stageVariables") or {} @@ -253,7 +255,7 @@ class APIGatewayAuthorizerResponseV2: is authorized to make calls to the GraphQL API. If this value is true, execution of the GraphQL API continues. If this value is false, an UnauthorizedException is raised - context: Dict[str, Any], optional + context: dict[str, Any], optional A JSON object visible as `event.requestContext.authorizer` lambda event The context object only supports key-value pairs. Nested keys are not supported. @@ -264,14 +266,14 @@ class APIGatewayAuthorizerResponseV2: def __init__( self, authorize: bool = False, - context: Optional[Dict[str, Any]] = None, + context: dict[str, Any] | None = None, ): self.authorize = authorize self.context = context def asdict(self) -> dict: """Return the response as a dict""" - response: Dict = {"isAuthorized": self.authorize} + response: dict = {"isAuthorized": self.authorize} if self.context: response["context"] = self.context @@ -329,8 +331,8 @@ def __init__( aws_account_id: str, api_id: str, stage: str, - context: Optional[Dict] = None, - usage_identifier_key: Optional[str] = None, + context: dict | None = None, + usage_identifier_key: str | None = None, partition: str = "aws", ): """ @@ -357,7 +359,7 @@ def __init__( greedily expand over '/' or other separators. See https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_resource.html for more details. - context : Dict, optional + context : dict, optional Optional, context. Note: only names of type string and values of type int, string or boolean are supported usage_identifier_key: str, optional @@ -375,8 +377,8 @@ def __init__( self.stage = stage self.context = context self.usage_identifier_key = usage_identifier_key - self._allow_routes: List[Dict] = [] - self._deny_routes: List[Dict] = [] + self._allow_routes: list[dict] = [] + self._deny_routes: list[dict] = [] self._resource_pattern = re.compile(self.path_regex) self.partition = partition @@ -384,9 +386,9 @@ def __init__( def from_route_arn( arn: str, principal_id: str, - context: Optional[Dict] = None, - usage_identifier_key: Optional[str] = None, - ) -> "APIGatewayAuthorizerResponse": + context: dict | None = None, + usage_identifier_key: str | None = None, + ) -> APIGatewayAuthorizerResponse: parsed_arn = parse_api_gateway_arn(arn) return APIGatewayAuthorizerResponse( principal_id, @@ -398,7 +400,7 @@ def from_route_arn( usage_identifier_key, ) - def _add_route(self, effect: str, http_method: str, resource: str, conditions: Optional[List[Dict]] = None): + def _add_route(self, effect: str, http_method: str, resource: str, conditions: list[dict] | None = None): """Adds a route to the internal lists of allowed or denied routes. Each object in the internal list contains a resource ARN and a condition statement. The condition statement can be null.""" @@ -427,17 +429,17 @@ def _add_route(self, effect: str, http_method: str, resource: str, conditions: O self._deny_routes.append(route) @staticmethod - def _get_empty_statement(effect: str) -> Dict[str, Any]: + def _get_empty_statement(effect: str) -> dict[str, Any]: """Returns an empty statement object prepopulated with the correct action and the desired effect.""" return {"Action": "execute-api:Invoke", "Effect": effect.capitalize(), "Resource": []} - def _get_statement_for_effect(self, effect: str, routes: List[Dict]) -> List[Dict]: + def _get_statement_for_effect(self, effect: str, routes: list[dict]) -> list[dict]: """This function loops over an array of objects containing a `resourceArn` and `conditions` statement and generates the array of statements for the policy.""" if not routes: return [] - statements: List[Dict] = [] + statements: list[dict] = [] statement = self._get_empty_statement(effect) for route in routes: @@ -476,7 +478,7 @@ def deny_all_routes(self, http_method: str = HttpVerb.ALL.value): self._add_route(effect="Deny", http_method=http_method, resource="*") - def allow_route(self, http_method: str, resource: str, conditions: Optional[List[Dict]] = None): + def allow_route(self, http_method: str, resource: str, conditions: list[dict] | None = None): """Adds an API Gateway method (Http verb + Resource path) to the list of allowed methods for the policy. @@ -484,7 +486,7 @@ def allow_route(self, http_method: str, resource: str, conditions: Optional[List conditions here: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition""" self._add_route(effect="Allow", http_method=http_method, resource=resource, conditions=conditions) - def deny_route(self, http_method: str, resource: str, conditions: Optional[List[Dict]] = None): + def deny_route(self, http_method: str, resource: str, conditions: list[dict] | None = None): """Adds an API Gateway method (Http verb + Resource path) to the list of denied methods for the policy. @@ -492,7 +494,7 @@ def deny_route(self, http_method: str, resource: str, conditions: Optional[List[ conditions here: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition""" self._add_route(effect="Deny", http_method=http_method, resource=resource, conditions=conditions) - def asdict(self) -> Dict[str, Any]: + def asdict(self) -> dict[str, Any]: """Generates the policy document based on the internal lists of allowed and denied conditions. This will generate a policy with two main statements for the effect: one statement for Allow and one statement for Deny. @@ -500,7 +502,7 @@ def asdict(self) -> Dict[str, Any]: if len(self._allow_routes) == 0 and len(self._deny_routes) == 0: raise ValueError("No statements defined for the policy") - response: Dict[str, Any] = { + response: dict[str, Any] = { "principalId": self.principal_id, "policyDocument": {"Version": "2012-10-17", "Statement": []}, } diff --git a/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py b/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py index f010dad80c3..f173742fff3 100644 --- a/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py +++ b/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from functools import cached_property -from typing import Any, Dict, List, Optional +from typing import Any from aws_lambda_powertools.shared.headers_serializer import ( BaseHeadersSerializer, @@ -17,11 +19,11 @@ class APIGatewayEventAuthorizer(DictWrapper): @property - def claims(self) -> Dict[str, Any]: + def claims(self) -> dict[str, Any]: return self.get("claims") or {} # key might exist but can be `null` @property - def scopes(self) -> List[str]: + def scopes(self) -> list[str]: return self.get("scopes") or [] # key might exist but can be `null` @property @@ -31,11 +33,11 @@ def principal_id(self) -> str: return self.get("principalId") or "" # key might exist but can be `null` @property - def integration_latency(self) -> Optional[int]: + def integration_latency(self) -> int | None: """The authorizer latency in ms.""" return self.get("integrationLatency") - def get_context(self) -> Dict[str, Any]: + def get_context(self) -> dict[str, Any]: """Retrieve the authorization context details injected by a Lambda Authorizer. Example @@ -49,7 +51,7 @@ def get_context(self) -> Dict[str, Any]: Returns: -------- - Dict[str, Any] + dict[str, Any] A dictionary containing Lambda authorization context details. """ return self._data @@ -57,37 +59,37 @@ def get_context(self) -> Dict[str, Any]: class APIGatewayEventRequestContext(BaseRequestContext): @property - def connected_at(self) -> Optional[int]: + def connected_at(self) -> int | None: """The Epoch-formatted connection time. (WebSocket API)""" return self["requestContext"].get("connectedAt") @property - def connection_id(self) -> Optional[str]: + def connection_id(self) -> str | None: """A unique ID for the connection that can be used to make a callback to the client. (WebSocket API)""" return self["requestContext"].get("connectionId") @property - def event_type(self) -> Optional[str]: + def event_type(self) -> str | None: """The event type: `CONNECT`, `MESSAGE`, or `DISCONNECT`. (WebSocket API)""" return self["requestContext"].get("eventType") @property - def message_direction(self) -> Optional[str]: + def message_direction(self) -> str | None: """Message direction (WebSocket API)""" return self["requestContext"].get("messageDirection") @property - def message_id(self) -> Optional[str]: + def message_id(self) -> str | None: """A unique server-side ID for a message. Available only when the `eventType` is `MESSAGE`.""" return self["requestContext"].get("messageId") @property - def operation_name(self) -> Optional[str]: + def operation_name(self) -> str | None: """The name of the operation being performed""" return self["requestContext"].get("operationName") @property - def route_key(self) -> Optional[str]: + def route_key(self) -> str | None: """The selected route key.""" return self["requestContext"].get("routeKey") @@ -114,22 +116,22 @@ def resource(self) -> str: return self["resource"] @property - def multi_value_headers(self) -> Dict[str, List[str]]: + def multi_value_headers(self) -> dict[str, list[str]]: return CaseInsensitiveDict(self.get("multiValueHeaders")) @property - def multi_value_query_string_parameters(self) -> Dict[str, List[str]]: + def multi_value_query_string_parameters(self) -> dict[str, list[str]]: return self.get("multiValueQueryStringParameters") or {} # key might exist but can be `null` @property - def resolved_query_string_parameters(self) -> Dict[str, List[str]]: + def resolved_query_string_parameters(self) -> dict[str, list[str]]: if self.multi_value_query_string_parameters: return self.multi_value_query_string_parameters return super().resolved_query_string_parameters @property - def resolved_headers_field(self) -> Dict[str, Any]: + def resolved_headers_field(self) -> dict[str, Any]: return self.multi_value_headers or self.headers @property @@ -137,11 +139,11 @@ def request_context(self) -> APIGatewayEventRequestContext: return APIGatewayEventRequestContext(self._data) @property - def path_parameters(self) -> Dict[str, str]: + def path_parameters(self) -> dict[str, str]: return self.get("pathParameters") or {} @property - def stage_variables(self) -> Dict[str, str]: + def stage_variables(self) -> dict[str, str]: return self.get("stageVariables") or {} def header_serializer(self) -> BaseHeadersSerializer: @@ -164,11 +166,11 @@ def caller_id(self) -> str: """The principal identifier of the caller making the request.""" return self.get("callerId") or "" # key might exist but can be `null` - def _cognito_identity(self) -> Dict: + def _cognito_identity(self) -> dict: return self.get("cognitoIdentity") or {} # not available in FunctionURL; key might exist but can be `null` @property - def cognito_amr(self) -> List[str]: + def cognito_amr(self) -> list[str]: """This represents how the user was authenticated. AMR stands for Authentication Methods References as per the openid spec""" return self._cognito_identity().get("amr", []) @@ -203,21 +205,21 @@ def user_id(self) -> str: class RequestContextV2Authorizer(DictWrapper): @property - def jwt_claim(self) -> Dict[str, Any]: + def jwt_claim(self) -> dict[str, Any]: jwt = self.get("jwt") or {} # not available in FunctionURL; key might exist but can be `null` return jwt.get("claims") or {} # key might exist but can be `null` @property - def jwt_scopes(self) -> List[str]: + def jwt_scopes(self) -> list[str]: jwt = self.get("jwt") or {} # not available in FunctionURL; key might exist but can be `null` return jwt.get("scopes", []) @property - def get_lambda(self) -> Dict[str, Any]: + def get_lambda(self) -> dict[str, Any]: """Lambda authorization context details""" return self.get("lambda") or {} # key might exist but can be `null` - def get_context(self) -> Dict[str, Any]: + def get_context(self) -> dict[str, Any]: """Retrieve the authorization context details injected by a Lambda Authorizer. Example @@ -231,7 +233,7 @@ def get_context(self) -> Dict[str, Any]: Returns: -------- - Dict[str, Any] + dict[str, Any] A dictionary containing Lambda authorization context details. """ return self.get_lambda @@ -284,7 +286,7 @@ def raw_query_string(self) -> str: return self["rawQueryString"] @property - def cookies(self) -> List[str]: + def cookies(self) -> list[str]: return self.get("cookies") or [] @property @@ -292,11 +294,11 @@ def request_context(self) -> RequestContextV2: return RequestContextV2(self._data) @property - def path_parameters(self) -> Dict[str, str]: + def path_parameters(self) -> dict[str, str]: return self.get("pathParameters") or {} @property - def stage_variables(self) -> Dict[str, str]: + def stage_variables(self) -> dict[str, str]: return self.get("stageVariables") or {} @property @@ -315,5 +317,5 @@ def header_serializer(self): return HttpApiHeadersSerializer() @cached_property - def resolved_headers_field(self) -> Dict[str, Any]: + def resolved_headers_field(self) -> dict[str, Any]: return CaseInsensitiveDict((k, v.split(",") if "," in v else v) for k, v in self.headers.items()) diff --git a/aws_lambda_powertools/utilities/data_classes/appsync_authorizer_event.py b/aws_lambda_powertools/utilities/data_classes/appsync_authorizer_event.py index b9366871211..a111084b306 100644 --- a/aws_lambda_powertools/utilities/data_classes/appsync_authorizer_event.py +++ b/aws_lambda_powertools/utilities/data_classes/appsync_authorizer_event.py @@ -1,4 +1,6 @@ -from typing import Any, Dict, List, Optional +from __future__ import annotations + +from typing import Any from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -27,12 +29,12 @@ def query_string(self) -> str: return self["requestContext"]["queryString"] @property - def operation_name(self) -> Optional[str]: + def operation_name(self) -> str | None: """GraphQL operation name, optional""" return self["requestContext"].get("operationName") @property - def variables(self) -> Dict: + def variables(self) -> dict: """GraphQL variables""" return self["requestContext"]["variables"] @@ -73,13 +75,13 @@ class AppSyncAuthorizerResponse: cached for. If no value is returned, the value from the API (if configured) or the default of 300 seconds (five minutes) is used. If this is 0, the response is not cached. - resolver_context: Dict[str, Any], optional + resolver_context: dict[str, Any], optional A JSON object visible as `$ctx.identity.resolverContext` in resolver templates The resolverContext object only supports key-value pairs. Nested keys are not supported. Warning: The total size of this JSON object must not exceed 5MB. - deny_fields: List[str], optional + deny_fields: list[str], optional A list of fields that will be set to `null` regardless of the resolver's return. A field is either `TypeName.FieldName`, or an ARN such as @@ -91,9 +93,9 @@ class AppSyncAuthorizerResponse: def __init__( self, authorize: bool = False, - max_age: Optional[int] = None, - resolver_context: Optional[Dict[str, Any]] = None, - deny_fields: Optional[List[str]] = None, + max_age: int | None = None, + resolver_context: dict[str, Any] | None = None, + deny_fields: list[str] | None = None, ): self.authorize = authorize self.max_age = max_age @@ -102,7 +104,7 @@ def __init__( def asdict(self) -> dict: """Return the response as a dict""" - response: Dict = {"isAuthorized": self.authorize} + response: dict = {"isAuthorized": self.authorize} if self.max_age is not None: response["ttlOverride"] = self.max_age diff --git a/aws_lambda_powertools/utilities/data_classes/appsync_resolver_event.py b/aws_lambda_powertools/utilities/data_classes/appsync_resolver_event.py index 4a02177b62e..99e3b7015a4 100644 --- a/aws_lambda_powertools/utilities/data_classes/appsync_resolver_event.py +++ b/aws_lambda_powertools/utilities/data_classes/appsync_resolver_event.py @@ -1,9 +1,11 @@ -from typing import Any, Dict, List, Optional, Union +from __future__ import annotations + +from typing import Any from aws_lambda_powertools.utilities.data_classes.common import CaseInsensitiveDict, DictWrapper -def get_identity_object(identity: Optional[dict]) -> Any: +def get_identity_object(identity: dict | None) -> Any: """Get the identity object based on the best detected type""" # API_KEY authorization if identity is None: @@ -21,7 +23,7 @@ class AppSyncIdentityIAM(DictWrapper): """AWS_IAM authorization""" @property - def source_ip(self) -> List[str]: + def source_ip(self) -> list[str]: """The source IP address of the caller received by AWS AppSync.""" return self["sourceIp"] @@ -66,7 +68,7 @@ class AppSyncIdentityCognito(DictWrapper): """AMAZON_COGNITO_USER_POOLS authorization""" @property - def source_ip(self) -> List[str]: + def source_ip(self) -> list[str]: """The source IP address of the caller received by AWS AppSync.""" return self["sourceIp"] @@ -81,7 +83,7 @@ def sub(self) -> str: return self["sub"] @property - def claims(self) -> Dict[str, str]: + def claims(self) -> dict[str, str]: """The claims that the user has.""" return self["claims"] @@ -91,7 +93,7 @@ def default_auth_strategy(self) -> str: return self["defaultAuthStrategy"] @property - def groups(self) -> List[str]: + def groups(self) -> list[str]: """List of OIDC groups""" return self["groups"] @@ -115,18 +117,18 @@ def parent_type_name(self) -> str: return self["parentTypeName"] @property - def variables(self) -> Dict[str, str]: + def variables(self) -> dict[str, str]: """A map which holds all variables that are passed into the GraphQL request.""" return self.get("variables") or {} @property - def selection_set_list(self) -> List[str]: + def selection_set_list(self) -> list[str]: """A list representation of the fields in the GraphQL selection set. Fields that are aliased will only be referenced by the alias name, not the field name.""" return self.get("selectionSetList") or [] @property - def selection_set_graphql(self) -> Optional[str]: + def selection_set_graphql(self) -> str | None: """A string representation of the selection set, formatted as GraphQL schema definition language (SDL). Although fragments are not be merged into the selection set, inline fragments are preserved.""" return self.get("selectionSetGraphQL") @@ -147,7 +149,7 @@ class AppSyncResolverEvent(DictWrapper): def __init__(self, data: dict): super().__init__(data) - info: Optional[dict] = data.get("info") + info: dict | None = data.get("info") if not info: info = {"fieldName": self.get("fieldName"), "parentTypeName": self.get("typeName")} @@ -164,12 +166,12 @@ def field_name(self) -> str: return self.info.field_name @property - def arguments(self) -> Dict[str, Any]: + def arguments(self) -> dict[str, Any]: """A map that contains all GraphQL arguments for this field.""" return self["arguments"] @property - def identity(self) -> Union[None, AppSyncIdentityIAM, AppSyncIdentityCognito]: + def identity(self) -> AppSyncIdentityIAM | AppSyncIdentityCognito | None: """An object that contains information about the caller. Depending on the type of identify found: @@ -181,17 +183,17 @@ def identity(self) -> Union[None, AppSyncIdentityIAM, AppSyncIdentityCognito]: return get_identity_object(self.get("identity")) @property - def source(self) -> Dict[str, Any]: + def source(self) -> dict[str, Any]: """A map that contains the resolution of the parent field.""" return self.get("source") or {} @property - def request_headers(self) -> Dict[str, str]: + def request_headers(self) -> dict[str, str]: """Request headers""" return CaseInsensitiveDict(self["request"]["headers"]) @property - def prev_result(self) -> Optional[Dict[str, Any]]: + def prev_result(self) -> dict[str, Any] | None: """It represents the result of whatever previous operation was executed in a pipeline resolver.""" prev = self.get("prev") if not prev: diff --git a/aws_lambda_powertools/utilities/data_classes/aws_config_rule_event.py b/aws_lambda_powertools/utilities/data_classes/aws_config_rule_event.py index f8d4f991cc0..040228ee884 100644 --- a/aws_lambda_powertools/utilities/data_classes/aws_config_rule_event.py +++ b/aws_lambda_powertools/utilities/data_classes/aws_config_rule_event.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Dict, List, Optional +from typing import Any from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -36,7 +36,7 @@ def get_invoke_event( class AWSConfigConfigurationChanged(DictWrapper): @property - def configuration_item_diff(self) -> Dict: + def configuration_item_diff(self) -> dict: """The configuration item diff of the ConfigurationItemChangeNotification event.""" return self["configurationItemDiff"] @@ -46,7 +46,7 @@ def configuration_item(self) -> AWSConfigConfigurationItemChanged: return AWSConfigConfigurationItemChanged(self["configurationItem"]) @property - def raw_configuration_item(self) -> Dict: + def raw_configuration_item(self) -> dict: """The raw configuration item of the ConfigurationItemChangeNotification event.""" return self["configurationItem"] @@ -68,27 +68,27 @@ def notification_creation_time(self) -> str: class AWSConfigConfigurationItemChanged(DictWrapper): @property - def related_events(self) -> List: + def related_events(self) -> list: """The related events of the ConfigurationItemChangeNotification event.""" return self["relatedEvents"] @property - def relationships(self) -> List: + def relationships(self) -> list: """The relationships of the ConfigurationItemChangeNotification event.""" return self["relationships"] @property - def configuration(self) -> Dict: + def configuration(self) -> dict: """The configuration of the ConfigurationItemChangeNotification event.""" return self["configuration"] @property - def supplementary_configuration(self) -> Dict: + def supplementary_configuration(self) -> dict: """The supplementary configuration of the ConfigurationItemChangeNotification event.""" return self["supplementaryConfiguration"] @property - def tags(self) -> Dict: + def tags(self) -> dict: """The tags of the ConfigurationItemChangeNotification event.""" return self["tags"] @@ -286,10 +286,10 @@ class AWSConfigRuleEvent(DictWrapper): - https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_lambda-functions.html """ - def __init__(self, data: Dict[str, Any]): + def __init__(self, data: dict[str, Any]): super().__init__(data) - self._invoking_event: Optional[Any] = None - self._rule_parameters: Optional[Any] = None + self._invoking_event: Any | None = None + self._rule_parameters: Any | None = None @property def version(self) -> str: @@ -312,7 +312,7 @@ def raw_invoking_event(self) -> str: return self["invokingEvent"] @property - def rule_parameters(self) -> Dict: + def rule_parameters(self) -> dict: """The parameters of the event.""" if self._rule_parameters is None: self._rule_parameters = self._json_deserializer(self["ruleParameters"]) @@ -355,6 +355,6 @@ def accountid(self) -> str: return self["accountId"] @property - def evalution_mode(self) -> Optional[str]: + def evalution_mode(self) -> str | None: """The evalution mode of the event.""" return self.get("evaluationMode") diff --git a/aws_lambda_powertools/utilities/data_classes/bedrock_agent_event.py b/aws_lambda_powertools/utilities/data_classes/bedrock_agent_event.py index 71c6f44aa1b..388e556a812 100644 --- a/aws_lambda_powertools/utilities/data_classes/bedrock_agent_event.py +++ b/aws_lambda_powertools/utilities/data_classes/bedrock_agent_event.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from functools import cached_property -from typing import Any, Dict, List, Optional +from typing import Any from aws_lambda_powertools.utilities.data_classes.common import BaseProxyEvent, DictWrapper @@ -38,13 +40,13 @@ def value(self) -> str: class BedrockAgentRequestMedia(DictWrapper): @property - def properties(self) -> List[BedrockAgentProperty]: + def properties(self) -> list[BedrockAgentProperty]: return [BedrockAgentProperty(x) for x in self["properties"]] class BedrockAgentRequestBody(DictWrapper): @property - def content(self) -> Dict[str, BedrockAgentRequestMedia]: + def content(self) -> dict[str, BedrockAgentRequestMedia]: return {k: BedrockAgentRequestMedia(v) for k, v in self["content"].items()} @@ -80,12 +82,12 @@ def http_method(self) -> str: return self["httpMethod"] @property - def parameters(self) -> List[BedrockAgentProperty]: + def parameters(self) -> list[BedrockAgentProperty]: parameters = self.get("parameters") or [] return [BedrockAgentProperty(x) for x in parameters] @property - def request_body(self) -> Optional[BedrockAgentRequestBody]: + def request_body(self) -> BedrockAgentRequestBody | None: return BedrockAgentRequestBody(self["requestBody"]) if self.get("requestBody") else None @property @@ -93,11 +95,11 @@ def agent(self) -> BedrockAgentInfo: return BedrockAgentInfo(self["agent"]) @property - def session_attributes(self) -> Dict[str, str]: + def session_attributes(self) -> dict[str, str]: return self["sessionAttributes"] @property - def prompt_session_attributes(self) -> Dict[str, str]: + def prompt_session_attributes(self) -> dict[str, str]: return self["promptSessionAttributes"] # The following methods add compatibility with BaseProxyEvent @@ -106,14 +108,14 @@ def path(self) -> str: return self["apiPath"] @cached_property - def query_string_parameters(self) -> Dict[str, str]: + def query_string_parameters(self) -> dict[str, str]: # In Bedrock Agent events, query string parameters are passed as undifferentiated parameters, # together with the other parameters. So we just return all parameters here. parameters = self.get("parameters") or [] return {x["name"]: x["value"] for x in parameters} @property - def resolved_headers_field(self) -> Dict[str, Any]: + def resolved_headers_field(self) -> dict[str, Any]: return {} @cached_property diff --git a/aws_lambda_powertools/utilities/data_classes/cloud_watch_alarm_event.py b/aws_lambda_powertools/utilities/data_classes/cloud_watch_alarm_event.py index 78106b576e0..7604a38bf63 100644 --- a/aws_lambda_powertools/utilities/data_classes/cloud_watch_alarm_event.py +++ b/aws_lambda_powertools/utilities/data_classes/cloud_watch_alarm_event.py @@ -1,7 +1,7 @@ from __future__ import annotations from functools import cached_property -from typing import Any, List, Literal, Optional +from typing import Any, Literal from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -30,7 +30,7 @@ def reason_data(self) -> str: return self["reasonData"] @cached_property - def reason_data_decoded(self) -> Optional[Any]: + def reason_data_decoded(self) -> Any | None: """ Deserialized version of reason_data. """ @@ -38,7 +38,7 @@ def reason_data_decoded(self) -> Optional[Any]: return self._json_deserializer(self.reason_data) if self.reason_data else None @property - def actions_suppressed_by(self) -> Optional[Literal["Alarm", "ExtensionPeriod", "WaitPeriod"]]: + def actions_suppressed_by(self) -> Literal["Alarm", "ExtensionPeriod", "WaitPeriod"] | None: """ Describes why the actions when the value is `ALARM` are suppressed in a composite alarm. @@ -46,7 +46,7 @@ def actions_suppressed_by(self) -> Optional[Literal["Alarm", "ExtensionPeriod", return self.get("actionsSuppressedBy", None) @property - def actions_suppressed_reason(self) -> Optional[str]: + def actions_suppressed_reason(self) -> str | None: """ Captures the reason for action suppression. """ @@ -69,14 +69,14 @@ def metric_id(self) -> str: return self["id"] @property - def expression(self) -> Optional[str]: + def expression(self) -> str | None: """ Optional expression of the alarm metric. """ return self.get("expression", None) @property - def label(self) -> Optional[str]: + def label(self) -> str | None: """ Optional label of the alarm metric. """ @@ -96,21 +96,21 @@ def metric_stat(self) -> CloudWatchAlarmMetricStat: class CloudWatchAlarmMetricStat(DictWrapper): @property - def period(self) -> Optional[int]: + def period(self) -> int | None: """ Metric evaluation period, in seconds. """ return self.get("period", None) @property - def stat(self) -> Optional[str]: + def stat(self) -> str | None: """ Statistical aggregation of metric points, e.g. Average, SampleCount, etc. """ return self.get("stat", None) @property - def unit(self) -> Optional[str]: + def unit(self) -> str | None: """ Unit for metric. """ @@ -156,42 +156,42 @@ def configuration(self) -> CloudWatchAlarmConfiguration: class CloudWatchAlarmConfiguration(DictWrapper): @property - def description(self) -> Optional[str]: + def description(self) -> str | None: """ Optional description for the Alarm. """ return self.get("description", None) @property - def alarm_rule(self) -> Optional[str]: + def alarm_rule(self) -> str | None: """ Optional description for the Alarm rule in case of composite alarm. """ return self.get("alarmRule", None) @property - def alarm_actions_suppressor(self) -> Optional[str]: + def alarm_actions_suppressor(self) -> str | None: """ Optional action suppression for the Alarm rule in case of composite alarm. """ return self.get("actionsSuppressor", None) @property - def alarm_actions_suppressor_wait_period(self) -> Optional[str]: + def alarm_actions_suppressor_wait_period(self) -> str | None: """ Optional action suppression wait period for the Alarm rule in case of composite alarm. """ return self.get("actionsSuppressorWaitPeriod", None) @property - def alarm_actions_suppressor_extension_period(self) -> Optional[str]: + def alarm_actions_suppressor_extension_period(self) -> str | None: """ Optional action suppression extension period for the Alarm rule in case of composite alarm. """ return self.get("actionsSuppressorExtensionPeriod", None) @property - def metrics(self) -> List[CloudWatchAlarmMetric]: + def metrics(self) -> list[CloudWatchAlarmMetric]: """ The metrics evaluated for the Alarm. """ diff --git a/aws_lambda_powertools/utilities/data_classes/cloud_watch_custom_widget_event.py b/aws_lambda_powertools/utilities/data_classes/cloud_watch_custom_widget_event.py index 40219f944ba..b1f1503f104 100644 --- a/aws_lambda_powertools/utilities/data_classes/cloud_watch_custom_widget_event.py +++ b/aws_lambda_powertools/utilities/data_classes/cloud_watch_custom_widget_event.py @@ -1,4 +1,6 @@ -from typing import Any, Dict, Optional +from __future__ import annotations + +from typing import Any from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -37,17 +39,17 @@ def end(self) -> int: return self["end"] @property - def relative_start(self) -> Optional[int]: + def relative_start(self) -> int | None: """The relative start time within the time range""" return self.get("relativeStart") @property - def zoom_start(self) -> Optional[int]: + def zoom_start(self) -> int | None: """The start time within the zoomed time range""" return (self.get("zoom") or {}).get("start") @property - def zoom_end(self) -> Optional[int]: + def zoom_end(self) -> int | None: """The end time within the zoomed time range""" return (self.get("zoom") or {}).get("end") @@ -114,12 +116,12 @@ def title(self) -> str: return self["title"] @property - def params(self) -> Dict[str, Any]: + def params(self) -> dict[str, Any]: """Get widget parameters""" return self["params"] @property - def forms(self) -> Dict[str, Any]: + def forms(self) -> dict[str, Any]: """Get widget form data""" return self["forms"]["all"] @@ -150,7 +152,7 @@ def describe(self) -> bool: return bool(self.get("describe", False)) @property - def widget_context(self) -> Optional[CloudWatchWidgetContext]: + def widget_context(self) -> CloudWatchWidgetContext | None: """The widget context""" if self.get("widgetContext"): return CloudWatchWidgetContext(self["widgetContext"]) diff --git a/aws_lambda_powertools/utilities/data_classes/cloud_watch_logs_event.py b/aws_lambda_powertools/utilities/data_classes/cloud_watch_logs_event.py index 7a5fe7cec76..d48648e6976 100644 --- a/aws_lambda_powertools/utilities/data_classes/cloud_watch_logs_event.py +++ b/aws_lambda_powertools/utilities/data_classes/cloud_watch_logs_event.py @@ -1,6 +1,7 @@ +from __future__ import annotations + import base64 import zlib -from typing import Dict, List, Optional from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -23,7 +24,7 @@ def message(self) -> str: return self["message"] @property - def extracted_fields(self) -> Dict[str, str]: + def extracted_fields(self) -> dict[str, str]: """Get the `extractedFields` property""" return self.get("extractedFields") or {} @@ -45,7 +46,7 @@ def log_stream(self) -> str: return self["logStream"] @property - def subscription_filters(self) -> List[str]: + def subscription_filters(self) -> list[str]: """The list of subscription filter names that matched with the originating log data.""" return self["subscriptionFilters"] @@ -59,12 +60,12 @@ def message_type(self) -> str: return self["messageType"] @property - def policy_level(self) -> Optional[str]: + def policy_level(self) -> str | None: """The level at which the policy was enforced.""" return self.get("policyLevel") @property - def log_events(self) -> List[CloudWatchLogsLogEvent]: + def log_events(self) -> list[CloudWatchLogsLogEvent]: """The actual log data, represented as an array of log event records. The ID property is a unique identifier for every log event. diff --git a/aws_lambda_powertools/utilities/data_classes/cloudformation_custom_resource_event.py b/aws_lambda_powertools/utilities/data_classes/cloudformation_custom_resource_event.py index 7787a719f76..175766a9d8d 100644 --- a/aws_lambda_powertools/utilities/data_classes/cloudformation_custom_resource_event.py +++ b/aws_lambda_powertools/utilities/data_classes/cloudformation_custom_resource_event.py @@ -1,4 +1,6 @@ -from typing import Any, Dict, Literal +from __future__ import annotations + +from typing import Any, Literal from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -37,9 +39,9 @@ def resource_type(self) -> str: return self["ResourceType"] @property - def resource_properties(self) -> Dict[str, Any]: + def resource_properties(self) -> dict[str, Any]: return self.get("ResourceProperties") or {} @property - def old_resource_properties(self) -> Dict[str, Any]: + def old_resource_properties(self) -> dict[str, Any]: return self.get("OldResourceProperties") or {} diff --git a/aws_lambda_powertools/utilities/data_classes/code_pipeline_job_event.py b/aws_lambda_powertools/utilities/data_classes/code_pipeline_job_event.py index 1cc409c6988..8e5fa9ebcb4 100644 --- a/aws_lambda_powertools/utilities/data_classes/code_pipeline_job_event.py +++ b/aws_lambda_powertools/utilities/data_classes/code_pipeline_job_event.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import tempfile import zipfile from functools import cached_property -from typing import Any, Dict, List, Optional +from typing import Any from urllib.parse import unquote_plus from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -14,12 +16,12 @@ def function_name(self) -> str: return self["FunctionName"] @property - def user_parameters(self) -> Optional[str]: + def user_parameters(self) -> str | None: """User parameters""" return self.get("UserParameters", None) @cached_property - def decoded_user_parameters(self) -> Dict[str, Any]: + def decoded_user_parameters(self) -> dict[str, Any]: """Json Decoded user parameters""" if self.user_parameters is not None: return self._json_deserializer(self.user_parameters) @@ -69,7 +71,7 @@ def name(self) -> str: return self["name"] @property - def revision(self) -> Optional[str]: + def revision(self) -> str | None: return self.get("revision") @property @@ -93,7 +95,7 @@ def session_token(self) -> str: return self["sessionToken"] @property - def expiration_time(self) -> Optional[int]: + def expiration_time(self) -> int | None: return self.get("expirationTime") @@ -116,12 +118,12 @@ def action_configuration(self) -> CodePipelineActionConfiguration: return CodePipelineActionConfiguration(self["actionConfiguration"]) @property - def input_artifacts(self) -> List[CodePipelineArtifact]: + def input_artifacts(self) -> list[CodePipelineArtifact]: """Represents a CodePipeline input artifact""" return [CodePipelineArtifact(item) for item in self["inputArtifacts"]] @property - def output_artifacts(self) -> List[CodePipelineArtifact]: + def output_artifacts(self) -> list[CodePipelineArtifact]: """Represents a CodePipeline output artifact""" return [CodePipelineArtifact(item) for item in self["outputArtifacts"]] @@ -131,12 +133,12 @@ def artifact_credentials(self) -> CodePipelineArtifactCredentials: return CodePipelineArtifactCredentials(self["artifactCredentials"]) @property - def continuation_token(self) -> Optional[str]: + def continuation_token(self) -> str | None: """A continuation token if continuing job""" return self.get("continuationToken") @property - def encryption_key(self) -> Optional[CodePipelineEncryptionKey]: + def encryption_key(self) -> CodePipelineEncryptionKey | None: """Represents a CodePipeline encryption key""" key_data = self.get("encryptionKey") return CodePipelineEncryptionKey(key_data) if key_data is not None else None @@ -151,7 +153,7 @@ class CodePipelineJobEvent(DictWrapper): - https://docs.aws.amazon.com/lambda/latest/dg/services-codepipeline.html """ - def __init__(self, data: Dict[str, Any]): + def __init__(self, data: dict[str, Any]): super().__init__(data) self._job = self["CodePipeline.job"] @@ -171,12 +173,12 @@ def data(self) -> CodePipelineData: return CodePipelineData(self._job["data"]) @property - def user_parameters(self) -> Optional[str]: + def user_parameters(self) -> str | None: """Action configuration user parameters""" return self.data.action_configuration.configuration.user_parameters @property - def decoded_user_parameters(self) -> Dict[str, Any]: + def decoded_user_parameters(self) -> dict[str, Any]: """Json Decoded action configuration user parameters""" return self.data.action_configuration.configuration.decoded_user_parameters @@ -216,7 +218,7 @@ def setup_s3_client(self): user_agent.register_feature_to_client(client=s3, feature="data_classes") return s3 - def find_input_artifact(self, artifact_name: str) -> Optional[CodePipelineArtifact]: + def find_input_artifact(self, artifact_name: str) -> CodePipelineArtifact | None: """Find an input artifact by artifact name Parameters @@ -234,7 +236,7 @@ def find_input_artifact(self, artifact_name: str) -> Optional[CodePipelineArtifa return artifact return None - def get_artifact(self, artifact_name: str, filename: str) -> Optional[str]: + def get_artifact(self, artifact_name: str, filename: str) -> str | None: """Get a file within an artifact zip on s3 Parameters diff --git a/aws_lambda_powertools/utilities/data_classes/cognito_user_pool_event.py b/aws_lambda_powertools/utilities/data_classes/cognito_user_pool_event.py index 86cf3b0601d..773422ed2ff 100644 --- a/aws_lambda_powertools/utilities/data_classes/cognito_user_pool_event.py +++ b/aws_lambda_powertools/utilities/data_classes/cognito_user_pool_event.py @@ -1,4 +1,6 @@ -from typing import Any, Dict, List, Optional +from __future__ import annotations + +from typing import Any from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -56,17 +58,17 @@ def caller_context(self) -> CallerContext: class PreSignUpTriggerEventRequest(DictWrapper): @property - def user_attributes(self) -> Dict[str, str]: + def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" return self["request"]["userAttributes"] @property - def validation_data(self) -> Dict[str, str]: + def validation_data(self) -> dict[str, str]: """One or more name-value pairs containing the validation data in the request to register a user.""" return self["request"].get("validationData") or {} @property - def client_metadata(self) -> Dict[str, str]: + def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the pre sign-up trigger.""" return self["request"].get("clientMetadata") or {} @@ -128,12 +130,12 @@ def response(self) -> PreSignUpTriggerEventResponse: class PostConfirmationTriggerEventRequest(DictWrapper): @property - def user_attributes(self) -> Dict[str, str]: + def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" return self["request"]["userAttributes"] @property - def client_metadata(self) -> Dict[str, str]: + def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the post confirmation trigger.""" return self["request"].get("clientMetadata") or {} @@ -165,12 +167,12 @@ def password(self) -> str: return self["request"]["password"] @property - def validation_data(self) -> Dict[str, str]: + def validation_data(self) -> dict[str, str]: """One or more name-value pairs containing the validation data in the request to register a user.""" return self["request"].get("validationData") or {} @property - def client_metadata(self) -> Dict[str, str]: + def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the pre sign-up trigger.""" return self["request"].get("clientMetadata") or {} @@ -178,18 +180,18 @@ def client_metadata(self) -> Dict[str, str]: class UserMigrationTriggerEventResponse(DictWrapper): @property - def user_attributes(self) -> Dict[str, str]: + def user_attributes(self) -> dict[str, str]: return self["response"]["userAttributes"] @user_attributes.setter - def user_attributes(self, value: Dict[str, str]): + def user_attributes(self, value: dict[str, str]): """It must contain one or more name-value pairs representing user attributes to be stored in the user profile in your user pool. You can include both standard and custom user attributes. Custom attributes require the custom: prefix to distinguish them from standard attributes.""" self["response"]["userAttributes"] = value @property - def final_user_status(self) -> Optional[str]: + def final_user_status(self) -> str | None: return self["response"].get("finalUserStatus") @final_user_status.setter @@ -203,7 +205,7 @@ def final_user_status(self, value: str): self["response"]["finalUserStatus"] = value @property - def message_action(self) -> Optional[str]: + def message_action(self) -> str | None: return self["response"].get("messageAction") @message_action.setter @@ -213,17 +215,17 @@ def message_action(self, value: str): self["response"]["messageAction"] = value @property - def desired_delivery_mediums(self) -> List[str]: + def desired_delivery_mediums(self) -> list[str]: return self["response"].get("desiredDeliveryMediums") or [] @desired_delivery_mediums.setter - def desired_delivery_mediums(self, value: List[str]): + def desired_delivery_mediums(self, value: list[str]): """This attribute can be set to "EMAIL" to send the welcome message by email, or "SMS" to send the welcome message by SMS. If this attribute is not returned, the welcome message will be sent by SMS.""" self["response"]["desiredDeliveryMediums"] = value @property - def force_alias_creation(self) -> Optional[bool]: + def force_alias_creation(self) -> bool | None: return self["response"].get("forceAliasCreation") @force_alias_creation.setter @@ -276,12 +278,12 @@ def username_parameter(self) -> str: return self["request"]["usernameParameter"] @property - def user_attributes(self) -> Dict[str, str]: + def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" return self["request"]["userAttributes"] @property - def client_metadata(self) -> Dict[str, str]: + def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the pre sign-up trigger.""" return self["request"].get("clientMetadata") or {} @@ -351,17 +353,17 @@ def response(self) -> CustomMessageTriggerEventResponse: class PreAuthenticationTriggerEventRequest(DictWrapper): @property - def user_not_found(self) -> Optional[bool]: + def user_not_found(self) -> bool | None: """This boolean is populated when PreventUserExistenceErrors is set to ENABLED for your User Pool client.""" return self["request"].get("userNotFound") @property - def user_attributes(self) -> Dict[str, str]: + def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes.""" return self["request"]["userAttributes"] @property - def validation_data(self) -> Dict[str, str]: + def validation_data(self) -> dict[str, str]: """One or more key-value pairs containing the validation data in the user's sign-in request.""" return self["request"].get("validationData") or {} @@ -397,12 +399,12 @@ def new_device_used(self) -> bool: return self["request"]["newDeviceUsed"] @property - def user_attributes(self) -> Dict[str, str]: + def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes.""" return self["request"]["userAttributes"] @property - def client_metadata(self) -> Dict[str, str]: + def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the post authentication trigger.""" return self["request"].get("clientMetadata") or {} @@ -433,17 +435,17 @@ def request(self) -> PostAuthenticationTriggerEventRequest: class GroupOverrideDetails(DictWrapper): @property - def groups_to_override(self) -> List[str]: + def groups_to_override(self) -> list[str]: """A list of the group names that are associated with the user that the identity token is issued for.""" return self.get("groupsToOverride") or [] @property - def iam_roles_to_override(self) -> List[str]: + def iam_roles_to_override(self) -> list[str]: """A list of the current IAM roles associated with these groups.""" return self.get("iamRolesToOverride") or [] @property - def preferred_role(self) -> Optional[str]: + def preferred_role(self) -> str | None: """A string indicating the preferred IAM role.""" return self.get("preferredRole") @@ -455,12 +457,12 @@ def group_configuration(self) -> GroupOverrideDetails: return GroupOverrideDetails(self["request"]["groupConfiguration"]) @property - def user_attributes(self) -> Dict[str, str]: + def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes.""" return self["request"]["userAttributes"] @property - def client_metadata(self) -> Dict[str, str]: + def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the pre token generation trigger.""" return self["request"].get("clientMetadata") or {} @@ -468,31 +470,31 @@ def client_metadata(self) -> Dict[str, str]: class ClaimsOverrideDetails(DictWrapper): @property - def claims_to_add_or_override(self) -> Dict[str, str]: + def claims_to_add_or_override(self) -> dict[str, str]: return self.get("claimsToAddOrOverride") or {} @claims_to_add_or_override.setter - def claims_to_add_or_override(self, value: Dict[str, str]): + def claims_to_add_or_override(self, value: dict[str, str]): """A map of one or more key-value pairs of claims to add or override. For group related claims, use groupOverrideDetails instead.""" self._data["claimsToAddOrOverride"] = value @property - def claims_to_suppress(self) -> List[str]: + def claims_to_suppress(self) -> list[str]: return self.get("claimsToSuppress") or [] @claims_to_suppress.setter - def claims_to_suppress(self, value: List[str]): + def claims_to_suppress(self, value: list[str]): """A list that contains claims to be suppressed from the identity token.""" self._data["claimsToSuppress"] = value @property - def group_configuration(self) -> Optional[GroupOverrideDetails]: + def group_configuration(self) -> GroupOverrideDetails | None: group_override_details = self.get("groupOverrideDetails") return None if group_override_details is None else GroupOverrideDetails(group_override_details) @group_configuration.setter - def group_configuration(self, value: Dict[str, Any]): + def group_configuration(self, value: dict[str, Any]): """The output object containing the current group configuration. It includes groupsToOverride, iamRolesToOverride, and preferredRole. @@ -504,12 +506,12 @@ def group_configuration(self, value: Dict[str, Any]): """ self._data["groupOverrideDetails"] = value - def set_group_configuration_groups_to_override(self, value: List[str]): + def set_group_configuration_groups_to_override(self, value: list[str]): """A list of the group names that are associated with the user that the identity token is issued for.""" self._data.setdefault("groupOverrideDetails", {}) self["groupOverrideDetails"]["groupsToOverride"] = value - def set_group_configuration_iam_roles_to_override(self, value: List[str]): + def set_group_configuration_iam_roles_to_override(self, value: list[str]): """A list of the current IAM roles associated with these groups.""" self._data.setdefault("groupOverrideDetails", {}) self["groupOverrideDetails"]["iamRolesToOverride"] = value @@ -576,30 +578,30 @@ def challenge_result(self) -> bool: return bool(self["challengeResult"]) @property - def challenge_metadata(self) -> Optional[str]: + def challenge_metadata(self) -> str | None: """Your name for the custom challenge. Used only if challengeName is CUSTOM_CHALLENGE.""" return self.get("challengeMetadata") class DefineAuthChallengeTriggerEventRequest(DictWrapper): @property - def user_attributes(self) -> Dict[str, str]: + def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" return self["request"]["userAttributes"] @property - def user_not_found(self) -> Optional[bool]: + def user_not_found(self) -> bool | None: """A Boolean that is populated when PreventUserExistenceErrors is set to ENABLED for your user pool client. A value of true means that the user id (username, email address, etc.) did not match any existing users.""" return self["request"].get("userNotFound") @property - def session(self) -> List[ChallengeResult]: + def session(self) -> list[ChallengeResult]: """An array of ChallengeResult elements, each of which contains the following elements:""" return [ChallengeResult(result) for result in self["request"]["session"]] @property - def client_metadata(self) -> Dict[str, str]: + def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the defined auth challenge trigger.""" return self["request"].get("clientMetadata") or {} @@ -665,12 +667,12 @@ def response(self) -> DefineAuthChallengeTriggerEventResponse: class CreateAuthChallengeTriggerEventRequest(DictWrapper): @property - def user_attributes(self) -> Dict[str, str]: + def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" return self["request"]["userAttributes"] @property - def user_not_found(self) -> Optional[bool]: + def user_not_found(self) -> bool | None: """This boolean is populated when PreventUserExistenceErrors is set to ENABLED for your User Pool client.""" return self["request"].get("userNotFound") @@ -680,12 +682,12 @@ def challenge_name(self) -> str: return self["request"]["challengeName"] @property - def session(self) -> List[ChallengeResult]: + def session(self) -> list[ChallengeResult]: """An array of ChallengeResult elements, each of which contains the following elements:""" return [ChallengeResult(result) for result in self["request"]["session"]] @property - def client_metadata(self) -> Dict[str, str]: + def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the creation auth challenge trigger.""" return self["request"].get("clientMetadata") or {} @@ -693,22 +695,22 @@ def client_metadata(self) -> Dict[str, str]: class CreateAuthChallengeTriggerEventResponse(DictWrapper): @property - def public_challenge_parameters(self) -> Dict[str, str]: + def public_challenge_parameters(self) -> dict[str, str]: return self["response"]["publicChallengeParameters"] @public_challenge_parameters.setter - def public_challenge_parameters(self, value: Dict[str, str]): + def public_challenge_parameters(self, value: dict[str, str]): """One or more key-value pairs for the client app to use in the challenge to be presented to the user. This parameter should contain all the necessary information to accurately present the challenge to the user.""" self["response"]["publicChallengeParameters"] = value @property - def private_challenge_parameters(self) -> Dict[str, str]: + def private_challenge_parameters(self) -> dict[str, str]: return self["response"]["privateChallengeParameters"] @private_challenge_parameters.setter - def private_challenge_parameters(self, value: Dict[str, str]): + def private_challenge_parameters(self, value: dict[str, str]): """This parameter is only used by the "Verify Auth Challenge" Response Lambda trigger. This parameter should contain all the information that is required to validate the user's response to the challenge. In other words, the publicChallengeParameters parameter contains the @@ -757,12 +759,12 @@ def response(self) -> CreateAuthChallengeTriggerEventResponse: class VerifyAuthChallengeResponseTriggerEventRequest(DictWrapper): @property - def user_attributes(self) -> Dict[str, str]: + def user_attributes(self) -> dict[str, str]: """One or more name-value pairs representing user attributes. The attribute names are the keys.""" return self["request"]["userAttributes"] @property - def private_challenge_parameters(self) -> Dict[str, str]: + def private_challenge_parameters(self) -> dict[str, str]: """This parameter comes from the Create Auth Challenge trigger, and is compared against a user’s challengeAnswer to determine whether the user passed the challenge.""" return self["request"]["privateChallengeParameters"] @@ -773,13 +775,13 @@ def challenge_answer(self) -> Any: return self["request"]["challengeAnswer"] @property - def client_metadata(self) -> Dict[str, str]: + def client_metadata(self) -> dict[str, str]: """One or more key-value pairs that you can provide as custom input to the Lambda function that you specify for the "Verify Auth Challenge" trigger.""" return self["request"].get("clientMetadata") or {} @property - def user_not_found(self) -> Optional[bool]: + def user_not_found(self) -> bool | None: """This boolean is populated when PreventUserExistenceErrors is set to ENABLED for your User Pool client.""" return self["request"].get("userNotFound") diff --git a/aws_lambda_powertools/utilities/data_classes/common.py b/aws_lambda_powertools/utilities/data_classes/common.py index 7e9ed2471d2..b44e9b8ab3c 100644 --- a/aws_lambda_powertools/utilities/data_classes/common.py +++ b/aws_lambda_powertools/utilities/data_classes/common.py @@ -1,9 +1,12 @@ +from __future__ import annotations + import base64 import json from functools import cached_property -from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional +from typing import TYPE_CHECKING, Any, Callable, Iterator, Mapping -from aws_lambda_powertools.shared.headers_serializer import BaseHeadersSerializer +if TYPE_CHECKING: + from aws_lambda_powertools.shared.headers_serializer import BaseHeadersSerializer class CaseInsensitiveDict(dict): @@ -52,11 +55,11 @@ def __setitem__(self, k, v): class DictWrapper(Mapping): """Provides a single read only access to a wrapper dict""" - def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None): + def __init__(self, data: dict[str, Any], json_deserializer: Callable | None = None): """ Parameters ---------- - data : Dict[str, Any] + data : dict[str, Any] Lambda Event Source Event payload json_deserializer : Callable, optional function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`, @@ -83,7 +86,7 @@ def __len__(self) -> int: def __str__(self) -> str: return str(self._str_helper()) - def _str_helper(self) -> Dict[str, Any]: + def _str_helper(self) -> dict[str, Any]: """ Recursively get a Dictionary of DictWrapper properties primarily for use by __str__ for debugging purposes. @@ -98,7 +101,7 @@ def _str_helper(self) -> Dict[str, Any]: if hasattr(self, "_sensitive_properties"): sensitive_properties.extend(self._sensitive_properties) # pyright: ignore - result: Dict[str, Any] = {} + result: dict[str, Any] = {} for property_key in properties: if property_key in sensitive_properties: result[property_key] = "[SENSITIVE]" @@ -120,33 +123,33 @@ def _str_helper(self) -> Dict[str, Any]: return result - def _properties(self) -> List[str]: + def _properties(self) -> list[str]: return [p for p in dir(self.__class__) if isinstance(getattr(self.__class__, p), property)] - def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]: + def get(self, key: str, default: Any | None = None) -> Any | None: return self._data.get(key, default) @property - def raw_event(self) -> Dict[str, Any]: + def raw_event(self) -> dict[str, Any]: """The original raw event dict""" return self._data class BaseProxyEvent(DictWrapper): @property - def headers(self) -> Dict[str, str]: + def headers(self) -> dict[str, str]: return CaseInsensitiveDict(self.get("headers")) @property - def query_string_parameters(self) -> Dict[str, str]: + def query_string_parameters(self) -> dict[str, str]: return self.get("queryStringParameters") or {} @property - def multi_value_query_string_parameters(self) -> Dict[str, List[str]]: + def multi_value_query_string_parameters(self) -> dict[str, list[str]]: return self.get("multiValueQueryStringParameters") or {} @cached_property - def resolved_query_string_parameters(self) -> Dict[str, List[str]]: + def resolved_query_string_parameters(self) -> dict[str, list[str]]: """ This property determines the appropriate query string parameter to be used as a trusted source for validating OpenAPI. @@ -157,7 +160,7 @@ def resolved_query_string_parameters(self) -> Dict[str, List[str]]: return {k: v.split(",") for k, v in self.query_string_parameters.items()} @property - def resolved_headers_field(self) -> Dict[str, str]: + def resolved_headers_field(self) -> dict[str, str]: """ This property determines the appropriate header to be used as a trusted source for validating OpenAPI. @@ -172,11 +175,11 @@ def resolved_headers_field(self) -> Dict[str, str]: return self.headers @property - def is_base64_encoded(self) -> Optional[bool]: + def is_base64_encoded(self) -> bool | None: return self.get("isBase64Encoded") @property - def body(self) -> Optional[str]: + def body(self) -> str | None: """Submitted body of the request as a string""" return self.get("body") @@ -189,9 +192,9 @@ def json_body(self) -> Any: return None @cached_property - def decoded_body(self) -> Optional[str]: + def decoded_body(self) -> str | None: """Decode the body from base64 if encoded, otherwise return it as is.""" - body: Optional[str] = self.body + body: str | None = self.body if self.is_base64_encoded and body: return base64.b64decode(body.encode()).decode() return body @@ -247,56 +250,56 @@ def validity_not_before(self) -> str: class APIGatewayEventIdentity(DictWrapper): @property - def access_key(self) -> Optional[str]: + def access_key(self) -> str | None: return self["requestContext"]["identity"].get("accessKey") @property - def account_id(self) -> Optional[str]: + def account_id(self) -> str | None: """The AWS account ID associated with the request.""" return self["requestContext"]["identity"].get("accountId") @property - def api_key(self) -> Optional[str]: + def api_key(self) -> str | None: """For API methods that require an API key, this variable is the API key associated with the method request. For methods that don't require an API key, this variable is null.""" return self["requestContext"]["identity"].get("apiKey") @property - def api_key_id(self) -> Optional[str]: + def api_key_id(self) -> str | None: """The API key ID associated with an API request that requires an API key.""" return self["requestContext"]["identity"].get("apiKeyId") @property - def caller(self) -> Optional[str]: + def caller(self) -> str | None: """The principal identifier of the caller making the request.""" return self["requestContext"]["identity"].get("caller") @property - def cognito_authentication_provider(self) -> Optional[str]: + def cognito_authentication_provider(self) -> str | None: """A comma-separated list of the Amazon Cognito authentication providers used by the caller making the request. Available only if the request was signed with Amazon Cognito credentials.""" return self["requestContext"]["identity"].get("cognitoAuthenticationProvider") @property - def cognito_authentication_type(self) -> Optional[str]: + def cognito_authentication_type(self) -> str | None: """The Amazon Cognito authentication type of the caller making the request. Available only if the request was signed with Amazon Cognito credentials.""" return self["requestContext"]["identity"].get("cognitoAuthenticationType") @property - def cognito_identity_id(self) -> Optional[str]: + def cognito_identity_id(self) -> str | None: """The Amazon Cognito identity ID of the caller making the request. Available only if the request was signed with Amazon Cognito credentials.""" return self["requestContext"]["identity"].get("cognitoIdentityId") @property - def cognito_identity_pool_id(self) -> Optional[str]: + def cognito_identity_pool_id(self) -> str | None: """The Amazon Cognito identity pool ID of the caller making the request. Available only if the request was signed with Amazon Cognito credentials.""" return self["requestContext"]["identity"].get("cognitoIdentityPoolId") @property - def principal_org_id(self) -> Optional[str]: + def principal_org_id(self) -> str | None: """The AWS organization ID.""" return self["requestContext"]["identity"].get("principalOrgId") @@ -306,22 +309,22 @@ def source_ip(self) -> str: return self["requestContext"]["identity"]["sourceIp"] @property - def user(self) -> Optional[str]: + def user(self) -> str | None: """The principal identifier of the user making the request.""" return self["requestContext"]["identity"].get("user") @property - def user_agent(self) -> Optional[str]: + def user_agent(self) -> str | None: """The User Agent of the API caller.""" return self["requestContext"]["identity"].get("userAgent") @property - def user_arn(self) -> Optional[str]: + def user_arn(self) -> str | None: """The Amazon Resource Name (ARN) of the effective user identified after authentication.""" return self["requestContext"]["identity"].get("userArn") @property - def client_cert(self) -> Optional[RequestContextClientCert]: + def client_cert(self) -> RequestContextClientCert | None: client_cert = self["requestContext"]["identity"].get("clientCert") return None if client_cert is None else RequestContextClientCert(client_cert) @@ -338,16 +341,16 @@ def api_id(self) -> str: return self["requestContext"]["apiId"] @property - def domain_name(self) -> Optional[str]: + def domain_name(self) -> str | None: """A domain name""" return self["requestContext"].get("domainName") @property - def domain_prefix(self) -> Optional[str]: + def domain_prefix(self) -> str | None: return self["requestContext"].get("domainPrefix") @property - def extended_request_id(self) -> Optional[str]: + def extended_request_id(self) -> str | None: """An automatically generated ID for the API call, which contains more useful information for debugging/troubleshooting.""" return self["requestContext"].get("extendedRequestId") @@ -381,7 +384,7 @@ def request_id(self) -> str: return self["requestContext"]["requestId"] @property - def request_time(self) -> Optional[str]: + def request_time(self) -> str | None: """The CLF-formatted request time (dd/MMM/yyyy:HH:mm:ss +-hhmm)""" return self["requestContext"].get("requestTime") @@ -474,7 +477,7 @@ def time_epoch(self) -> int: return self["requestContext"]["timeEpoch"] @property - def authentication(self) -> Optional[RequestContextClientCert]: + def authentication(self) -> RequestContextClientCert | None: """Optional when using mutual TLS authentication""" # FunctionURL might have NONE as AuthZ authentication = self["requestContext"].get("authentication") or {} diff --git a/aws_lambda_powertools/utilities/data_classes/connect_contact_flow_event.py b/aws_lambda_powertools/utilities/data_classes/connect_contact_flow_event.py index f5dbbcb715e..ff0952b5f98 100644 --- a/aws_lambda_powertools/utilities/data_classes/connect_contact_flow_event.py +++ b/aws_lambda_powertools/utilities/data_classes/connect_contact_flow_event.py @@ -1,5 +1,6 @@ +from __future__ import annotations + from enum import Enum, auto -from typing import Dict, Optional from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -47,19 +48,19 @@ def name(self) -> str: class ConnectContactFlowMediaStreamAudio(DictWrapper): @property - def start_fragment_number(self) -> Optional[str]: + def start_fragment_number(self) -> str | None: """The number that identifies the Kinesis Video Streams fragment, in the stream used for Live media streaming, in which the customer audio stream started. """ return self["StartFragmentNumber"] @property - def start_timestamp(self) -> Optional[str]: + def start_timestamp(self) -> str | None: """When the customer audio stream started.""" return self["StartTimestamp"] @property - def stream_arn(self) -> Optional[str]: + def stream_arn(self) -> str | None: """The ARN of the Kinesis Video stream used for Live media streaming that includes the customer data to reference. """ @@ -80,7 +81,7 @@ def customer(self) -> ConnectContactFlowMediaStreamCustomer: class ConnectContactFlowData(DictWrapper): @property - def attributes(self) -> Dict[str, str]: + def attributes(self) -> dict[str, str]: """These are attributes that have been previously associated with a contact, such as when using a Set contact attributes block in a contact flow. This map may be empty if there aren't any saved attributes. @@ -98,7 +99,7 @@ def contact_id(self) -> str: return self["ContactId"] @property - def customer_endpoint(self) -> Optional[ConnectContactFlowEndpoint]: + def customer_endpoint(self) -> ConnectContactFlowEndpoint | None: """Contains the customer’s address (number) and type of address.""" if self["CustomerEndpoint"] is not None: return ConnectContactFlowEndpoint(self["CustomerEndpoint"]) @@ -129,14 +130,14 @@ def previous_contact_id(self) -> str: return self["PreviousContactId"] @property - def queue(self) -> Optional[ConnectContactFlowQueue]: + def queue(self) -> ConnectContactFlowQueue | None: """The current queue.""" if self["Queue"] is not None: return ConnectContactFlowQueue(self["Queue"]) return None @property - def system_endpoint(self) -> Optional[ConnectContactFlowEndpoint]: + def system_endpoint(self) -> ConnectContactFlowEndpoint | None: """Contains the address (number) the customer dialed to call your contact center and type of address.""" if self["SystemEndpoint"] is not None: return ConnectContactFlowEndpoint(self["SystemEndpoint"]) @@ -161,6 +162,6 @@ def contact_data(self) -> ConnectContactFlowData: return ConnectContactFlowData(self["Details"]["ContactData"]) @property - def parameters(self) -> Dict[str, str]: + def parameters(self) -> dict[str, str]: """These are parameters specific to this call that were defined when you created the Lambda function.""" return self["Details"]["Parameters"] diff --git a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py index 139a70e9065..46e0d22fd87 100644 --- a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py @@ -1,6 +1,8 @@ +from __future__ import annotations + from enum import Enum from functools import cached_property -from typing import Any, Dict, Iterator, Optional +from typing import Any, Iterator from aws_lambda_powertools.shared.dynamodb_deserializer import TypeDeserializer from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -18,17 +20,17 @@ class StreamViewType(Enum): class StreamRecord(DictWrapper): _deserializer = TypeDeserializer() - def __init__(self, data: Dict[str, Any]): + def __init__(self, data: dict[str, Any]): """StreamRecord constructor Parameters ---------- - data: Dict[str, Any] + data: dict[str, Any] Represents the dynamodb dict inside DynamoDBStreamEvent's records """ super().__init__(data) self._deserializer = TypeDeserializer() - def _deserialize_dynamodb_dict(self, key: str) -> Dict[str, Any]: + def _deserialize_dynamodb_dict(self, key: str) -> dict[str, Any]: """Deserialize DynamoDB records available in `Keys`, `NewImage`, and `OldImage` Parameters @@ -38,46 +40,46 @@ def _deserialize_dynamodb_dict(self, key: str) -> Dict[str, Any]: Returns ------- - Dict[str, Any] + dict[str, Any] Deserialized records in Python native types """ dynamodb_dict = self._data.get(key) or {} return {k: self._deserializer.deserialize(v) for k, v in dynamodb_dict.items()} @property - def approximate_creation_date_time(self) -> Optional[int]: + def approximate_creation_date_time(self) -> int | None: """The approximate date and time when the stream record was created, in UNIX epoch time format.""" item = self.get("ApproximateCreationDateTime") return None if item is None else int(item) @cached_property - def keys(self) -> Dict[str, Any]: # type: ignore[override] + def keys(self) -> dict[str, Any]: # type: ignore[override] """The primary key attribute(s) for the DynamoDB item that was modified.""" return self._deserialize_dynamodb_dict("Keys") @cached_property - def new_image(self) -> Dict[str, Any]: + def new_image(self) -> dict[str, Any]: """The item in the DynamoDB table as it appeared after it was modified.""" return self._deserialize_dynamodb_dict("NewImage") @cached_property - def old_image(self) -> Dict[str, Any]: + def old_image(self) -> dict[str, Any]: """The item in the DynamoDB table as it appeared before it was modified.""" return self._deserialize_dynamodb_dict("OldImage") @property - def sequence_number(self) -> Optional[str]: + def sequence_number(self) -> str | None: """The sequence number of the stream record.""" return self.get("SequenceNumber") @property - def size_bytes(self) -> Optional[int]: + def size_bytes(self) -> int | None: """The size of the stream record, in bytes.""" item = self.get("SizeBytes") return None if item is None else int(item) @property - def stream_view_type(self) -> Optional[StreamViewType]: + def stream_view_type(self) -> StreamViewType | None: """The type of data from the modified DynamoDB item that was captured in this stream record""" item = self.get("StreamViewType") return None if item is None else StreamViewType[str(item)] @@ -93,39 +95,39 @@ class DynamoDBRecord(DictWrapper): """A description of a unique event within a stream""" @property - def aws_region(self) -> Optional[str]: + def aws_region(self) -> str | None: """The region in which the GetRecords request was received""" return self.get("awsRegion") @property - def dynamodb(self) -> Optional[StreamRecord]: + def dynamodb(self) -> StreamRecord | None: """The main body of the stream record, containing all the DynamoDB-specific dicts.""" stream_record = self.get("dynamodb") return None if stream_record is None else StreamRecord(stream_record) @property - def event_id(self) -> Optional[str]: + def event_id(self) -> str | None: """A globally unique identifier for the event that was recorded in this stream record.""" return self.get("eventID") @property - def event_name(self) -> Optional[DynamoDBRecordEventName]: + def event_name(self) -> DynamoDBRecordEventName | None: """The type of data modification that was performed on the DynamoDB table""" item = self.get("eventName") return None if item is None else DynamoDBRecordEventName[item] @property - def event_source(self) -> Optional[str]: + def event_source(self) -> str | None: """The AWS service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb.""" return self.get("eventSource") @property - def event_source_arn(self) -> Optional[str]: + def event_source_arn(self) -> str | None: """The Amazon Resource Name (ARN) of the event source""" return self.get("eventSourceARN") @property - def event_version(self) -> Optional[str]: + def event_version(self) -> str | None: """The version number of the stream record format.""" return self.get("eventVersion") diff --git a/aws_lambda_powertools/utilities/data_classes/event_bridge_event.py b/aws_lambda_powertools/utilities/data_classes/event_bridge_event.py index bdbf9d68afa..f7ce1953e2a 100644 --- a/aws_lambda_powertools/utilities/data_classes/event_bridge_event.py +++ b/aws_lambda_powertools/utilities/data_classes/event_bridge_event.py @@ -1,4 +1,6 @@ -from typing import Any, Dict, List, Optional +from __future__ import annotations + +from typing import Any from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -43,7 +45,7 @@ def region(self) -> str: return self["region"] @property - def resources(self) -> List[str]: + def resources(self) -> list[str]: """This JSON array contains ARNs that identify resources that are involved in the event. Inclusion of these ARNs is at the discretion of the service.""" return self["resources"] @@ -59,11 +61,11 @@ def detail_type(self) -> str: return self["detail-type"] @property - def detail(self) -> Dict[str, Any]: + def detail(self) -> dict[str, Any]: """A JSON object, whose content is at the discretion of the service originating the event.""" return self["detail"] @property - def replay_name(self) -> Optional[str]: + def replay_name(self) -> str | None: """Identifies whether the event is being replayed and what is the name of the replay.""" return self["replay-name"] diff --git a/aws_lambda_powertools/utilities/data_classes/event_source.py b/aws_lambda_powertools/utilities/data_classes/event_source.py index 3968f923573..164e29dbd11 100644 --- a/aws_lambda_powertools/utilities/data_classes/event_source.py +++ b/aws_lambda_powertools/utilities/data_classes/event_source.py @@ -1,16 +1,20 @@ -from typing import Any, Callable, Dict, Type +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Callable from aws_lambda_powertools.middleware_factory import lambda_handler_decorator -from aws_lambda_powertools.utilities.data_classes.common import DictWrapper -from aws_lambda_powertools.utilities.typing import LambdaContext + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.data_classes.common import DictWrapper + from aws_lambda_powertools.utilities.typing import LambdaContext @lambda_handler_decorator def event_source( handler: Callable[[Any, LambdaContext], Any], - event: Dict[str, Any], + event: dict[str, Any], context: LambdaContext, - data_class: Type[DictWrapper], + data_class: type[DictWrapper], ): """Middleware to create an instance of the passed in event source data class @@ -18,11 +22,11 @@ def event_source( ---------- handler: Callable Lambda's handler - event: Dict + event: dict[str, Any] Lambda's Event - context: Dict + context: LambdaContext Lambda's Context - data_class: Type[DictWrapper] + data_class: type[DictWrapper] Data class type to instantiate Example diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py index f73802ba699..436afe43652 100644 --- a/aws_lambda_powertools/utilities/data_classes/kafka_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import base64 from functools import cached_property -from typing import Any, Dict, Iterator, List, Optional +from typing import Any, Iterator from aws_lambda_powertools.utilities.data_classes.common import CaseInsensitiveDict, DictWrapper @@ -57,12 +59,12 @@ def json_value(self) -> Any: return self._json_deserializer(self.decoded_value.decode("utf-8")) @property - def headers(self) -> List[Dict[str, List[int]]]: + def headers(self) -> list[dict[str, list[int]]]: """The raw Kafka record headers.""" return self["headers"] @cached_property - def decoded_headers(self) -> Dict[str, bytes]: + def decoded_headers(self) -> dict[str, bytes]: """Decodes the headers as a single dictionary.""" return CaseInsensitiveDict((k, bytes(v)) for chunk in self.headers for k, v in chunk.items()) @@ -75,9 +77,9 @@ class KafkaEvent(DictWrapper): - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html """ - def __init__(self, data: Dict[str, Any]): + def __init__(self, data: dict[str, Any]): super().__init__(data) - self._records: Optional[Iterator[KafkaEventRecord]] = None + self._records: Iterator[KafkaEventRecord] | None = None @property def event_source(self) -> str: @@ -85,7 +87,7 @@ def event_source(self) -> str: return self["eventSource"] @property - def event_source_arn(self) -> Optional[str]: + def event_source_arn(self) -> str | None: """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK.""" return self.get("eventSourceArn") @@ -95,7 +97,7 @@ def bootstrap_servers(self) -> str: return self["bootstrapServers"] @property - def decoded_bootstrap_servers(self) -> List[str]: + def decoded_bootstrap_servers(self) -> list[str]: """The decoded Kafka bootstrap URL.""" return self.bootstrap_servers.split(",") diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 492aac53176..720bfa89eee 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import base64 import json import warnings from dataclasses import dataclass, field from functools import cached_property -from typing import Any, Callable, ClassVar, Dict, Iterator, List, Optional, Tuple +from typing import Any, Callable, ClassVar, Iterator from typing_extensions import Literal @@ -17,7 +19,7 @@ class KinesisFirehoseDataTransformationRecordMetadata: Parameters ---------- - partition_keys: Dict[str, str] + partition_keys: dict[str, str] A dict of partition keys/value in string format, e.g. `{"year":"2023","month":"09"}` Documentation: @@ -25,9 +27,9 @@ class KinesisFirehoseDataTransformationRecordMetadata: - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html """ - partition_keys: Dict[str, str] = field(default_factory=lambda: {}) + partition_keys: dict[str, str] = field(default_factory=lambda: {}) - def asdict(self) -> Dict: + def asdict(self) -> dict: if self.partition_keys is not None: return {"partitionKeys": self.partition_keys} return {} @@ -48,7 +50,7 @@ class KinesisFirehoseDataTransformationRecord: Use `data_from_text` or `data_from_json` methods to convert data if needed. - metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] + metadata: KinesisFirehoseDataTransformationRecordMetadata | None Metadata associated with this record; can contain partition keys. See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html @@ -63,23 +65,23 @@ class KinesisFirehoseDataTransformationRecord: - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html """ - _valid_result_types: ClassVar[Tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed") + _valid_result_types: ClassVar[tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed") record_id: str result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok" data: str = "" - metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None + metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None json_serializer: Callable = json.dumps json_deserializer: Callable = json.loads - def asdict(self) -> Dict: + def asdict(self) -> dict: if self.result not in self._valid_result_types: warnings.warn( stacklevel=1, message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"', ) - record: Dict[str, Any] = { + record: dict[str, Any] = { "recordId": self.record_id, "result": self.result, "data": self.data, @@ -103,7 +105,7 @@ def data_as_text(self) -> str: return self.data_as_bytes.decode("utf-8") @cached_property - def data_as_json(self) -> Dict: + def data_as_json(self) -> dict: """Decoded base64-encoded data loaded to json""" if not self.data: return {} @@ -121,7 +123,7 @@ class KinesisFirehoseDataTransformationResponse: Parameters ---------- - records : List[KinesisFirehoseResponseRecord] + records : list[KinesisFirehoseResponseRecord] records of Kinesis Data Firehose response object, optional parameter at start. can be added later using `add_record` function. @@ -161,12 +163,12 @@ def lambda_handler(event: dict, context: LambdaContext): ``` """ - records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list) + records: list[KinesisFirehoseDataTransformationRecord] = field(default_factory=list) def add_record(self, record: KinesisFirehoseDataTransformationRecord): self.records.append(record) - def asdict(self) -> Dict: + def asdict(self) -> dict: if not self.records: raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response") @@ -225,7 +227,7 @@ def data(self) -> str: return self["data"] @property - def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]: + def metadata(self) -> KinesisFirehoseRecordMetadata | None: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None @@ -248,7 +250,7 @@ def build_data_transformation_response( self, result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok", data: str = "", - metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None, + metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None, ) -> KinesisFirehoseDataTransformationRecord: """Create a KinesisFirehoseResponseRecord directly using the record_id and given values @@ -290,7 +292,7 @@ def delivery_stream_arn(self) -> str: return self["deliveryStreamArn"] @property - def source_kinesis_stream_arn(self) -> Optional[str]: + def source_kinesis_stream_arn(self) -> str | None: """ARN of the Kinesis Stream; present only when Kinesis Stream is source""" return self.get("sourceKinesisStreamArn") diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 06eaedb8904..ba2300b34be 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import base64 import json import zlib -from typing import Iterator, List +from typing import Iterator from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import ( CloudWatchLogsDecodedData, @@ -109,7 +111,7 @@ def records(self) -> Iterator[KinesisStreamRecord]: yield KinesisStreamRecord(record) -def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> List[CloudWatchLogsDecodedData]: +def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> list[CloudWatchLogsDecodedData]: return [CloudWatchLogsDecodedData(record.kinesis.data_zlib_compressed_as_json()) for record in event.records] diff --git a/aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py b/aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py index 0eaae042621..1f654dfc0d7 100644 --- a/aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py +++ b/aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from functools import cached_property -from typing import Any, Dict, List +from typing import Any from aws_lambda_powertools.utilities.data_classes.common import DictWrapper from aws_lambda_powertools.utilities.data_classes.shared_functions import base64_decode @@ -15,7 +17,7 @@ def content_encoding(self) -> str: return self["contentEncoding"] @property - def headers(self) -> Dict[str, Any]: + def headers(self) -> dict[str, Any]: return self["headers"] @property @@ -100,7 +102,7 @@ class RabbitMQEvent(DictWrapper): - https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/ """ - def __init__(self, data: Dict[str, Any]): + def __init__(self, data: dict[str, Any]): super().__init__(data) self._rmq_messages_by_queue = { key: [RabbitMessage(message) for message in messages] @@ -117,5 +119,5 @@ def event_source_arn(self) -> str: return self["eventSourceArn"] @property - def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]: + def rmq_messages_by_queue(self) -> dict[str, list[RabbitMessage]]: return self._rmq_messages_by_queue diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py index bd9a07c4484..9ad201afc2d 100644 --- a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py +++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py @@ -1,12 +1,14 @@ +from __future__ import annotations + import warnings from dataclasses import dataclass, field -from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple +from typing import Any, Iterator, Literal from urllib.parse import unquote_plus from aws_lambda_powertools.utilities.data_classes.common import DictWrapper # list of valid result code. Used both in S3BatchOperationResponse and S3BatchOperationResponseRecord -VALID_RESULT_CODES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure") +VALID_RESULT_CODES: tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure") RESULT_CODE_TYPE = Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] @@ -14,9 +16,9 @@ class S3BatchOperationResponseRecord: task_id: str result_code: RESULT_CODE_TYPE - result_string: Optional[str] = None + result_string: str | None = None - def asdict(self) -> Dict[str, Any]: + def asdict(self) -> dict[str, Any]: if self.result_code not in VALID_RESULT_CODES: warnings.warn( stacklevel=2, @@ -53,7 +55,7 @@ class S3BatchOperationResponse: treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] Undocumented parameter, defaults to "Succeeded" - results : List[S3BatchOperationResult] + results : list[S3BatchOperationResult] Results of each S3 Batch Operations task, optional parameter at start. Can be added later using `add_result` function. @@ -112,7 +114,7 @@ def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext): invocation_schema_version: str invocation_id: str treat_missing_keys_as: RESULT_CODE_TYPE = "Succeeded" - results: List[S3BatchOperationResponseRecord] = field(default_factory=list) + results: list[S3BatchOperationResponseRecord] = field(default_factory=list) def __post_init__(self): if self.treat_missing_keys_as not in VALID_RESULT_CODES: @@ -125,7 +127,7 @@ def __post_init__(self): def add_result(self, result: S3BatchOperationResponseRecord): self.results.append(result) - def asdict(self) -> Dict: + def asdict(self) -> dict: result_count = len(self.results) if result_count != 1: @@ -146,7 +148,7 @@ def get_id(self) -> str: return self["id"] @property - def user_arguments(self) -> Dict[str, str]: + def user_arguments(self) -> dict[str, str]: """Get user arguments provided for this job (only for invocation schema 2.0)""" return self.get("userArguments") or {} @@ -163,12 +165,12 @@ def s3_key(self) -> str: return unquote_plus(self["s3Key"]) @property - def s3_version_id(self) -> Optional[str]: + def s3_version_id(self) -> str | None: """Object version if bucket is versioning-enabled, otherwise null""" return self.get("s3VersionId") @property - def s3_bucket_arn(self) -> Optional[str]: + def s3_bucket_arn(self) -> str | None: """Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')""" return self.get("s3BucketArn") diff --git a/aws_lambda_powertools/utilities/data_classes/s3_event.py b/aws_lambda_powertools/utilities/data_classes/s3_event.py index 1a6f9c541a3..f3c0fa2adf9 100644 --- a/aws_lambda_powertools/utilities/data_classes/s3_event.py +++ b/aws_lambda_powertools/utilities/data_classes/s3_event.py @@ -1,4 +1,6 @@ -from typing import Dict, Iterator, Optional +from __future__ import annotations + +from typing import Iterator from urllib.parse import unquote_plus from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -32,7 +34,7 @@ def key(self) -> str: return unquote_plus(self["key"]) @property - def size(self) -> Optional[int]: + def size(self) -> int | None: """Object size. Object deletion event doesn't contain size.""" return self.get("size") @@ -79,12 +81,12 @@ def requester(self) -> str: return self["requester"] @property - def source_ip_address(self) -> Optional[str]: + def source_ip_address(self) -> str | None: """Get the source IP address of S3 request. Only present for events triggered by an S3 request.""" return self.get("source-ip-address") @property - def reason(self) -> Optional[str]: + def reason(self) -> str | None: """Get the reason for the S3 notification. For 'Object Created events', the S3 API used to create the object: `PutObject`, `POST Object`, `CopyObject`, or @@ -94,7 +96,7 @@ def reason(self) -> Optional[str]: return self.get("reason") @property - def deletion_type(self) -> Optional[str]: + def deletion_type(self) -> str | None: """Get the deletion type for the S3 object in this notification. For 'Object Deleted' events, when an unversioned object is deleted, or a versioned object is permanently deleted @@ -104,7 +106,7 @@ def deletion_type(self) -> Optional[str]: return self.get("deletion-type") @property - def restore_expiry_time(self) -> Optional[str]: + def restore_expiry_time(self) -> str | None: """Get the restore expiry time for the S3 object in this notification. For 'Object Restore Completed' events, the time when the temporary copy of the object will be deleted from S3. @@ -112,7 +114,7 @@ def restore_expiry_time(self) -> Optional[str]: return self.get("restore-expiry-time") @property - def source_storage_class(self) -> Optional[str]: + def source_storage_class(self) -> str | None: """Get the source storage class of the S3 object in this notification. For 'Object Restore Initiated' and 'Object Restore Completed' events, the storage class of the object being @@ -121,7 +123,7 @@ def source_storage_class(self) -> Optional[str]: return self.get("source-storage-class") @property - def destination_storage_class(self) -> Optional[str]: + def destination_storage_class(self) -> str | None: """Get the destination storage class of the S3 object in this notification. For 'Object Storage Class Changed' events, the new storage class of the object. @@ -129,7 +131,7 @@ def destination_storage_class(self) -> Optional[str]: return self.get("destination-storage-class") @property - def destination_access_tier(self) -> Optional[str]: + def destination_access_tier(self) -> str | None: """Get the destination access tier of the S3 object in this notification. For 'Object Access Tier Changed' events, the new access tier of the object. @@ -182,7 +184,7 @@ def etag(self) -> str: return self["s3"]["object"].get("eTag", "") @property - def version_id(self) -> Optional[str]: + def version_id(self) -> str | None: """Object version if bucket is versioning-enabled, otherwise null""" return self["s3"]["object"].get("versionId") @@ -273,7 +275,7 @@ def request_parameters(self) -> S3RequestParameters: return S3RequestParameters(self._data) @property - def response_elements(self) -> Dict[str, str]: + def response_elements(self) -> dict[str, str]: """The responseElements key value is useful if you want to trace a request by following up with AWS Support. Both x-amz-request-id and x-amz-id-2 help Amazon S3 trace an individual request. These values are the same @@ -287,7 +289,7 @@ def s3(self) -> S3Message: return S3Message(self._data) @property - def glacier_event_data(self) -> Optional[S3EventRecordGlacierEventData]: + def glacier_event_data(self) -> S3EventRecordGlacierEventData | None: """The glacierEventData key is only visible for s3:ObjectRestore:Completed events.""" item = self.get("glacierEventData") return None if item is None else S3EventRecordGlacierEventData(item) diff --git a/aws_lambda_powertools/utilities/data_classes/s3_object_event.py b/aws_lambda_powertools/utilities/data_classes/s3_object_event.py index 011392a7671..c3fc9d7232a 100644 --- a/aws_lambda_powertools/utilities/data_classes/s3_object_event.py +++ b/aws_lambda_powertools/utilities/data_classes/s3_object_event.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional +from __future__ import annotations from aws_lambda_powertools.utilities.data_classes.common import CaseInsensitiveDict, DictWrapper @@ -62,7 +62,7 @@ def url(self) -> str: return self["url"] @property - def headers(self) -> Dict[str, str]: + def headers(self) -> dict[str, str]: """A map of string to strings containing the HTTP headers and their values from the original call, excluding any authorization-related headers. @@ -194,7 +194,7 @@ def arn(self) -> str: return self["arn"] @property - def session_context(self) -> Optional[S3ObjectSessionContext]: + def session_context(self) -> S3ObjectSessionContext | None: """If the request was made with temporary security credentials, this element provides information about the session that was created for those credentials.""" session_context = self.get("sessionContext") diff --git a/aws_lambda_powertools/utilities/data_classes/ses_event.py b/aws_lambda_powertools/utilities/data_classes/ses_event.py index 5adcf7149ee..e50ec9ccc56 100644 --- a/aws_lambda_powertools/utilities/data_classes/ses_event.py +++ b/aws_lambda_powertools/utilities/data_classes/ses_event.py @@ -1,4 +1,6 @@ -from typing import Iterator, List, Optional +from __future__ import annotations + +from typing import Iterator from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -20,7 +22,7 @@ def return_path(self) -> str: return self["returnPath"] @property - def get_from(self) -> List[str]: + def get_from(self) -> list[str]: """The values in the From header of the email.""" # Note: this name conflicts with existing python builtins return self["from"] @@ -31,7 +33,7 @@ def date(self) -> str: return self["date"] @property - def to(self) -> List[str]: + def to(self) -> list[str]: """The values in the To header of the email.""" return self["to"] @@ -46,22 +48,22 @@ def subject(self) -> str: return str(self["subject"]) @property - def cc(self) -> List[str]: + def cc(self) -> list[str]: """The values in the CC header of the email.""" return self.get("cc") or [] @property - def bcc(self) -> List[str]: + def bcc(self) -> list[str]: """The values in the BCC header of the email.""" return self.get("bcc") or [] @property - def sender(self) -> List[str]: + def sender(self) -> list[str]: """The values in the Sender header of the email.""" return self.get("sender") or [] @property - def reply_to(self) -> List[str]: + def reply_to(self) -> list[str]: """The values in the replyTo header of the email.""" return self.get("replyTo") or [] @@ -87,7 +89,7 @@ def message_id(self) -> str: return self["messageId"] @property - def destination(self) -> List[str]: + def destination(self) -> list[str]: """A complete list of all recipient addresses (including To: and CC: recipients) from the MIME headers of the incoming email.""" return self["destination"] @@ -131,7 +133,7 @@ def get_type(self) -> str: return self["type"] @property - def topic_arn(self) -> Optional[str]: + def topic_arn(self) -> str | None: """String that contains the Amazon Resource Name (ARN) of the Amazon SNS topic to which the notification was published.""" return self.get("topicArn") @@ -162,7 +164,7 @@ def processing_time_millis(self) -> int: return int(self["processingTimeMillis"]) @property - def recipients(self) -> List[str]: + def recipients(self) -> list[str]: """A list of recipients (specifically, the envelope RCPT TO addresses) that were matched by the active receipt rule. The addresses listed here may differ from those listed by the destination field in the mail object.""" @@ -195,7 +197,7 @@ def dmarc_verdict(self) -> SESReceiptStatus: return SESReceiptStatus(self["dmarcVerdict"]) @property - def dmarc_policy(self) -> Optional[str]: + def dmarc_policy(self) -> str | None: """Indicates the Domain-based Message Authentication, Reporting & Conformance (DMARC) settings for the sending domain. This field only appears if the message fails DMARC authentication. Possible values for this field are: none, quarantine, reject""" diff --git a/aws_lambda_powertools/utilities/data_classes/sns_event.py b/aws_lambda_powertools/utilities/data_classes/sns_event.py index 5d29d682ef2..1720389ad05 100644 --- a/aws_lambda_powertools/utilities/data_classes/sns_event.py +++ b/aws_lambda_powertools/utilities/data_classes/sns_event.py @@ -1,4 +1,6 @@ -from typing import Dict, Iterator +from __future__ import annotations + +from typing import Iterator from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -50,7 +52,7 @@ def message(self) -> str: return self["Message"] @property - def message_attributes(self) -> Dict[str, SNSMessageAttribute]: + def message_attributes(self) -> dict[str, SNSMessageAttribute]: return {k: SNSMessageAttribute(v) for (k, v) in self["MessageAttributes"].items()} @property diff --git a/aws_lambda_powertools/utilities/data_classes/sqs_event.py b/aws_lambda_powertools/utilities/data_classes/sqs_event.py index dda63430dc6..dc149c04902 100644 --- a/aws_lambda_powertools/utilities/data_classes/sqs_event.py +++ b/aws_lambda_powertools/utilities/data_classes/sqs_event.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from functools import cached_property -from typing import Any, Dict, ItemsView, Iterator, Optional, Type, TypeVar +from typing import Any, Dict, ItemsView, Iterator, TypeVar from aws_lambda_powertools.utilities.data_classes import S3Event from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -8,7 +10,7 @@ class SQSRecordAttributes(DictWrapper): @property - def aws_trace_header(self) -> Optional[str]: + def aws_trace_header(self) -> str | None: """Returns the AWS X-Ray trace header string.""" return self.get("AWSTraceHeader") @@ -33,12 +35,12 @@ def approximate_first_receive_timestamp(self) -> str: return self["ApproximateFirstReceiveTimestamp"] @property - def sequence_number(self) -> Optional[str]: + def sequence_number(self) -> str | None: """The large, non-consecutive number that Amazon SQS assigns to each message.""" return self.get("SequenceNumber") @property - def message_group_id(self) -> Optional[str]: + def message_group_id(self) -> str | None: """The tag that specifies that a message belongs to a specific message group. Messages that belong to the same message group are always processed one by one, in a @@ -47,7 +49,7 @@ def message_group_id(self) -> Optional[str]: return self.get("MessageGroupId") @property - def message_deduplication_id(self) -> Optional[str]: + def message_deduplication_id(self) -> str | None: """The token used for deduplication of sent messages. If a message with a particular message deduplication ID is sent successfully, any messages sent @@ -56,7 +58,7 @@ def message_deduplication_id(self) -> Optional[str]: return self.get("MessageDeduplicationId") @property - def dead_letter_queue_source_arn(self) -> Optional[str]: + def dead_letter_queue_source_arn(self) -> str | None: """The SQS queue ARN that sent the record to this DLQ. Only present when a Lambda function is using a DLQ as an event source. """ @@ -67,12 +69,12 @@ class SQSMessageAttribute(DictWrapper): """The user-specified message attribute value.""" @property - def string_value(self) -> Optional[str]: + def string_value(self) -> str | None: """Strings are Unicode with UTF-8 binary encoding.""" return self["stringValue"] @property - def binary_value(self) -> Optional[str]: + def binary_value(self) -> str | None: """Binary type attributes can store any binary data, such as compressed data, encrypted data, or images. Base64-encoded binary data object""" @@ -85,7 +87,7 @@ def data_type(self) -> str: class SQSMessageAttributes(Dict[str, SQSMessageAttribute]): - def __getitem__(self, key: str) -> Optional[SQSMessageAttribute]: # type: ignore + def __getitem__(self, key: str) -> SQSMessageAttribute | None: # type: ignore item = super().get(key) return None if item is None else SQSMessageAttribute(item) # type: ignore @@ -230,7 +232,7 @@ def decoded_nested_sns_event(self) -> SNSMessage: """ return self._decode_nested_event(SNSMessage) - def _decode_nested_event(self, nested_event_class: Type[NestedEvent]) -> NestedEvent: + def _decode_nested_event(self, nested_event_class: type[NestedEvent]) -> NestedEvent: """Returns the nested event source data object. This is useful for handling events that are sent in the body of a SQS message. diff --git a/aws_lambda_powertools/utilities/data_classes/vpc_lattice.py b/aws_lambda_powertools/utilities/data_classes/vpc_lattice.py index f04c58dc5f0..661193b01e0 100644 --- a/aws_lambda_powertools/utilities/data_classes/vpc_lattice.py +++ b/aws_lambda_powertools/utilities/data_classes/vpc_lattice.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from functools import cached_property -from typing import Any, Dict, Optional +from typing import Any from aws_lambda_powertools.shared.headers_serializer import ( BaseHeadersSerializer, @@ -25,7 +27,7 @@ def json_body(self) -> Any: return self._json_deserializer(self.decoded_body) @property - def headers(self) -> Dict[str, str]: + def headers(self) -> dict[str, str]: """The VPC Lattice event headers.""" return CaseInsensitiveDict(self["headers"]) @@ -70,63 +72,63 @@ def path(self) -> str: return self["raw_path"] @property - def query_string_parameters(self) -> Dict[str, str]: + def query_string_parameters(self) -> dict[str, str]: """The request query string parameters.""" return self["query_string_parameters"] @cached_property - def resolved_headers_field(self) -> Dict[str, Any]: + def resolved_headers_field(self) -> dict[str, Any]: return CaseInsensitiveDict((k, v.split(",") if "," in v else v) for k, v in self.headers.items()) class vpcLatticeEventV2Identity(DictWrapper): @property - def source_vpc_arn(self) -> Optional[str]: + def source_vpc_arn(self) -> str | None: """The VPC Lattice v2 Event requestContext Identity sourceVpcArn""" return self.get("sourceVpcArn") @property - def get_type(self) -> Optional[str]: + def get_type(self) -> str | None: """The VPC Lattice v2 Event requestContext Identity type""" return self.get("type") @property - def principal(self) -> Optional[str]: + def principal(self) -> str | None: """The VPC Lattice v2 Event requestContext principal""" return self.get("principal") @property - def principal_org_id(self) -> Optional[str]: + def principal_org_id(self) -> str | None: """The VPC Lattice v2 Event requestContext principalOrgID""" return self.get("principalOrgID") @property - def session_name(self) -> Optional[str]: + def session_name(self) -> str | None: """The VPC Lattice v2 Event requestContext sessionName""" return self.get("sessionName") @property - def x509_subject_cn(self) -> Optional[str]: + def x509_subject_cn(self) -> str | None: """The VPC Lattice v2 Event requestContext X509SubjectCn""" return self.get("X509SubjectCn") @property - def x509_issuer_ou(self) -> Optional[str]: + def x509_issuer_ou(self) -> str | None: """The VPC Lattice v2 Event requestContext X509IssuerOu""" return self.get("X509IssuerOu") @property - def x509_san_dns(self) -> Optional[str]: + def x509_san_dns(self) -> str | None: """The VPC Lattice v2 Event requestContext X509SanDns""" return self.get("x509SanDns") @property - def x509_san_uri(self) -> Optional[str]: + def x509_san_uri(self) -> str | None: """The VPC Lattice v2 Event requestContext X509SanUri""" return self.get("X509SanUri") @property - def x509_san_name_cn(self) -> Optional[str]: + def x509_san_name_cn(self) -> str | None: """The VPC Lattice v2 Event requestContext X509SanNameCn""" return self.get("X509SanNameCn") @@ -170,7 +172,7 @@ def version(self) -> str: return self["version"] @property - def is_base64_encoded(self) -> Optional[bool]: + def is_base64_encoded(self) -> bool | None: """A boolean flag to indicate if the applicable request payload is Base64-encode""" return self.get("isBase64Encoded") @@ -185,10 +187,10 @@ def request_context(self) -> vpcLatticeEventV2RequestContext: return vpcLatticeEventV2RequestContext(self["requestContext"]) @cached_property - def query_string_parameters(self) -> Dict[str, str]: + def query_string_parameters(self) -> dict[str, str]: """The request query string parameters. - For VPC Lattice V2, the queryStringParameters will contain a Dict[str, List[str]] + For VPC Lattice V2, the queryStringParameters will contain a dict[str, list[str]] so to keep compatibility with existing utilities, we merge all the values with a comma. """ params = self.get("queryStringParameters") or {}