title | description |
---|---|
Event Source Data Classes |
Utility |
Event Source Data Classes utility provides classes self-describing Lambda event sources.
- Type hinting and code completion for common event types
- Helper functions for decoding/deserializing nested fields
- Docstrings for fields contained in event schemas
Background
When authoring Lambda functions, you often need to understand the schema of the event dictionary which is passed to the handler. There are several common event types which follow a specific schema, depending on the service triggering the Lambda function.
The classes are initialized by passing in the Lambda event object into the constructor of the appropriate data class or
by using the event_source
decorator.
For example, if your Lambda function is being triggered by an API Gateway proxy integration, you can use the
APIGatewayProxyEvent
class.
=== "app.py"
```python hl_lines="1 4"
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent
def lambda_handler(event: dict, context):
event = APIGatewayProxyEvent(event)
if 'helloworld' in event.path and event.http_method == 'GET':
do_something_with(event.body, user)
```
Same example as above, but using the event_source
decorator
=== "app.py"
```python hl_lines="1 3"
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent
@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
if 'helloworld' in event.path and event.http_method == 'GET':
do_something_with(event.body, user)
```
Autocomplete with self-documented properties and methods
Event Source | Data_class |
---|---|
Active MQ | ActiveMQEvent |
API Gateway Authorizer | APIGatewayAuthorizerRequestEvent |
API Gateway Authorizer V2 | APIGatewayAuthorizerEventV2 |
API Gateway Proxy | APIGatewayProxyEvent |
API Gateway Proxy V2 | APIGatewayProxyEventV2 |
Application Load Balancer | ALBEvent |
AppSync Authorizer | AppSyncAuthorizerEvent |
AppSync Resolver | AppSyncResolverEvent |
CloudWatch Dashboard Custom Widget | CloudWatchDashboardCustomWidgetEvent |
CloudWatch Logs | CloudWatchLogsEvent |
CodePipeline Job Event | CodePipelineJobEvent |
Cognito User Pool | Multiple available under cognito_user_pool_event |
Connect Contact Flow | ConnectContactFlowEvent |
DynamoDB streams | DynamoDBStreamEvent , DynamoDBRecordEventName |
EventBridge | EventBridgeEvent |
Kafka | KafkaEvent |
Kinesis Data Stream | KinesisStreamEvent |
Kinesis Firehose Delivery Stream | KinesisFirehoseEvent |
Lambda Function URL | LambdaFunctionUrlEvent |
Rabbit MQ | RabbitMQEvent |
S3 | S3Event |
S3 Object Lambda | S3ObjectLambdaEvent |
SES | SESEvent |
SNS | SNSEvent |
SQS | SQSEvent |
???+ info The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of documentation inherently (via autocompletion, types and docstrings).
It is used for Active MQ payloads{target="_blank"}, also see the AWS blog post{target="_blank"} for more details.
=== "app.py"
```python hl_lines="4-5 9-10"
from typing import Dict
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent
logger = Logger()
@event_source(data_class=ActiveMQEvent)
def lambda_handler(event: ActiveMQEvent, context):
for message in event.messages:
logger.debug(f"MessageID: {message.message_id}")
data: Dict = message.json_data
logger.debug("Process json in base64 encoded data str", data)
```
New in 1.20.0
It is used for API Gateway Rest API Lambda Authorizer payload{target="_blank"}.
Use APIGatewayAuthorizerRequestEvent
for type REQUEST
and APIGatewayAuthorizerTokenEvent
for type TOKEN
.
=== "app_type_request.py"
This example uses the `APIGatewayAuthorizerResponse` to decline a given request if the user is not found.
When the user is found, it includes the user details in the request context that will be available to the back-end, and returns a full access policy for admin users.
```python hl_lines="2-6 29 36-42 47 49"
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
DENY_ALL_RESPONSE,
APIGatewayAuthorizerRequestEvent,
APIGatewayAuthorizerResponse,
HttpVerb,
)
from secrets import compare_digest
def get_user_by_token(token):
if compare_digest(token, "admin-foo"):
return {"id": 0, "name": "Admin", "isAdmin": True}
elif compare_digest(token, "regular-foo"):
return {"id": 1, "name": "Joe"}
else:
return None
@event_source(data_class=APIGatewayAuthorizerRequestEvent)
def handler(event: APIGatewayAuthorizerRequestEvent, context):
user = get_user_by_token(event.get_header_value("Authorization"))
if user is None:
# No user was found
# to return 401 - `{"message":"Unauthorized"}`, but pollutes lambda error count metrics
# raise Exception("Unauthorized")
# to return 403 - `{"message":"Forbidden"}`
return DENY_ALL_RESPONSE
# parse the `methodArn` as an `APIGatewayRouteArn`
arn = event.parsed_arn
# Create the response builder from parts of the `methodArn`
# and set the logged in user id and context
policy = APIGatewayAuthorizerResponse(
principal_id=user["id"],
context=user,
region=arn.region,
aws_account_id=arn.aws_account_id,
api_id=arn.api_id,
stage=arn.stage,
)
# Conditional IAM Policy
if user.get("isAdmin", False):
policy.allow_all_routes()
else:
policy.allow_route(HttpVerb.GET, "/user-profile")
return policy.asdict()
```
=== "app_type_token.py"
```python hl_lines="2-5 12-18 21 23-24"
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
APIGatewayAuthorizerTokenEvent,
APIGatewayAuthorizerResponse,
)
@event_source(data_class=APIGatewayAuthorizerTokenEvent)
def handler(event: APIGatewayAuthorizerTokenEvent, context):
arn = event.parsed_arn
policy = APIGatewayAuthorizerResponse(
principal_id="user",
region=arn.region,
aws_account_id=arn.aws_account_id,
api_id=arn.api_id,
stage=arn.stage
)
if event.authorization_token == "42":
policy.allow_all_routes()
else:
policy.deny_all_routes()
return policy.asdict()
```
New in 1.20.0
It is used for API Gateway HTTP API Lambda Authorizer payload version 2{target="_blank"}. See also this blog post{target="_blank"} for more details.
=== "app.py"
This example looks up user details via `x-token` header. It uses `APIGatewayAuthorizerResponseV2` to return a deny policy when user is not found or authorized.
```python hl_lines="2-5 21 24"
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
APIGatewayAuthorizerEventV2,
APIGatewayAuthorizerResponseV2,
)
from secrets import compare_digest
def get_user_by_token(token):
if compare_digest(token, "Foo"):
return {"name": "Foo"}
return None
@event_source(data_class=APIGatewayAuthorizerEventV2)
def handler(event: APIGatewayAuthorizerEventV2, context):
user = get_user_by_token(event.get_header_value("x-token"))
if user is None:
# No user was found, so we return not authorized
return APIGatewayAuthorizerResponseV2().asdict()
# Found the user and setting the details in the context
return APIGatewayAuthorizerResponseV2(authorize=True, context=user).asdict()
```
It is used for either API Gateway REST API or HTTP API using v1 proxy event.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent
@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
if "helloworld" in event.path and event.http_method == "GET":
request_context = event.request_context
identity = request_context.identity
user = identity.user
do_something_with(event.json_body, user)
```
It is used for HTTP API using v2 proxy event.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEventV2
@event_source(data_class=APIGatewayProxyEventV2)
def lambda_handler(event: APIGatewayProxyEventV2, context):
if "helloworld" in event.path and event.http_method == "POST":
do_something_with(event.json_body, event.query_string_parameters)
```
Is it used for Application load balancer event.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, ALBEvent
@event_source(data_class=ALBEvent)
def lambda_handler(event: ALBEvent, context):
if "helloworld" in event.path and event.http_method == "POST":
do_something_with(event.json_body, event.query_string_parameters)
```
New in 1.20.0
Used when building an AWS_LAMBDA Authorization{target="_blank"} with AppSync. See blog post Introducing Lambda authorization for AWS AppSync GraphQL APIs{target="_blank"} or read the Amplify documentation on using AWS Lambda for authorization{target="_blank"} with AppSync.
In this example extract the requestId
as the correlation_id
for logging, used @event_source
decorator and builds the AppSync authorizer using the AppSyncAuthorizerResponse
helper.
=== "app.py"
```python
from typing import Dict
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.logging.logger import Logger
from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import (
AppSyncAuthorizerEvent,
AppSyncAuthorizerResponse,
)
from aws_lambda_powertools.utilities.data_classes.event_source import event_source
logger = Logger()
def get_user_by_token(token: str):
"""Look a user by token"""
...
@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER)
@event_source(data_class=AppSyncAuthorizerEvent)
def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict:
user = get_user_by_token(event.authorization_token)
if not user:
# No user found, return not authorized
return AppSyncAuthorizerResponse().asdict()
return AppSyncAuthorizerResponse(
authorize=True,
resolver_context={"id": user.id},
# Only allow admins to delete events
deny_fields=None if user.is_admin else ["Mutation.deleteEvent"],
).asdict()
```
New in 1.12.0
Used when building Lambda GraphQL Resolvers with Amplify GraphQL Transform Library{target="_blank"} (@function
),
and AppSync Direct Lambda Resolvers{target="_blank"}.
In this example, we also use the new Logger correlation_id
and built-in correlation_paths
to extract, if available, X-Ray Trace ID in AppSync request headers:
=== "app.py"
```python hl_lines="2-5 12 14 19 21 29-30"
from aws_lambda_powertools.logging import Logger, correlation_paths
from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import (
AppSyncResolverEvent,
AppSyncIdentityCognito
)
logger = Logger()
def get_locations(name: str = None, size: int = 0, page: int = 0):
"""Your resolver logic here"""
@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
def lambda_handler(event, context):
event: AppSyncResolverEvent = AppSyncResolverEvent(event)
# Case insensitive look up of request headers
x_forwarded_for = event.get_header_value("x-forwarded-for")
# Support for AppSyncIdentityCognito or AppSyncIdentityIAM identity types
assert isinstance(event.identity, AppSyncIdentityCognito)
identity: AppSyncIdentityCognito = event.identity
# Logging with correlation_id
logger.debug({
"x-forwarded-for": x_forwarded_for,
"username": identity.username
})
if event.type_name == "Merchant" and event.field_name == "locations":
return get_locations(**event.arguments)
raise ValueError(f"Unsupported field resolver: {event.field_name}")
```
=== "Example AppSync Event"
```json hl_lines="2-8 14 19 20"
{
"typeName": "Merchant",
"fieldName": "locations",
"arguments": {
"page": 2,
"size": 1,
"name": "value"
},
"identity": {
"claims": {
"iat": 1615366261
...
},
"username": "mike",
...
},
"request": {
"headers": {
"x-amzn-trace-id": "Root=1-60488877-0b0c4e6727ab2a1c545babd0",
"x-forwarded-for": "127.0.0.1"
...
}
},
...
}
```
=== "Example CloudWatch Log"
```json hl_lines="5 6 16"
{
"level":"DEBUG",
"location":"lambda_handler:22",
"message":{
"x-forwarded-for":"127.0.0.1",
"username":"mike"
},
"timestamp":"2021-03-10 12:38:40,062",
"service":"service_undefined",
"sampling_rate":0.0,
"cold_start":true,
"function_name":"func_name",
"function_memory_size":512,
"function_arn":"func_arn",
"function_request_id":"6735a29c-c000-4ae3-94e6-1f1c934f7f94",
"correlation_id":"Root=1-60488877-0b0c4e6727ab2a1c545babd0"
}
```
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchDashboardCustomWidgetEvent
const DOCS = `
## Echo
A simple echo script. Anything passed in \`\`\`echo\`\`\` parameter is returned as the content of custom widget.
### Widget parameters
Param | Description
---|---
**echo** | The content to echo back
### Example parameters
\`\`\` yaml
echo: <h1>Hello world</h1>
\`\`\`
`
@event_source(data_class=CloudWatchDashboardCustomWidgetEvent)
def lambda_handler(event: CloudWatchDashboardCustomWidgetEvent, context):
if event.describe:
return DOCS
# You can directly return HTML or JSON content
# Alternatively, you can return markdown that will be rendered by CloudWatch
echo = event.widget_context.params["echo"]
return { "markdown": f"# {echo}" }
```
CloudWatch Logs events by default are compressed and base64 encoded. You can use the helper function provided to decode, decompress and parse json data from the event.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchLogsEvent
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
@event_source(data_class=CloudWatchLogsEvent)
def lambda_handler(event: CloudWatchLogsEvent, context):
decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data
log_events = decompressed_log.log_events
for event in log_events:
do_something_with(event.timestamp, event.message)
```
Data classes and utility functions to help create continuous delivery pipelines tasks with AWS Lambda
=== "app.py"
```python
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source, CodePipelineJobEvent
logger = Logger()
@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event, context):
"""The Lambda function handler
If a continuing job then checks the CloudFormation stack status
and updates the job accordingly.
If a new job then kick of an update or creation of the target
CloudFormation stack.
"""
# Extract the Job ID
job_id = event.get_id
# Extract the params
params: dict = event.decoded_user_parameters
stack = params["stack"]
artifact_name = params["artifact"]
template_file = params["file"]
try:
if event.data.continuation_token:
# If we're continuing then the create/update has already been triggered
# we just need to check if it has finished.
check_stack_update_status(job_id, stack)
else:
template = event.get_artifact(artifact_name, template_file)
# Kick off a stack update or create
start_update_or_create(job_id, stack, template)
except Exception as e:
# If any other exceptions which we didn't expect are raised
# then fail the job and log the exception message.
logger.exception("Function failed due to exception.")
put_job_failure(job_id, "Function exception: " + str(e))
logger.debug("Function complete.")
return "Complete."
```
Cognito User Pools have several different Lambda trigger sources, all of which map to a different data class, which
can be imported from aws_lambda_powertools.data_classes.cognito_user_pool_event
:
Trigger/Event Source | Data Class |
---|---|
Custom message event | data_classes.cognito_user_pool_event.CustomMessageTriggerEvent |
Post authentication | data_classes.cognito_user_pool_event.PostAuthenticationTriggerEvent |
Post confirmation | data_classes.cognito_user_pool_event.PostConfirmationTriggerEvent |
Pre authentication | data_classes.cognito_user_pool_event.PreAuthenticationTriggerEvent |
Pre sign-up | data_classes.cognito_user_pool_event.PreSignUpTriggerEvent |
Pre token generation | data_classes.cognito_user_pool_event.PreTokenGenerationTriggerEvent |
User migration | data_classes.cognito_user_pool_event.UserMigrationTriggerEvent |
Define Auth Challenge | data_classes.cognito_user_pool_event.DefineAuthChallengeTriggerEvent |
Create Auth Challenge | data_classes.cognito_user_pool_event.CreateAuthChallengeTriggerEvent |
Verify Auth Challenge | data_classes.cognito_user_pool_event.VerifyAuthChallengeResponseTriggerEvent |
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PostConfirmationTriggerEvent
def lambda_handler(event, context):
event: PostConfirmationTriggerEvent = PostConfirmationTriggerEvent(event)
user_attributes = event.request.user_attributes
do_something_with(user_attributes)
```
???+ note
In this example we are modifying the wrapped dict response fields, so we need to return the json serializable wrapped event in event.raw_event
.
This example is based on the AWS Cognito docs for Define Auth Challenge Lambda Trigger{target="_blank"}.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import DefineAuthChallengeTriggerEvent
def handler(event: dict, context) -> dict:
event: DefineAuthChallengeTriggerEvent = DefineAuthChallengeTriggerEvent(event)
if (
len(event.request.session) == 1
and event.request.session[0].challenge_name == "SRP_A"
):
event.response.issue_tokens = False
event.response.fail_authentication = False
event.response.challenge_name = "PASSWORD_VERIFIER"
elif (
len(event.request.session) == 2
and event.request.session[1].challenge_name == "PASSWORD_VERIFIER"
and event.request.session[1].challenge_result
):
event.response.issue_tokens = False
event.response.fail_authentication = False
event.response.challenge_name = "CUSTOM_CHALLENGE"
elif (
len(event.request.session) == 3
and event.request.session[2].challenge_name == "CUSTOM_CHALLENGE"
and event.request.session[2].challenge_result
):
event.response.issue_tokens = True
event.response.fail_authentication = False
else:
event.response.issue_tokens = False
event.response.fail_authentication = True
return event.raw_event
```
=== "SPR_A response"
```json hl_lines="25-27"
{
"version": "1",
"region": "us-east-1",
"userPoolId": "us-east-1_example",
"userName": "UserName",
"callerContext": {
"awsSdkVersion": "awsSdkVersion",
"clientId": "clientId"
},
"triggerSource": "DefineAuthChallenge_Authentication",
"request": {
"userAttributes": {
"sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
"email_verified": "true",
"name": "First Last",
"email": "[email protected]"
},
"session": [
{
"challengeName": "SRP_A",
"challengeResult": true
}
]
},
"response": {
"issueTokens": false,
"failAuthentication": false,
"challengeName": "PASSWORD_VERIFIER"
}
}
```
=== "PASSWORD_VERIFIER success response"
```json hl_lines="30-32"
{
"version": "1",
"region": "us-east-1",
"userPoolId": "us-east-1_example",
"userName": "UserName",
"callerContext": {
"awsSdkVersion": "awsSdkVersion",
"clientId": "clientId"
},
"triggerSource": "DefineAuthChallenge_Authentication",
"request": {
"userAttributes": {
"sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
"email_verified": "true",
"name": "First Last",
"email": "[email protected]"
},
"session": [
{
"challengeName": "SRP_A",
"challengeResult": true
},
{
"challengeName": "PASSWORD_VERIFIER",
"challengeResult": true
}
]
},
"response": {
"issueTokens": false,
"failAuthentication": false,
"challengeName": "CUSTOM_CHALLENGE"
}
}
```
=== "CUSTOM_CHALLENGE success response"
```json hl_lines="34 35"
{
"version": "1",
"region": "us-east-1",
"userPoolId": "us-east-1_example",
"userName": "UserName",
"callerContext": {
"awsSdkVersion": "awsSdkVersion",
"clientId": "clientId"
},
"triggerSource": "DefineAuthChallenge_Authentication",
"request": {
"userAttributes": {
"sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
"email_verified": "true",
"name": "First Last",
"email": "[email protected]"
},
"session": [
{
"challengeName": "SRP_A",
"challengeResult": true
},
{
"challengeName": "PASSWORD_VERIFIER",
"challengeResult": true
},
{
"challengeName": "CUSTOM_CHALLENGE",
"challengeResult": true
}
]
},
"response": {
"issueTokens": true,
"failAuthentication": false
}
}
```
This example is based on the AWS Cognito docs for Create Auth Challenge Lambda Trigger{target="_blank"}.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import CreateAuthChallengeTriggerEvent
@event_source(data_class=CreateAuthChallengeTriggerEvent)
def handler(event: CreateAuthChallengeTriggerEvent, context) -> dict:
if event.request.challenge_name == "CUSTOM_CHALLENGE":
event.response.public_challenge_parameters = {"captchaUrl": "url/123.jpg"}
event.response.private_challenge_parameters = {"answer": "5"}
event.response.challenge_metadata = "CAPTCHA_CHALLENGE"
return event.raw_event
```
This example is based on the AWS Cognito docs for Verify Auth Challenge Response Lambda Trigger{target="_blank"}.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import VerifyAuthChallengeResponseTriggerEvent
@event_source(data_class=VerifyAuthChallengeResponseTriggerEvent)
def handler(event: VerifyAuthChallengeResponseTriggerEvent, context) -> dict:
event.response.answer_correct = (
event.request.private_challenge_parameters.get("answer") == event.request.challenge_answer
)
return event.raw_event
```
New in 1.11.0
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event import (
ConnectContactFlowChannel,
ConnectContactFlowEndpointType,
ConnectContactFlowEvent,
ConnectContactFlowInitiationMethod,
)
def lambda_handler(event, context):
event: ConnectContactFlowEvent = ConnectContactFlowEvent(event)
assert event.contact_data.attributes == {"Language": "en-US"}
assert event.contact_data.channel == ConnectContactFlowChannel.VOICE
assert event.contact_data.customer_endpoint.endpoint_type == ConnectContactFlowEndpointType.TELEPHONE_NUMBER
assert event.contact_data.initiation_method == ConnectContactFlowInitiationMethod.API
```
The DynamoDB data class utility provides the base class for DynamoDBStreamEvent
, a typed class for
attributes values (AttributeValue
), as well as enums for stream view type (StreamViewType
) and event type
(DynamoDBRecordEventName
).
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
DynamoDBStreamEvent,
DynamoDBRecordEventName
)
def lambda_handler(event, context):
event: DynamoDBStreamEvent = DynamoDBStreamEvent(event)
# Multiple records can be delivered in a single event
for record in event.records:
if record.event_name == DynamoDBRecordEventName.MODIFY:
do_something_with(record.dynamodb.new_image)
do_something_with(record.dynamodb.old_image)
```
=== "multiple_records_types.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import AttributeValueType, AttributeValue
from aws_lambda_powertools.utilities.typing import LambdaContext
@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
for record in event.records:
key: AttributeValue = record.dynamodb.keys["id"]
if key == AttributeValueType.Number:
# {"N": "123.45"} => "123.45"
assert key.get_value == key.n_value
print(key.get_value)
elif key == AttributeValueType.Map:
assert key.get_value == key.map_value
print(key.get_value)
```
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, EventBridgeEvent
@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context):
do_something_with(event.detail)
```
This example is based on the AWS docs for Amazon MSK{target="_blank"} and self-managed Apache Kafka{target="_blank"}.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, KafkaEvent
@event_source(data_class=KafkaEvent)
def lambda_handler(event: KafkaEvent, context):
for record in event.records:
do_something_with(record.decoded_key, record.json_value)
```
Kinesis events by default contain base64 encoded data. You can use the helper function to access the data either as json or plain text, depending on the original payload.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, KinesisStreamEvent
@event_source(data_class=KinesisStreamEvent)
def lambda_handler(event: KinesisStreamEvent, context):
kinesis_record = next(event.records).kinesis
# if data was delivered as text
data = kinesis_record.data_as_text()
# if data was delivered as json
data = kinesis_record.data_as_json()
do_something_with(data)
```
Kinesis Firehose Data Transformation can use a Lambda Function to modify the records inline, and re-emit them back to the Delivery Stream.
Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper function to access the data either as json or plain text, depending on the original payload.
=== "app.py"
```python
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
```
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, LambdaFunctionUrlEvent
@event_source(data_class=LambdaFunctionUrlEvent)
def lambda_handler(event: LambdaFunctionUrlEvent, context):
do_something_with(event.body)
```
It is used for Rabbit MQ payloads{target="_blank"}, also see the blog post{target="_blank"} for more details.
=== "app.py"
```python hl_lines="4-5 9-10"
from typing import Dict
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent
logger = Logger()
@event_source(data_class=RabbitMQEvent)
def lambda_handler(event: RabbitMQEvent, context):
for queue_name, messages in event.rmq_messages_by_queue.items():
logger.debug(f"Messages for queue: {queue_name}")
for message in messages:
logger.debug(f"MessageID: {message.basic_properties.message_id}")
data: Dict = message.json_data
logger.debug("Process json in base64 encoded data str", data)
```
=== "app.py"
```python
from urllib.parse import unquote_plus
from aws_lambda_powertools.utilities.data_classes import event_source, S3Event
@event_source(data_class=S3Event)
def lambda_handler(event: S3Event, context):
bucket_name = event.bucket_name
# Multiple records can be delivered in a single event
for record in event.records:
object_key = unquote_plus(record.s3.get_object.key)
do_something_with(f"{bucket_name}/{object_key}")
```
This example is based on the AWS Blog post Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3{target="_blank"}.
=== "app.py"
```python hl_lines="5-6 12 14"
import boto3
import requests
from aws_lambda_powertools import Logger
from aws_lambda_powertools.logging.correlation_paths import S3_OBJECT_LAMBDA
from aws_lambda_powertools.utilities.data_classes.s3_object_event import S3ObjectLambdaEvent
logger = Logger()
session = boto3.Session()
s3 = session.client("s3")
@logger.inject_lambda_context(correlation_id_path=S3_OBJECT_LAMBDA, log_event=True)
def lambda_handler(event, context):
event = S3ObjectLambdaEvent(event)
# Get object from S3
response = requests.get(event.input_s3_url)
original_object = response.content.decode("utf-8")
# Make changes to the object about to be returned
transformed_object = original_object.upper()
# Write object back to S3 Object Lambda
s3.write_get_object_response(
Body=transformed_object, RequestRoute=event.request_route, RequestToken=event.request_token
)
return {"status_code": 200}
```
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, SESEvent
@event_source(data_class=SESEvent)
def lambda_handler(event: SESEvent, context):
# Multiple records can be delivered in a single event
for record in event.records:
mail = record.ses.mail
common_headers = mail.common_headers
do_something_with(common_headers.to, common_headers.subject)
```
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, SNSEvent
@event_source(data_class=SNSEvent)
def lambda_handler(event: SNSEvent, context):
# Multiple records can be delivered in a single event
for record in event.records:
message = record.sns.message
subject = record.sns.subject
do_something_with(subject, message)
```
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, SQSEvent
@event_source(data_class=SQSEvent)
def lambda_handler(event: SQSEvent, context):
# Multiple records can be delivered in a single event
for record in event.records:
do_something_with(record.body)
```