From c466c806985e518a5c52acb54485fbccbaa2548f Mon Sep 17 00:00:00 2001 From: Stephane B Date: Wed, 17 Jan 2024 10:36:12 -0500 Subject: [PATCH] feat(event_source): Add support for S3 batch operations (#3572) * Add support for S3 Batch Operations event and response along with unit tests. This seamlessly support both schema 1.0 and 2.0 A few notes: - S3BatchOperationXXX or S3BatchOperationsXXX ? - s3 key is not url-encoded in real life despite what the documentation implies. Need to test with some keys that contain spaces, etc... - S3BatchOperationResult has some factory methods to simplifies building - S3BatchOperationEvent may need to as it makes initialization needlessly complicated * Add documentation with example based on the AWS S3 documentation * Use unquote_plus and add unit test for key encoded with space * Initial refactor * Changing the DX to improve usability * Documentation * Adding parser * Small refactor * Addressing Ruben's feedback - Docs and examples * Addressing Ruben's feedback - Docs and examples * Addressing Ruben's feedback - Code * Addressing Ruben's feedback - Code --------- Co-authored-by: Leandro Damascena --- .../utilities/data_classes/__init__.py | 8 + .../data_classes/s3_batch_operation_event.py | 242 ++++++++++++++++++ .../utilities/parser/models/__init__.py | 4 + .../parser/models/s3_batch_operation.py | 34 +++ docs/utilities/data_classes.md | 13 +- docs/utilities/parser.md | 1 + .../event_sources/src/s3_batch_operation.py | 37 +++ .../events/s3BatchOperationEventSchemaV1.json | 15 ++ .../events/s3BatchOperationEventSchemaV2.json | 19 ++ .../test_s3_batch_operation_event.py | 60 +++++ .../test_s3_batch_operation_response.py | 152 +++++++++++ tests/unit/parser/test_s3_batch_operation.py | 38 +++ ...bject_event.py => test_s3_object_event.py} | 0 13 files changed, 622 insertions(+), 1 deletion(-) create mode 100644 aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py create mode 100644 
aws_lambda_powertools/utilities/parser/models/s3_batch_operation.py create mode 100644 examples/event_sources/src/s3_batch_operation.py create mode 100644 tests/events/s3BatchOperationEventSchemaV1.json create mode 100644 tests/events/s3BatchOperationEventSchemaV2.json create mode 100644 tests/unit/data_classes/test_s3_batch_operation_event.py create mode 100644 tests/unit/data_classes/test_s3_batch_operation_response.py create mode 100644 tests/unit/parser/test_s3_batch_operation.py rename tests/unit/parser/{test_s3 object_event.py => test_s3_object_event.py} (100%) diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index fd9294bc8bb..38274f0bab4 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -23,6 +23,11 @@ ) from .kinesis_stream_event import KinesisStreamEvent from .lambda_function_url_event import LambdaFunctionUrlEvent +from .s3_batch_operation_event import ( + S3BatchOperationEvent, + S3BatchOperationResponse, + S3BatchOperationResponseRecord, +) from .s3_event import S3Event, S3EventBridgeNotificationEvent from .secrets_manager_event import SecretsManagerEvent from .ses_event import SESEvent @@ -52,6 +57,9 @@ "LambdaFunctionUrlEvent", "S3Event", "S3EventBridgeNotificationEvent", + "S3BatchOperationEvent", + "S3BatchOperationResponse", + "S3BatchOperationResponseRecord", "SESEvent", "SNSEvent", "SQSEvent", diff --git a/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py new file mode 100644 index 00000000000..9c742e0c553 --- /dev/null +++ b/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py @@ -0,0 +1,242 @@ +import warnings +from dataclasses import dataclass, field +from typing import Any, Dict, Iterator, List, Optional, Tuple +from urllib.parse import unquote_plus + +from 
# Result codes accepted in an S3 Batch Operations response. Shared by
# S3BatchOperationResponse (treat_missing_keys_as) and S3BatchOperationResponseRecord (result_code).
VALID_RESULT_CODES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure")
RESULT_CODE_TYPE = Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]


@dataclass(repr=False, order=False)
class S3BatchOperationResponseRecord:
    """Result of a single S3 Batch Operations task

    Parameters
    ----------
    task_id : str
        Identifier of the task this result belongs to.

    result_code : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
        Outcome reported for the task.

    result_string : Optional[str]
        Free-form text reported alongside the result code.
    """

    task_id: str
    result_code: RESULT_CODE_TYPE
    result_string: Optional[str] = None

    def asdict(self) -> Dict[str, Any]:
        """Return the record as a dict using the camelCase keys of the response payload

        An unknown ``result_code`` is not rejected: a warning is emitted and the value
        is passed through unchanged.
        """
        if self.result_code not in VALID_RESULT_CODES:
            # stacklevel=2 attributes the warning to whoever called asdict()
            warnings.warn(
                stacklevel=2,
                message=f"The resultCode {self.result_code} is not valid. "
                f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.",
            )

        return {
            "taskId": self.task_id,
            "resultCode": self.result_code,
            "resultString": self.result_string,
        }


@dataclass(repr=False, order=False)
class S3BatchOperationResponse:
    """S3 Batch Operations response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html
    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions
    - https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion

    Parameters
    ----------
    invocation_schema_version : str
        Specifies the schema version for the payload that Batch Operations sends when invoking
        an AWS Lambda function, either '1.0' or '2.0'. This must be copied from the event.

    invocation_id : str
        The identifier of the invocation request. This must be copied from the event.

    treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
        Undocumented parameter, defaults to "Succeeded"

    results : List[S3BatchOperationResponseRecord]
        Results of each S3 Batch Operations task,
        optional parameter at start. Can be added later using `add_result` function.

    Examples
    --------

    **S3 Batch Operations**

    ```python
    import boto3

    from botocore.exceptions import ClientError

    from aws_lambda_powertools.utilities.data_classes import (
        S3BatchOperationEvent,
        S3BatchOperationResponse,
        event_source
    )
    from aws_lambda_powertools.utilities.typing import LambdaContext


    @event_source(data_class=S3BatchOperationEvent)
    def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
        response = S3BatchOperationResponse(
            event.invocation_schema_version,
            event.invocation_id,
            "PermanentFailure"
        )

        task = event.task
        src_key: str = task.s3_key
        src_bucket: str = task.s3_bucket

        s3 = boto3.client("s3", region_name='us-east-1')

        try:
            dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
            result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
        except ClientError as e:
            error_code = e.response['Error']['Code']
            error_message = e.response['Error']['Message']
            if error_code == 'RequestTimeout':
                result = task.build_task_batch_response("TemporaryFailure", "Timeout - trying again")
            else:
                result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
        except Exception as e:
            result = task.build_task_batch_response("PermanentFailure", str(e))

        response.add_result(result)

        return response.asdict()
    ```
    """

    invocation_schema_version: str
    invocation_id: str
    treat_missing_keys_as: RESULT_CODE_TYPE = "Succeeded"
    results: List[S3BatchOperationResponseRecord] = field(default_factory=list)

    def __post_init__(self):
        """Warn (without failing) when treat_missing_keys_as is not a known result code"""
        if self.treat_missing_keys_as not in VALID_RESULT_CODES:
            # Frames: 1 = __post_init__, 2 = the dataclass-generated __init__ (reported as
            # "<string>"), 3 = the code that instantiated the response. Level 3 makes the
            # warning point at the user's own line instead of generated code.
            warnings.warn(
                stacklevel=3,
                message=f"The value {self.treat_missing_keys_as} is not valid for treat_missing_keys_as, "
                f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.",
            )

    def add_result(self, result: S3BatchOperationResponseRecord):
        """Append a task result to the response"""
        self.results.append(result)

    def asdict(self) -> Dict:
        """Return the response as a dict using the camelCase keys of the response payload

        Raises
        ------
        ValueError
            When the response does not hold exactly one result.
        """
        result_count = len(self.results)

        if result_count != 1:
            raise ValueError(f"Response must have exactly one result, but got {result_count}")

        return {
            "invocationSchemaVersion": self.invocation_schema_version,
            "treatMissingKeysAs": self.treat_missing_keys_as,
            "invocationId": self.invocation_id,
            "results": [result.asdict() for result in self.results],
        }
class S3BatchOperationJob(DictWrapper):
    """Wrapper around the ``job`` section of an S3 Batch Operations event"""

    @property
    def get_id(self) -> str:
        """Get the job id"""
        # Exposed as `get_id` because a plain `id` would clash with the python builtin
        job_id = self["id"]
        return job_id

    @property
    def user_arguments(self) -> Optional[Dict[str, str]]:
        """Get user arguments provided for this job (only for invocation schema 2.0)"""
        arguments = self.get("userArguments")
        return arguments


class S3BatchOperationTask(DictWrapper):
    """Wrapper around one entry of the ``tasks`` list of an S3 Batch Operations event"""

    @property
    def task_id(self) -> str:
        """Get the task id"""
        return self["taskId"]

    @property
    def s3_key(self) -> str:
        """Get the object key, decoded with unquote_plus"""
        raw_key = self["s3Key"]
        return unquote_plus(raw_key)

    @property
    def s3_version_id(self) -> Optional[str]:
        """Object version if bucket is versioning-enabled, otherwise null"""
        return self.get("s3VersionId")

    @property
    def s3_bucket_arn(self) -> Optional[str]:
        """Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')"""
        return self.get("s3BucketArn")

    @property
    def s3_bucket(self) -> str:
        """
        Get the s3 bucket name: the part after ':::' in 's3BucketArn' when an arn is
        present (invocationSchemaVersion '1.0'), otherwise the 's3Bucket' property
        (invocationSchemaVersion '2.0')
        """
        arn = self.s3_bucket_arn
        if not arn:
            return self["s3Bucket"]
        return arn.split(":::")[-1]

    def build_task_batch_response(
        self,
        result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded",
        result_string: str = "",
    ) -> S3BatchOperationResponseRecord:
        """Build the S3BatchOperationResponseRecord for this task

        Parameters
        ----------
        result_code : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded"
            outcome of the task
        result_string : str
            text stored with the result to identify it in the report
        """
        record = S3BatchOperationResponseRecord(
            task_id=self.task_id,
            result_code=result_code,
            result_string=result_string,
        )
        return record


class S3BatchOperationEvent(DictWrapper):
    """Amazon S3BatchOperation Event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html
    """

    @property
    def invocation_id(self) -> str:
        """Get the identifier of the invocation request"""
        return self["invocationId"]

    @property
    def invocation_schema_version(self) -> Literal["1.0", "2.0"]:
        """
        Get the schema version of the payload sent by Batch Operations when it invokes
        the AWS Lambda function. Either '1.0' or '2.0'.
        """
        return self["invocationSchemaVersion"]

    @property
    def tasks(self) -> Iterator[S3BatchOperationTask]:
        """Lazily iterate over the s3 batch operation tasks"""
        yield from map(S3BatchOperationTask, self["tasks"])

    @property
    def task(self) -> S3BatchOperationTask:
        """Get the first s3 batch operation task (StopIteration if the event has none)"""
        task_iterator = self.tasks
        return next(task_iterator)

    @property
    def job(self) -> S3BatchOperationJob:
        """Get the s3 batch operation job"""
        return S3BatchOperationJob(self["job"])
class S3BatchOperationTaskModel(BaseModel):
    """One entry of the ``tasks`` list of an S3 Batch Operations event"""

    taskId: str
    s3Key: str
    s3VersionId: Optional[str] = None
    s3BucketArn: Optional[str] = None
    s3Bucket: Optional[str] = None

    @validator("s3Bucket", pre=True, always=True)
    def validate_bucket(cls, current_value, values):
        """Fill s3Bucket from s3BucketArn when the event did not carry a bucket name

        's3Bucket' is sent with invocationSchemaVersion '2.0'; with '1.0' only
        's3BucketArn' is sent, so the name is taken from the part after ':::'.
        """
        if current_value:
            # An explicit bucket name always wins
            return current_value

        bucket_arn = values.get("s3BucketArn")
        if bucket_arn:
            return bucket_arn.split(":::")[-1]

        return current_value


class S3BatchOperationJobModel(BaseModel):
    """The ``job`` section of an S3 Batch Operations event"""

    id: str
    userArguments: Optional[Dict[str, Any]] = None


class S3BatchOperationModel(BaseModel):
    """Top-level payload of an S3 Batch Operations event"""

    invocationId: str
    invocationSchemaVersion: Literal["1.0", "2.0"]
    job: S3BatchOperationJobModel
    tasks: List[S3BatchOperationTaskModel]
do_something_with(f"{bucket_name}/{object_key}") ``` +### S3 Batch Operations + +This example is based on the AWS S3 Batch Operations documentation [Example Lambda function for S3 Batch Operations](https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html){target="_blank"}. + +=== "app.py" + + ```python hl_lines="4 8 10 20 25 27 29 33" + --8<-- "examples/event_sources/src/s3_batch_operation.py" + ``` + ### S3 Object Lambda This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3](https://aws.amazon.com/blogs/aws/introducing-amazon-s3-object-lambda-use-your-code-to-process-data-as-it-is-being-retrieved-from-s3/){target="_blank"}. diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md index bfd64f8b7ef..4a91d5aa13c 100644 --- a/docs/utilities/parser.md +++ b/docs/utilities/parser.md @@ -191,6 +191,7 @@ Parser comes with the following built-in models: | **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose | | **KinesisFirehoseSqsModel** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records | | **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload | +| **S3BatchOperationModel** | Lambda Event Source payload for Amazon S3 Batch Operation | | **S3EventNotificationEventBridgeModel** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. 
@event_source(data_class=S3BatchOperationEvent)
def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
    response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")
    # `event.task` is the first task carried by the event
    task = event.task
    src_key: str = task.s3_key
    src_bucket: str = task.s3_bucket

    s3 = boto3.client("s3", region_name="us-east-1")

    try:
        dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
        result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        if error_code == "RequestTimeout":
            result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
        else:
            result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
    except Exception as e:
        result = task.build_task_batch_response("PermanentFailure", str(e))
    # Not in `finally`: a BaseException would reach it with `result` unbound and be masked by UnboundLocalError
    response.add_result(result)

    return response.asdict()


def do_some_work(s3_client, src_bucket: str, src_key: str):
    """Placeholder for the real per-object work.

    The handler unpacks the return value as ``(dest_bucket, dest_key)``.
    """
    ...
def _load_and_wrap(fixture_name):
    """Return (raw_event, S3BatchOperationEvent) for a fixture file"""
    raw_event = load_event(fixture_name)
    return raw_event, S3BatchOperationEvent(raw_event)


def _only_task(parsed_event, raw_event):
    """Check the event holds exactly one task and return (wrapped task, raw task)"""
    wrapped_tasks = list(parsed_event.tasks)
    assert len(wrapped_tasks) == 1
    return wrapped_tasks[0], raw_event["tasks"][0]


def test_s3_batch_operation_schema_v1():
    raw_event, parsed_event = _load_and_wrap("s3BatchOperationEventSchemaV1.json")
    task, task_raw = _only_task(parsed_event, raw_event)

    # Schema 1.0 carries the bucket as an arn; the name is derived from it
    assert task.task_id == task_raw["taskId"]
    assert task.s3_version_id == task_raw["s3VersionId"]
    assert task.s3_bucket_arn == task_raw["s3BucketArn"]
    assert task.s3_bucket == task_raw["s3BucketArn"].split(":::")[-1]
    assert task.s3_key == task_raw["s3Key"]

    job = parsed_event.job
    assert job.get_id == raw_event["job"]["id"]
    assert job.user_arguments is None

    assert parsed_event.invocation_schema_version == raw_event["invocationSchemaVersion"]
    assert parsed_event.invocation_id == raw_event["invocationId"]


def test_s3_batch_operation_schema_v2():
    raw_event, parsed_event = _load_and_wrap("s3BatchOperationEventSchemaV2.json")
    task, task_raw = _only_task(parsed_event, raw_event)

    # Schema 2.0 carries the bucket name directly and no arn
    assert task.task_id == task_raw["taskId"]
    assert task.s3_version_id == task_raw["s3VersionId"]
    assert task.s3_bucket_arn is None
    assert task.s3_bucket == task_raw["s3Bucket"]
    assert task.s3_key == task_raw["s3Key"]

    job = parsed_event.job
    assert job.get_id == raw_event["job"]["id"]
    assert job.user_arguments == raw_event["job"]["userArguments"]

    assert parsed_event.invocation_schema_version == raw_event["invocationSchemaVersion"]
    assert parsed_event.invocation_id == raw_event["invocationId"]


def test_s3_task_has_key_with_spaces():
    raw_event = load_event("s3BatchOperationEventSchemaV1.json")

    # When an inventory file is provided, the key must be url encoded and the
    # file will contain "object%20key%20with%20spaces.csv";
    # however, the event is sent with s3Key as "object+key+with+spaces.csv"
    first_raw_task = raw_event["tasks"][0]
    first_raw_task["s3Key"] = "object+key+with+spaces.csv"

    parsed_event = S3BatchOperationEvent(raw_event)
    decoded_key = parsed_event.task.s3_key

    assert decoded_key == "object key with spaces.csv"
def _parsed_v1_event():
    """Load the schema 1.0 fixture wrapped as S3BatchOperationEvent"""
    return S3BatchOperationEvent(load_event("s3BatchOperationEventSchemaV1.json"))


def test_result_as_succeeded():
    parsed_event = _parsed_v1_event()
    task = parsed_event.task

    result_string = "Successfully processed"
    result = task.build_task_batch_response("Succeeded", result_string)

    assert_result(result, parsed_event.task.task_id, "Succeeded", result_string)


def test_result_as_temporary_failure():
    parsed_event = _parsed_v1_event()
    task = parsed_event.task

    result_string = "Temporary failure"
    result = task.build_task_batch_response("TemporaryFailure", result_string)

    assert_result(result, parsed_event.task.task_id, "TemporaryFailure", result_string)


def test_result_as_permanent_failure():
    parsed_event = _parsed_v1_event()
    task = parsed_event.task

    result_string = "Permanent failure"
    result = task.build_task_batch_response("PermanentFailure", result_string)

    assert_result(result, parsed_event.task.task_id, "PermanentFailure", result_string)


def assert_result(
    result: S3BatchOperationResponseRecord,
    task_id: str,
    expected_result_code: str,
    expected_result_string: str,
):
    """Check a record's attributes and its serialized (camelCase) form"""
    assert result.result_code == expected_result_code
    assert result.result_string == expected_result_string

    meta = result.asdict()

    assert meta == {
        "taskId": task_id,
        "resultCode": expected_result_code,
        "resultString": expected_result_string,
    }


def test_response():
    parsed_event = _parsed_v1_event()
    task = parsed_event.task

    response = S3BatchOperationResponse(
        parsed_event.invocation_schema_version,
        parsed_event.invocation_id,
        "PermanentFailure",
    )

    result_string = "Successfully processed"
    result = task.build_task_batch_response("Succeeded", result_string)

    response.add_result(result)

    assert len(response.results) == 1
    assert response.treat_missing_keys_as == "PermanentFailure"
    assert response.invocation_schema_version == parsed_event.invocation_schema_version
    assert response.invocation_id == parsed_event.invocation_id

    assert response.asdict() == {
        "invocationSchemaVersion": parsed_event.invocation_schema_version,
        "treatMissingKeysAs": "PermanentFailure",
        "invocationId": parsed_event.invocation_id,
        "results": [
            {
                "taskId": result.task_id,
                "resultCode": result.result_code,
                "resultString": result.result_string,
            },
        ],
    }


def test_response_multiple_results():
    parsed_event = _parsed_v1_event()
    task = parsed_event.task

    response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id, "Succeeded")

    result_string = "Successfully processed"
    result = task.build_task_batch_response("Succeeded", result_string)

    response.add_result(result)

    # add another result
    response.add_result(result)

    # Match the reported count too: a trailing " *" only quantifies the space and checks nothing
    with pytest.raises(ValueError, match=r"Response must have exactly one result, but got 2$"):
        response.asdict()


def test_response_no_results():
    parsed_event = _parsed_v1_event()

    response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id, "Succeeded")

    with pytest.raises(ValueError, match=r"Response must have exactly one result, but got 0$"):
        response.asdict()


def test_invalid_treating_missing_key():
    parsed_event = _parsed_v1_event()

    # The message must name the rejected value and the offending parameter
    with pytest.warns(UserWarning, match="The value invalid_value is not valid for treat_missing_keys_as"):
        S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id, "invalid_value")


def test_invalid_record_status():
    parsed_event = _parsed_v1_event()
    task = parsed_event.task

    response = S3BatchOperationResponse(parsed_event.invocation_schema_version, parsed_event.invocation_id, "Succeeded")

    result_string = "Successfully processed"
    result = task.build_task_batch_response("invalid_value", result_string)
    response.add_result(result)

    # The warning is raised lazily, when the record is serialized
    with pytest.warns(UserWarning, match="The resultCode invalid_value is not valid"):
        response.asdict()
def _parse_fixture(fixture_name):
    """Return (raw_event, S3BatchOperationModel, list of task models) for a fixture file"""
    raw_event = load_event(fixture_name)
    parsed_event: S3BatchOperationModel = S3BatchOperationModel(**raw_event)
    return raw_event, parsed_event, list(parsed_event.tasks)


def test_s3_batch_operation_v1_trigger_event():
    raw_event, parsed_event, tasks = _parse_fixture("s3BatchOperationEventSchemaV1.json")
    assert len(tasks) == 1

    assert parsed_event.invocationId == raw_event["invocationId"]
    assert parsed_event.invocationSchemaVersion == raw_event["invocationSchemaVersion"]
    assert parsed_event.job.id == raw_event["job"]["id"]

    parsed_task, raw_task = tasks[0], raw_event["tasks"][0]
    assert parsed_task.taskId == raw_task["taskId"]
    assert parsed_task.s3Key == raw_task["s3Key"]
    assert parsed_task.s3VersionId == raw_task["s3VersionId"]
    assert parsed_task.s3BucketArn == raw_task["s3BucketArn"]
    # Schema 1.0 has no s3Bucket; the model derives it from the arn
    assert parsed_task.s3Bucket == "powertools-dataset"


def test_s3_batch_operation_v2_trigger_event():
    raw_event, parsed_event, tasks = _parse_fixture("s3BatchOperationEventSchemaV2.json")
    assert len(tasks) == 1

    assert parsed_event.invocationId == raw_event["invocationId"]
    assert parsed_event.invocationSchemaVersion == raw_event["invocationSchemaVersion"]
    assert parsed_event.job.id == raw_event["job"]["id"]
    assert parsed_event.job.userArguments == raw_event["job"]["userArguments"]

    parsed_task, raw_task = tasks[0], raw_event["tasks"][0]
    assert parsed_task.taskId == raw_task["taskId"]
    assert parsed_task.s3Key == raw_task["s3Key"]
    assert parsed_task.s3VersionId == raw_task["s3VersionId"]
    assert parsed_task.s3Bucket == raw_task["s3Bucket"]