feat(event_sources): extract CloudWatch Logs in Kinesis streams #1710

Merged
@@ -1,7 +1,11 @@
import base64
import json
import zlib
from typing import Iterator, List

from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import (
    CloudWatchLogsDecodedData,
)
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


@@ -43,6 +47,11 @@ def data_as_json(self) -> dict:
"""Decode binary encoded data as json"""
return json.loads(self.data_as_text())

def data_zlib_compressed_as_json(self) -> dict:
"""Decode binary encoded data as bytes"""
decompressed = zlib.decompress(self.data_as_bytes(), zlib.MAX_WBITS | 32)
return json.loads(decompressed)


class KinesisStreamRecord(DictWrapper):
    @property
@@ -98,3 +107,11 @@ class KinesisStreamEvent(DictWrapper):
    def records(self) -> Iterator[KinesisStreamRecord]:
        for record in self["Records"]:
            yield KinesisStreamRecord(record)


def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> List[CloudWatchLogsDecodedData]:
    return [CloudWatchLogsDecodedData(record.kinesis.data_zlib_compressed_as_json()) for record in event.records]


def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) -> CloudWatchLogsDecodedData:
    return CloudWatchLogsDecodedData(data=record.kinesis.data_zlib_compressed_as_json())
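
For context on the `zlib.MAX_WBITS | 32` argument above: CloudWatch Logs subscriptions deliver records to Kinesis as gzip-compressed JSON, base64-encoded in the Kinesis payload, and the `| 32` flag tells zlib to auto-detect a gzip or zlib header. A minimal round-trip sketch of the decode path, using an illustrative payload rather than a real stream record:

```python
import base64
import gzip
import json
import zlib

# Illustrative stand-in for what CloudWatch Logs ships to Kinesis:
# gzip-compressed JSON, base64-encoded under record["kinesis"]["data"].
payload = {
    "messageType": "DATA_MESSAGE",
    "logEvents": [{"id": "1", "timestamp": 1668093033744, "message": "hello"}],
}
data = base64.b64encode(gzip.compress(json.dumps(payload).encode("utf-8")))

# zlib.MAX_WBITS | 32 enables automatic gzip/zlib header detection,
# so the same call handles either compression wrapper.
decoded = json.loads(zlib.decompress(base64.b64decode(data), zlib.MAX_WBITS | 32))
assert decoded["messageType"] == "DATA_MESSAGE"
```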
docs/utilities/data_classes.md (89 additions, 42 deletions)
@@ -58,33 +58,33 @@ Same example as above, but using the `event_source` decorator

## Supported event sources

| Event Source | Data_class |
| ------------------------------------------------------------------------- | -------------------------------------------------- |
| [Active MQ](#active-mq) | `ActiveMQEvent` |
| [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` |
| [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` |
| [API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent` |
| [API Gateway Proxy V2](#api-gateway-proxy-v2) | `APIGatewayProxyEventV2` |
| [Application Load Balancer](#application-load-balancer) | `ALBEvent` |
| [AppSync Authorizer](#appsync-authorizer) | `AppSyncAuthorizerEvent` |
| [AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent` |
| [CloudWatch Dashboard Custom Widget](#cloudwatch-dashboard-custom-widget) | `CloudWatchDashboardCustomWidgetEvent` |
| [CloudWatch Logs](#cloudwatch-logs) | `CloudWatchLogsEvent` |
| [CodePipeline Job Event](#codepipeline-job) | `CodePipelineJobEvent` |
| [Cognito User Pool](#cognito-user-pool) | Multiple available under `cognito_user_pool_event` |
| [Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent` |
| [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` |
| [EventBridge](#eventbridge) | `EventBridgeEvent` |
| [Kafka](#kafka) | `KafkaEvent` |
| [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` |
| [Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` |
| [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` |
| [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` |
| [S3](#s3) | `S3Event` |
| [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` |
| [SES](#ses) | `SESEvent` |
| [SNS](#sns) | `SNSEvent` |
| [SQS](#sqs) | `SQSEvent` |

???+ info
The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of
@@ -456,9 +456,9 @@ In this example, we also use the new Logger `correlation_id` and built-in `corre
A simple echo script. Anything passed in the ```echo``` parameter is returned as the content of the custom widget.

### Widget parameters
| Param | Description |
| -------- | ------------------------ |
| **echo** | The content to echo back |

### Example parameters
\`\`\` yaml
@@ -497,6 +497,53 @@ decompress and parse json data from the event.
do_something_with(event.timestamp, event.message)
```

#### Kinesis integration

[When streaming CloudWatch Logs to a Kinesis Data Stream](https://aws.amazon.com/premiumsupport/knowledge-center/streaming-cloudwatch-logs/){target="_blank"} (cross-account or not), you can use `extract_cloudwatch_logs_from_event` to decode, decompress, and extract each record's logs as `CloudWatchLogsDecodedData`, easing log processing.

=== "app.py"

```python hl_lines="5-6 11"
from typing import List

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
KinesisStreamEvent, extract_cloudwatch_logs_from_event)


@event_source(data_class=KinesisStreamEvent)
def simple_handler(event: KinesisStreamEvent, context):
logs: List[CloudWatchLogsDecodedData] = extract_cloudwatch_logs_from_event(event)
for log in logs:
if log.message_type == "DATA_MESSAGE":
return "success"
return "nothing to be processed"
```

Alternatively, you can use `extract_cloudwatch_logs_from_record` to integrate seamlessly with the [Batch utility](./batch.md) for more robust log processing.

=== "app.py"

```python hl_lines="3-4 10"
from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType,
batch_processor)
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
KinesisStreamRecord, extract_cloudwatch_logs_from_record)

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)


def record_handler(record: KinesisStreamRecord):
log = extract_cloudwatch_logs_from_record(record)
return log.message_type == "DATA_MESSAGE"


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
return processor.response()
```

### CodePipeline Job

Data classes and utility functions to help create continuous delivery pipeline tasks with AWS Lambda
@@ -553,18 +600,18 @@ Data classes and utility functions to help create continuous delivery pipelines
Cognito User Pools have several [different Lambda trigger sources](https://docs.aws.amazon.com/cognito/latest/developerguide/cognito-user-identity-pools-working-with-aws-lambda-triggers.html#cognito-user-identity-pools-working-with-aws-lambda-trigger-sources), each of which maps to a different data class that
can be imported from `aws_lambda_powertools.data_classes.cognito_user_pool_event`:

| Trigger/Event Source | Data Class |
| --------------------- | ------------------------------------------------------------------------------ |
| Custom message event | `data_classes.cognito_user_pool_event.CustomMessageTriggerEvent` |
| Post authentication | `data_classes.cognito_user_pool_event.PostAuthenticationTriggerEvent` |
| Post confirmation | `data_classes.cognito_user_pool_event.PostConfirmationTriggerEvent` |
| Pre authentication | `data_classes.cognito_user_pool_event.PreAuthenticationTriggerEvent` |
| Pre sign-up | `data_classes.cognito_user_pool_event.PreSignUpTriggerEvent` |
| Pre token generation | `data_classes.cognito_user_pool_event.PreTokenGenerationTriggerEvent` |
| User migration | `data_classes.cognito_user_pool_event.UserMigrationTriggerEvent` |
| Define Auth Challenge | `data_classes.cognito_user_pool_event.DefineAuthChallengeTriggerEvent` |
| Create Auth Challenge | `data_classes.cognito_user_pool_event.CreateAuthChallengeTriggerEvent` |
| Verify Auth Challenge | `data_classes.cognito_user_pool_event.VerifyAuthChallengeResponseTriggerEvent` |

#### Post Confirmation Example

tests/events/kinesisStreamCloudWatchLogsEvent.json (36 additions, 0 deletions)
@@ -0,0 +1,36 @@
{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "da10bf66b1f54bff5d96eae99149ad1f",
        "sequenceNumber": "49635052289529725553291405521504870233219489715332317186",
        "data": "H4sIAAAAAAAAAK2Sa2vbMBSG/4ox+xg3Oror39IlvaztVmJv7WjCUGwl8+ZLZstts5L/vuOsZYUyWGEgJHiP9J7nvOghLF3b2rVLthsXjsLJOBl/uZjG8fh4Gg7C+q5yDcqUAWcSONHEoFzU6+Om7jZYGdq7dljYcpnZ4cZHwLWOJl1Zbs/r9cR6e9RVqc/rKlpXV9eXt+fy27vt8W+L2DfOlr07oXQIMAQyvHlzPk6mcbKgciktF5lQfMU5dZZqzrShLF2uFC60aLtlmzb5prc/ygvvmjYc3YRPFG+LusuurE+/Ikqb1Gd55dq8jV+8isT6+317Rk42J5PTcLFnm966yvd2D2GeISJTYIwCJSQ1BE9OtWZCABWaKMIJAMdDMyU5MYZLhmkxBhQxfY4Re1tiWiAlBsgIVQTE4Cl6tI+T8SwJZu5Hh1dPs1FApOMSDI9WVKmIC+4irTMWQZYpx7QkztrgE06MU4yCx9DmVbgbvABmQJTGtkYAB0NwEwyYQUBpqEFuSbkGrThTRKi/AlP+HHj6fvJa3P9Ap/+Rbja9/PD6POd+0jXW7xM1B8CDsp37w7woXBb8qQDZ6xeurJttEOc/HWpUBxeHKNr74LHwsXXYlsm9flrl/rmFIQeS7m3m1fVs/DlIGpu6nhMiyWQGXNKIMbcCIgkhElKbaZnZpYJUz33s1iV+z/6+StMlR3yphHNcCyxiNEXf2zed6xuEu8XuF2wb6krnAwAA",
        "approximateArrivalTimestamp": 1668093033.744
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000000:49635052289529725553291405521504870233219489715332317186",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49",
      "awsRegion": "eu-west-1",
      "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "cf4c4c2c9a49bdfaf58d7dbbc2b06081",
        "sequenceNumber": "49635052289529725553291405520881064510298312199003701250",
        "data": "H4sIAAAAAAAAAK2SW2/TQBCF/4pl8ViTvc7u5i0laVraQhUbWtREaG1PgsGXYK/bhqr/nXVoBRIgUYnXc2bPfHO092GFXWc3mOy2GI7D6SSZfDyfxfFkPgsPwua2xtbLjFPBgQqiifFy2WzmbdNvvTOyt92otFWa29HWRVRoHU37qtqdNZupdfaorzNXNHW0qS+vLm7O4PPr3fxHROxatNWQThgbUTqiZHT94mySzOJkBUqYLOWY8ZQLbaTRkEvDciUYzWzKfETXp13WFtsh/qgoHbZdOL4OnyhelU2fX1qXffIoXdKcFjV2RRf/9iqSmy933Sk53h5PT8LVnm12g7Ub4u7DIveIXFFjFNGUKUlAaMY0EUJKLjkQbxhKGCWeknMKoAGUkYoJ7TFd4St2tvJtDRYxDAg3VB08Ve/j42SySIIFfu396Ek+DkS+xkwAiYhM00isgUV6jXmEMrM5EmMsh+C9v9hfMQ4eS1vW4cPBH4CZVpoTJkEIAp5RUMo8vGFae3JNCCdUccMVgPw7sP4VePZm+lzc/0AH/0i3mF28fX6fSzftW+v2jZKXRgVVt3SHRVliHvx06F4+x6ppd0FcfEMvMR2cH3rR3gWPxrsO/Vau9vqyvlpMPgRJazMcYGgEHHLKBhLGJaBA0JLxNc0JppoS9Cwxbir/B4d5QDBAQSnfFFGp8aa/vxw2uLbHYUH4sHr4Dj5RJxfMAwAA",
        "approximateArrivalTimestamp": 1668092612.992
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000000:49635052289529725553291405520881064510298312199003701250",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49",
      "awsRegion": "eu-west-1",
      "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG"
    }
  ]
}
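
To sanity-check a fixture like this locally, the new helpers can decode it directly. A sketch, assuming it runs from the repo root so the fixture path resolves:

```python
import json

from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    KinesisStreamEvent,
    extract_cloudwatch_logs_from_record,
)

with open("tests/events/kinesisStreamCloudWatchLogsEvent.json") as f:
    event = KinesisStreamEvent(json.load(f))

# records is a generator, so next() pulls the first record.
log = extract_cloudwatch_logs_from_record(next(event.records))
print(log.message_type, len(log.log_events))
```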
tests/functional/test_data_classes.py (12 additions, 0 deletions)
@@ -83,6 +83,10 @@
    StreamViewType,
)
from aws_lambda_powertools.utilities.data_classes.event_source import event_source
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    extract_cloudwatch_logs_from_event,
    extract_cloudwatch_logs_from_record,
)
from aws_lambda_powertools.utilities.data_classes.s3_object_event import (
    S3ObjectLambdaEvent,
)
@@ -1267,6 +1271,14 @@ def test_kinesis_stream_event_json_data():
    assert record.kinesis.data_as_json() == json_value


def test_kinesis_stream_event_cloudwatch_logs_data_extraction():
    event = KinesisStreamEvent(load_event("kinesisStreamCloudWatchLogsEvent.json"))
    extracted_logs = extract_cloudwatch_logs_from_event(event)
    individual_logs = [extract_cloudwatch_logs_from_record(record) for record in event.records]

    assert len(extracted_logs) == len(individual_logs)

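The new test only asserts that both extraction paths agree on record count. A deeper follow-up test could also exercise the decoded fields; a sketch, assuming the fixture's payloads decode to standard CloudWatch Logs envelopes:

```python
def test_kinesis_stream_event_cloudwatch_logs_decoded_fields():
    event = KinesisStreamEvent(load_event("kinesisStreamCloudWatchLogsEvent.json"))

    for log in extract_cloudwatch_logs_from_event(event):
        # Every decoded envelope should expose the standard CloudWatch Logs fields.
        assert log.message_type in ("DATA_MESSAGE", "CONTROL_MESSAGE")
        assert log.log_events is not None
```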

def test_alb_event():
    event = ALBEvent(load_event("albEvent.json"))
    assert event.request_context.elb_target_group_arn == event["requestContext"]["elb"]["targetGroupArn"]