feat(event_source): Add support for S3 batch operations (#3572)
* Add support for S3 Batch Operations event and response, along with unit tests. This seamlessly supports both schema versions 1.0 and 2.0.

A few notes:
- S3BatchOperationXXX or S3BatchOperationsXXX?
- The s3 key is not url-encoded in real life, despite what the documentation implies. Need to test with keys that contain spaces, etc.
- S3BatchOperationResult has some factory methods to simplify building
- S3BatchOperationEvent may need some as well, as initialization is otherwise needlessly complicated

* Add documentation with example based on the AWS S3 documentation

* Use unquote_plus and add a unit test for a key encoded with spaces (see the sketch after this list)

* Initial refactor

* Changing the DX to improve usability

* Documentation

* Adding parser

* Small refactor

* Addressing Ruben's feedback - Docs and examples

* Addressing Ruben's feedback - Docs and examples

* Addressing Ruben's feedback - Code

* Addressing Ruben's feedback - Code
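
A small sketch of what the `unquote_plus` decoding handles (standard library only; the key below is illustrative):

```python
from urllib.parse import unquote_plus

# A key may arrive percent-encoded and/or with '+' standing in for spaces;
# unquote_plus decodes both forms.
assert unquote_plus("path/to/file+with+spaces.txt") == "path/to/file with spaces.txt"
assert unquote_plus("path/to/file%20with%20spaces.txt") == "path/to/file with spaces.txt"
```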

---------

Co-authored-by: Leandro Damascena <[email protected]>
sbailliez and leandrodamascena authored Jan 17, 2024
1 parent 9afbc78 commit c466c80
Showing 13 changed files with 622 additions and 1 deletion.
8 changes: 8 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/__init__.py
@@ -23,6 +23,11 @@
)
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_batch_operation_event import (
    S3BatchOperationEvent,
    S3BatchOperationResponse,
    S3BatchOperationResponseRecord,
)
from .s3_event import S3Event, S3EventBridgeNotificationEvent
from .secrets_manager_event import SecretsManagerEvent
from .ses_event import SESEvent
@@ -52,6 +57,9 @@
"LambdaFunctionUrlEvent",
"S3Event",
"S3EventBridgeNotificationEvent",
"S3BatchOperationEvent",
"S3BatchOperationResponse",
"S3BatchOperationResponseRecord",
"SESEvent",
"SNSEvent",
"SQSEvent",
242 changes: 242 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -0,0 +1,242 @@
import warnings
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List, Optional, Tuple
from urllib.parse import unquote_plus

from aws_lambda_powertools.shared.types import Literal
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper

# List of valid result codes, used in both S3BatchOperationResponse and S3BatchOperationResponseRecord
VALID_RESULT_CODES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure")
RESULT_CODE_TYPE = Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]


@dataclass(repr=False, order=False)
class S3BatchOperationResponseRecord:
    task_id: str
    result_code: RESULT_CODE_TYPE
    result_string: Optional[str] = None

    def asdict(self) -> Dict[str, Any]:
        if self.result_code not in VALID_RESULT_CODES:
            warnings.warn(
                stacklevel=2,
                message=f"The resultCode {self.result_code} is not valid. "
                f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.",
            )

        return {
            "taskId": self.task_id,
            "resultCode": self.result_code,
            "resultString": self.result_string,
        }


@dataclass(repr=False, order=False)
class S3BatchOperationResponse:
    """S3 Batch Operations response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html
    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions
    - https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion

    Parameters
    ----------
    invocation_schema_version : str
        Specifies the schema version for the payload that Batch Operations sends when invoking
        an AWS Lambda function, either '1.0' or '2.0'. This must be copied from the event.
    invocation_id : str
        The identifier of the invocation request. This must be copied from the event.
    treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
        Undocumented parameter, defaults to "Succeeded"
    results : List[S3BatchOperationResponseRecord]
        Results of each S3 Batch Operations task; optional at construction time,
        results can be added later using the `add_result` method.

    Examples
    --------

    **S3 Batch Operations**

    ```python
    import boto3
    from botocore.exceptions import ClientError

    from aws_lambda_powertools.utilities.data_classes import (
        S3BatchOperationEvent,
        S3BatchOperationResponse,
        event_source
    )
    from aws_lambda_powertools.utilities.typing import LambdaContext


    @event_source(data_class=S3BatchOperationEvent)
    def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
        response = S3BatchOperationResponse(
            event.invocation_schema_version,
            event.invocation_id,
            "PermanentFailure"
        )

        result = None
        task = event.task
        src_key: str = task.s3_key
        src_bucket: str = task.s3_bucket

        s3 = boto3.client("s3", region_name='us-east-1')

        try:
            dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
            result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
        except ClientError as e:
            error_code = e.response['Error']['Code']
            error_message = e.response['Error']['Message']
            if error_code == 'RequestTimeout':
                result = task.build_task_batch_response("TemporaryFailure", "Timeout - trying again")
            else:
                result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
        except Exception as e:
            result = task.build_task_batch_response("PermanentFailure", str(e))
        finally:
            response.add_result(result)

        return response.asdict()
    ```
    """

    invocation_schema_version: str
    invocation_id: str
    treat_missing_keys_as: RESULT_CODE_TYPE = "Succeeded"
    results: List[S3BatchOperationResponseRecord] = field(default_factory=list)

    def __post_init__(self):
        if self.treat_missing_keys_as not in VALID_RESULT_CODES:
            warnings.warn(
                stacklevel=2,
                message=f"The value {self.treat_missing_keys_as} is not valid for treat_missing_keys_as. "
                f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.",
            )

    def add_result(self, result: S3BatchOperationResponseRecord):
        self.results.append(result)

    def asdict(self) -> Dict:
        result_count = len(self.results)

        if result_count != 1:
            raise ValueError(f"Response must have exactly one result, but got {result_count}")

        return {
            "invocationSchemaVersion": self.invocation_schema_version,
            "treatMissingKeysAs": self.treat_missing_keys_as,
            "invocationId": self.invocation_id,
            "results": [result.asdict() for result in self.results],
        }


class S3BatchOperationJob(DictWrapper):
    @property
    def get_id(self) -> str:
        # Named get_id rather than id, because `id` shadows a Python builtin
        return self["id"]

    @property
    def user_arguments(self) -> Optional[Dict[str, str]]:
        """Get user arguments provided for this job (only for invocation schema 2.0)"""
        return self.get("userArguments")


class S3BatchOperationTask(DictWrapper):
    @property
    def task_id(self) -> str:
        """Get the task id"""
        return self["taskId"]

    @property
    def s3_key(self) -> str:
        """Get the object key, decoded with unquote_plus"""
        return unquote_plus(self["s3Key"])

    @property
    def s3_version_id(self) -> Optional[str]:
        """Object version if bucket is versioning-enabled, otherwise null"""
        return self.get("s3VersionId")

    @property
    def s3_bucket_arn(self) -> Optional[str]:
        """Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')"""
        return self.get("s3BucketArn")

    @property
    def s3_bucket(self) -> str:
        """
        Get the s3 bucket, either from the 's3Bucket' property (invocationSchemaVersion '2.0')
        or from 's3BucketArn' (invocationSchemaVersion '1.0')
        """
        if self.s3_bucket_arn:
            return self.s3_bucket_arn.split(":::")[-1]
        return self["s3Bucket"]

    def build_task_batch_response(
        self,
        result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded",
        result_string: str = "",
    ) -> S3BatchOperationResponseRecord:
        """Create an S3BatchOperationResponseRecord directly, using the task_id and the given values

        Parameters
        ----------
        result_code : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded"
            task result, supported values: "Succeeded", "TemporaryFailure", "PermanentFailure"
        result_string : str
            string to identify the result in the report
        """
        return S3BatchOperationResponseRecord(
            task_id=self.task_id,
            result_code=result_code,
            result_string=result_string,
        )


class S3BatchOperationEvent(DictWrapper):
    """Amazon S3 Batch Operations event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html
    """

    @property
    def invocation_id(self) -> str:
        """Get the identifier of the invocation request"""
        return self["invocationId"]

    @property
    def invocation_schema_version(self) -> Literal["1.0", "2.0"]:
        """
        Get the schema version for the payload that Batch Operations sends when invoking an
        AWS Lambda function. Either '1.0' or '2.0'.
        """
        return self["invocationSchemaVersion"]

    @property
    def tasks(self) -> Iterator[S3BatchOperationTask]:
        """Get s3 batch operation tasks"""
        for task in self["tasks"]:
            yield S3BatchOperationTask(task)

    @property
    def task(self) -> S3BatchOperationTask:
        """Get the first s3 batch operation task"""
        return next(self.tasks)

    @property
    def job(self) -> S3BatchOperationJob:
        """Get the s3 batch operation job"""
        return S3BatchOperationJob(self["job"])
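
As a quick illustration of the classes above, a minimal sketch of a schema 1.0 round trip; all payload values below are made up, not from a real Batch Operations job:

```python
from aws_lambda_powertools.utilities.data_classes import (
    S3BatchOperationEvent,
    S3BatchOperationResponse,
)

sample_event = {
    "invocationSchemaVersion": "1.0",
    "invocationId": "example-invocation-id",
    "job": {"id": "example-job-id"},
    "tasks": [
        {
            "taskId": "example-task-id",
            "s3Key": "prefix/file%20with%20spaces.txt",
            "s3VersionId": None,
            "s3BucketArn": "arn:aws:s3:::example-bucket",
        },
    ],
}

event = S3BatchOperationEvent(sample_event)
task = event.task
assert task.s3_bucket == "example-bucket"  # derived from s3BucketArn (schema 1.0)
assert task.s3_key == "prefix/file with spaces.txt"  # unquote_plus applied

response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id)
response.add_result(task.build_task_batch_response("Succeeded", "done"))
print(response.asdict())
```
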
4 changes: 4 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/__init__.py
@@ -68,6 +68,7 @@
    S3Model,
    S3RecordModel,
)
from .s3_batch_operation import S3BatchOperationJobModel, S3BatchOperationModel, S3BatchOperationTaskModel
from .s3_event_notification import (
    S3SqsEventNotificationModel,
    S3SqsEventNotificationRecordModel,
@@ -177,4 +178,7 @@
"BedrockAgentEventModel",
"BedrockAgentRequestBodyModel",
"BedrockAgentRequestMediaModel",
"S3BatchOperationJobModel",
"S3BatchOperationModel",
"S3BatchOperationTaskModel",
]
34 changes: 34 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/s3_batch_operation.py
@@ -0,0 +1,34 @@
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, validator

from aws_lambda_powertools.utilities.parser.types import Literal


class S3BatchOperationTaskModel(BaseModel):
    taskId: str
    s3Key: str
    s3VersionId: Optional[str] = None
    s3BucketArn: Optional[str] = None
    s3Bucket: Optional[str] = None

    @validator("s3Bucket", pre=True, always=True)
    def validate_bucket(cls, current_value, values):
        # Get the s3 bucket, either from the 's3Bucket' property (invocationSchemaVersion '2.0')
        # or from 's3BucketArn' (invocationSchemaVersion '1.0')
        if values.get("s3BucketArn") and not current_value:
            # Replace the s3Bucket value with the bucket name from s3BucketArn
            return values["s3BucketArn"].split(":::")[-1]
        return current_value


class S3BatchOperationJobModel(BaseModel):
    id: str
    userArguments: Optional[Dict[str, Any]] = None


class S3BatchOperationModel(BaseModel):
    invocationId: str
    invocationSchemaVersion: Literal["1.0", "2.0"]
    job: S3BatchOperationJobModel
    tasks: List[S3BatchOperationTaskModel]
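
A minimal sketch of parsing a payload with the new model; the ids, key, and bucket name below are made up for illustration:

```python
from aws_lambda_powertools.utilities.parser import parse
from aws_lambda_powertools.utilities.parser.models import S3BatchOperationModel

payload = {
    "invocationSchemaVersion": "1.0",
    "invocationId": "example-invocation-id",
    "job": {"id": "example-job-id"},
    "tasks": [
        {
            "taskId": "example-task-id",
            "s3Key": "exampleImage.jpg",
            "s3VersionId": None,
            "s3BucketArn": "arn:aws:s3:::example-bucket",
        },
    ],
}

parsed = parse(event=payload, model=S3BatchOperationModel)
# For schema 1.0 payloads, the validator derives s3Bucket from s3BucketArn
assert parsed.tasks[0].s3Bucket == "example-bucket"
```
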
13 changes: 12 additions & 1 deletion docs/utilities/data_classes.md
@@ -75,7 +75,7 @@ Log Data Event for Troubleshooting
## Supported event sources

| Event Source | Data_class |
| ------------------------------------------------------------------------- | -------------------------------------------------- |
|---------------------------------------------------------------------------|----------------------------------------------------|
| [Active MQ](#active-mq) | `ActiveMQEvent` |
| [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` |
| [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` |
@@ -99,6 +99,7 @@
| [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` |
| [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` |
| [S3](#s3) | `S3Event` |
| [S3 Batch Operations](#s3-batch-operations) | `S3BatchOperationEvent` |
| [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` |
| [S3 EventBridge Notification](#s3-eventbridge-notification) | `S3EventBridgeNotificationEvent` |
| [SES](#ses) | `SESEvent` |
@@ -1076,6 +1077,16 @@ for more details.
do_something_with(f"{bucket_name}/{object_key}")
```

### S3 Batch Operations

This example is based on the AWS S3 Batch Operations documentation [Example Lambda function for S3 Batch Operations](https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html){target="_blank"}.

=== "app.py"

```python hl_lines="4 8 10 20 25 27 29 33"
--8<-- "examples/event_sources/src/s3_batch_operation.py"
```
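
The handler must report exactly one result per invocation; `asdict()` raises a `ValueError` otherwise. For reference, the returned response dictionary has the following shape (the values below are illustrative):

```python
{
    "invocationSchemaVersion": "1.0",
    "treatMissingKeysAs": "PermanentFailure",
    "invocationId": "example-invocation-id",
    "results": [
        {
            "taskId": "example-task-id",
            "resultCode": "Succeeded",
            "resultString": "s3://example-bucket/example-key",
        },
    ],
}
```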

### S3 Object Lambda

This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3](https://aws.amazon.com/blogs/aws/introducing-amazon-s3-object-lambda-use-your-code-to-process-data-as-it-is-being-retrieved-from-s3/){target="_blank"}.
1 change: 1 addition & 0 deletions docs/utilities/parser.md
@@ -191,6 +191,7 @@ Parser comes with the following built-in models:
| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose |
| **KinesisFirehoseSqsModel** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records |
| **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload |
| **S3BatchOperationModel**                | Lambda Event Source payload for Amazon S3 Batch Operations                       |
| **S3EventNotificationEventBridgeModel** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. |
| **S3Model** | Lambda Event Source payload for Amazon S3 |
| **S3ObjectLambdaEvent** | Lambda Event Source payload for Amazon S3 Object Lambda |
37 changes: 37 additions & 0 deletions examples/event_sources/src/s3_batch_operation.py
@@ -0,0 +1,37 @@
import boto3
from botocore.exceptions import ClientError

from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent, S3BatchOperationResponse, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=S3BatchOperationEvent)
def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
    response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")

    task = event.task
    src_key: str = task.s3_key
    src_bucket: str = task.s3_bucket

    s3 = boto3.client("s3", region_name="us-east-1")

    try:
        dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
        result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        if error_code == "RequestTimeout":
            result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
        else:
            result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
    except Exception as e:
        result = task.build_task_batch_response("PermanentFailure", str(e))
    finally:
        response.add_result(result)

    return response.asdict()


def do_some_work(s3_client, src_bucket: str, src_key: str):
    ...
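
For illustration, one possible body for the `do_some_work` stub above: a minimal sketch that copies the object to a destination bucket. The destination bucket name is a placeholder assumption, not part of the original example.

```python
def do_some_work(s3_client, src_bucket: str, src_key: str):
    dest_bucket = "example-destination-bucket"  # placeholder assumption
    # Copy the object across buckets, keeping the same key
    s3_client.copy_object(
        CopySource={"Bucket": src_bucket, "Key": src_key},
        Bucket=dest_bucket,
        Key=src_key,
    )
    return dest_bucket, src_key
```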