diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index ec45bfbd0b2..06eaedb8904 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -1,7 +1,11 @@ import base64 import json -from typing import Iterator +import zlib +from typing import Iterator, List +from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import ( + CloudWatchLogsDecodedData, +) from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @@ -43,6 +47,11 @@ def data_as_json(self) -> dict: """Decode binary encoded data as json""" return json.loads(self.data_as_text()) + def data_zlib_compressed_as_json(self) -> dict: + """Decode binary encoded data as bytes""" + decompressed = zlib.decompress(self.data_as_bytes(), zlib.MAX_WBITS | 32) + return json.loads(decompressed) + class KinesisStreamRecord(DictWrapper): @property @@ -98,3 +107,11 @@ class KinesisStreamEvent(DictWrapper): def records(self) -> Iterator[KinesisStreamRecord]: for record in self["Records"]: yield KinesisStreamRecord(record) + + +def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> List[CloudWatchLogsDecodedData]: + return [CloudWatchLogsDecodedData(record.kinesis.data_zlib_compressed_as_json()) for record in event.records] + + +def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) -> CloudWatchLogsDecodedData: + return CloudWatchLogsDecodedData(data=record.kinesis.data_zlib_compressed_as_json()) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 9981978ebc9..85c58e7ce72 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -58,33 +58,33 @@ Same example as above, but using the `event_source` decorator ## Supported event sources -Event Source | Data_class -------------------------------------------------- | --------------------------------------------------------------------------------- -[Active MQ](#active-mq) | `ActiveMQEvent` -[API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` -[API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` -[API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent` -[API Gateway Proxy V2](#api-gateway-proxy-v2) | `APIGatewayProxyEventV2` -[Application Load Balancer](#application-load-balancer) | `ALBEvent` -[AppSync Authorizer](#appsync-authorizer) | `AppSyncAuthorizerEvent` -[AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent` -[CloudWatch Dashboard Custom Widget](#cloudwatch-dashboard-custom-widget) | `CloudWatchDashboardCustomWidgetEvent` -[CloudWatch Logs](#cloudwatch-logs) | `CloudWatchLogsEvent` -[CodePipeline Job Event](#codepipeline-job) | `CodePipelineJobEvent` -[Cognito User Pool](#cognito-user-pool) | Multiple available under `cognito_user_pool_event` -[Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent` -[DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` -[EventBridge](#eventbridge) | `EventBridgeEvent` -[Kafka](#kafka) | `KafkaEvent` -[Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` -[Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` -[Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` -[Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` -[S3](#s3) | `S3Event` -[S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` -[SES](#ses) | `SESEvent` -[SNS](#sns) | `SNSEvent` -[SQS](#sqs) | `SQSEvent` +| Event Source | Data_class | +| ------------------------------------------------------------------------- | -------------------------------------------------- | +| [Active MQ](#active-mq) | `ActiveMQEvent` | +| [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` | +| [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` | +| [API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent` | +| [API Gateway Proxy V2](#api-gateway-proxy-v2) | `APIGatewayProxyEventV2` | +| [Application Load Balancer](#application-load-balancer) | `ALBEvent` | +| [AppSync Authorizer](#appsync-authorizer) | `AppSyncAuthorizerEvent` | +| [AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent` | +| [CloudWatch Dashboard Custom Widget](#cloudwatch-dashboard-custom-widget) | `CloudWatchDashboardCustomWidgetEvent` | +| [CloudWatch Logs](#cloudwatch-logs) | `CloudWatchLogsEvent` | +| [CodePipeline Job Event](#codepipeline-job) | `CodePipelineJobEvent` | +| [Cognito User Pool](#cognito-user-pool) | Multiple available under `cognito_user_pool_event` | +| [Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent` | +| [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` | +| [EventBridge](#eventbridge) | `EventBridgeEvent` | +| [Kafka](#kafka) | `KafkaEvent` | +| [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` | +| [Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` | +| [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` | +| [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` | +| [S3](#s3) | `S3Event` | +| [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` | +| [SES](#ses) | `SESEvent` | +| [SNS](#sns) | `SNSEvent` | +| [SQS](#sqs) | `SQSEvent` | ???+ info The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of @@ -456,9 +456,9 @@ In this example, we also use the new Logger `correlation_id` and built-in `corre A simple echo script. Anything passed in \`\`\`echo\`\`\` parameter is returned as the content of custom widget. ### Widget parameters - Param | Description - ---|--- - **echo** | The content to echo back + | Param | Description | + | -------- | ------------------------ | + | **echo** | The content to echo back | ### Example parameters \`\`\` yaml @@ -497,6 +497,53 @@ decompress and parse json data from the event. do_something_with(event.timestamp, event.message) ``` +#### Kinesis integration + +[When streaming CloudWatch Logs to a Kinesis Data Stream](https://aws.amazon.com/premiumsupport/knowledge-center/streaming-cloudwatch-logs/){target="_blank"} (cross-account or not), you can use `extract_cloudwatch_logs_from_event` to decode, decompress and extract logs as `CloudWatchLogsDecodedData` to ease log processing. + +=== "app.py" + + ```python hl_lines="5-6 11" + from typing import List + + from aws_lambda_powertools.utilities.data_classes import event_source + from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData + from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import ( + KinesisStreamEvent, extract_cloudwatch_logs_from_event) + + + @event_source(data_class=KinesisStreamEvent) + def simple_handler(event: KinesisStreamEvent, context): + logs: List[CloudWatchLogsDecodedData] = extract_cloudwatch_logs_from_event(event) + for log in logs: + if log.message_type == "DATA_MESSAGE": + return "success" + return "nothing to be processed" + ``` + +Alternatively, you can use `extract_cloudwatch_logs_from_record` to seamless integrate with the [Batch utility](./batch.md) for more robust log processing. + +=== "app.py" + + ```python hl_lines="3-4 10" + from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType, + batch_processor) + from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import ( + KinesisStreamRecord, extract_cloudwatch_logs_from_record) + + processor = BatchProcessor(event_type=EventType.KinesisDataStreams) + + + def record_handler(record: KinesisStreamRecord): + log = extract_cloudwatch_logs_from_record(record) + return log.message_type == "DATA_MESSAGE" + + + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + ``` + ### CodePipeline Job Data classes and utility functions to help create continuous delivery pipelines tasks with AWS Lambda @@ -553,18 +600,18 @@ Data classes and utility functions to help create continuous delivery pipelines Cognito User Pools have several [different Lambda trigger sources](https://docs.aws.amazon.com/cognito/latest/developerguide/cognito-user-identity-pools-working-with-aws-lambda-triggers.html#cognito-user-identity-pools-working-with-aws-lambda-trigger-sources), all of which map to a different data class, which can be imported from `aws_lambda_powertools.data_classes.cognito_user_pool_event`: -Trigger/Event Source | Data Class -------------------------------------------------- | ------------------------------------------------- -Custom message event | `data_classes.cognito_user_pool_event.CustomMessageTriggerEvent` -Post authentication | `data_classes.cognito_user_pool_event.PostAuthenticationTriggerEvent` -Post confirmation | `data_classes.cognito_user_pool_event.PostConfirmationTriggerEvent` -Pre authentication | `data_classes.cognito_user_pool_event.PreAuthenticationTriggerEvent` -Pre sign-up | `data_classes.cognito_user_pool_event.PreSignUpTriggerEvent` -Pre token generation | `data_classes.cognito_user_pool_event.PreTokenGenerationTriggerEvent` -User migration | `data_classes.cognito_user_pool_event.UserMigrationTriggerEvent` -Define Auth Challenge | `data_classes.cognito_user_pool_event.DefineAuthChallengeTriggerEvent` -Create Auth Challenge | `data_classes.cognito_user_pool_event.CreateAuthChallengeTriggerEvent` -Verify Auth Challenge | `data_classes.cognito_user_pool_event.VerifyAuthChallengeResponseTriggerEvent` +| Trigger/Event Source | Data Class | +| --------------------- | ------------------------------------------------------------------------------ | +| Custom message event | `data_classes.cognito_user_pool_event.CustomMessageTriggerEvent` | +| Post authentication | `data_classes.cognito_user_pool_event.PostAuthenticationTriggerEvent` | +| Post confirmation | `data_classes.cognito_user_pool_event.PostConfirmationTriggerEvent` | +| Pre authentication | `data_classes.cognito_user_pool_event.PreAuthenticationTriggerEvent` | +| Pre sign-up | `data_classes.cognito_user_pool_event.PreSignUpTriggerEvent` | +| Pre token generation | `data_classes.cognito_user_pool_event.PreTokenGenerationTriggerEvent` | +| User migration | `data_classes.cognito_user_pool_event.UserMigrationTriggerEvent` | +| Define Auth Challenge | `data_classes.cognito_user_pool_event.DefineAuthChallengeTriggerEvent` | +| Create Auth Challenge | `data_classes.cognito_user_pool_event.CreateAuthChallengeTriggerEvent` | +| Verify Auth Challenge | `data_classes.cognito_user_pool_event.VerifyAuthChallengeResponseTriggerEvent` | #### Post Confirmation Example diff --git a/tests/events/kinesisStreamCloudWatchLogsEvent.json b/tests/events/kinesisStreamCloudWatchLogsEvent.json new file mode 100644 index 00000000000..a9a6959f907 --- /dev/null +++ b/tests/events/kinesisStreamCloudWatchLogsEvent.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "da10bf66b1f54bff5d96eae99149ad1f", + "sequenceNumber": "49635052289529725553291405521504870233219489715332317186", + "data": "H4sIAAAAAAAAAK2Sa2vbMBSG/4ox+xg3Oror39IlvaztVmJv7WjCUGwl8+ZLZstts5L/vuOsZYUyWGEgJHiP9J7nvOghLF3b2rVLthsXjsLJOBl/uZjG8fh4Gg7C+q5yDcqUAWcSONHEoFzU6+Om7jZYGdq7dljYcpnZ4cZHwLWOJl1Zbs/r9cR6e9RVqc/rKlpXV9eXt+fy27vt8W+L2DfOlr07oXQIMAQyvHlzPk6mcbKgciktF5lQfMU5dZZqzrShLF2uFC60aLtlmzb5prc/ygvvmjYc3YRPFG+LusuurE+/Ikqb1Gd55dq8jV+8isT6+317Rk42J5PTcLFnm966yvd2D2GeISJTYIwCJSQ1BE9OtWZCABWaKMIJAMdDMyU5MYZLhmkxBhQxfY4Re1tiWiAlBsgIVQTE4Cl6tI+T8SwJZu5Hh1dPs1FApOMSDI9WVKmIC+4irTMWQZYpx7QkztrgE06MU4yCx9DmVbgbvABmQJTGtkYAB0NwEwyYQUBpqEFuSbkGrThTRKi/AlP+HHj6fvJa3P9Ap/+Rbja9/PD6POd+0jXW7xM1B8CDsp37w7woXBb8qQDZ6xeurJttEOc/HWpUBxeHKNr74LHwsXXYlsm9flrl/rmFIQeS7m3m1fVs/DlIGpu6nhMiyWQGXNKIMbcCIgkhElKbaZnZpYJUz33s1iV+z/6+StMlR3yphHNcCyxiNEXf2zed6xuEu8XuF2wb6krnAwAA", + "approximateArrivalTimestamp": 1668093033.744 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000000:49635052289529725553291405521504870233219489715332317186", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49", + "awsRegion": "eu-west-1", + "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "cf4c4c2c9a49bdfaf58d7dbbc2b06081", + "sequenceNumber": "49635052289529725553291405520881064510298312199003701250", + "data": "H4sIAAAAAAAAAK2SW2/TQBCF/4pl8ViTvc7u5i0laVraQhUbWtREaG1PgsGXYK/bhqr/nXVoBRIgUYnXc2bPfHO092GFXWc3mOy2GI7D6SSZfDyfxfFkPgsPwua2xtbLjFPBgQqiifFy2WzmbdNvvTOyt92otFWa29HWRVRoHU37qtqdNZupdfaorzNXNHW0qS+vLm7O4PPr3fxHROxatNWQThgbUTqiZHT94mySzOJkBUqYLOWY8ZQLbaTRkEvDciUYzWzKfETXp13WFtsh/qgoHbZdOL4OnyhelU2fX1qXffIoXdKcFjV2RRf/9iqSmy933Sk53h5PT8LVnm12g7Ub4u7DIveIXFFjFNGUKUlAaMY0EUJKLjkQbxhKGCWeknMKoAGUkYoJ7TFd4St2tvJtDRYxDAg3VB08Ve/j42SySIIFfu396Ek+DkS+xkwAiYhM00isgUV6jXmEMrM5EmMsh+C9v9hfMQ4eS1vW4cPBH4CZVpoTJkEIAp5RUMo8vGFae3JNCCdUccMVgPw7sP4VePZm+lzc/0AH/0i3mF28fX6fSzftW+v2jZKXRgVVt3SHRVliHvx06F4+x6ppd0FcfEMvMR2cH3rR3gWPxrsO/Vau9vqyvlpMPgRJazMcYGgEHHLKBhLGJaBA0JLxNc0JppoS9Cwxbir/B4d5QDBAQSnfFFGp8aa/vxw2uLbHYUH4sHr4Dj5RJxfMAwAA", + "approximateArrivalTimestamp": 1668092612.992 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000000:49635052289529725553291405520881064510298312199003701250", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49", + "awsRegion": "eu-west-1", + "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG" + } + ] +} \ No newline at end of file diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index a0113b62486..916e9b61e0d 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -83,6 +83,10 @@ StreamViewType, ) from aws_lambda_powertools.utilities.data_classes.event_source import event_source +from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import ( + extract_cloudwatch_logs_from_event, + extract_cloudwatch_logs_from_record, +) from aws_lambda_powertools.utilities.data_classes.s3_object_event import ( S3ObjectLambdaEvent, ) @@ -1267,6 +1271,14 @@ def test_kinesis_stream_event_json_data(): assert record.kinesis.data_as_json() == json_value +def test_kinesis_stream_event_cloudwatch_logs_data_extraction(): + event = KinesisStreamEvent(load_event("kinesisStreamCloudWatchLogsEvent.json")) + extracted_logs = extract_cloudwatch_logs_from_event(event) + individual_logs = [extract_cloudwatch_logs_from_record(record) for record in event.records] + + assert len(extracted_logs) == len(individual_logs) + + def test_alb_event(): event = ALBEvent(load_event("albEvent.json")) assert event.request_context.elb_target_group_arn == event["requestContext"]["elb"]["targetGroupArn"]