Skip to content

Commit

Permalink
feat(parser): extract CloudWatch Logs in Kinesis streams (#1726)
Browse files Browse the repository at this point in the history
  • Loading branch information
heitorlessa authored Nov 16, 2022
1 parent 206038e commit 2b4740a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
22 changes: 21 additions & 1 deletion aws_lambda_powertools/utilities/parser/models/kinesis.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from typing import List, Type, Union
import json
import zlib
from typing import Dict, List, Type, Union

from pydantic import BaseModel, validator

from aws_lambda_powertools.shared.functions import base64_decode
from aws_lambda_powertools.utilities.parser.models.cloudwatch import (
CloudWatchLogsDecode,
)
from aws_lambda_powertools.utilities.parser.types import Literal


Expand All @@ -28,6 +33,21 @@ class KinesisDataStreamRecord(BaseModel):
eventSourceARN: str
kinesis: KinesisDataStreamRecordPayload

def decompress_zlib_record_data_as_json(self) -> Dict:
    """Inflate this record's compressed payload and parse the result as JSON.

    Returns:
        The object produced by ``json.loads`` on the decompressed bytes.

    Raises:
        ValueError: if ``self.kinesis.data`` is not a ``bytes`` instance.
    """
    raw_payload = self.kinesis.data
    if isinstance(raw_payload, bytes):
        # Adding 32 to the window size lets zlib auto-detect a zlib or gzip header.
        inflated = zlib.decompress(raw_payload, zlib.MAX_WBITS | 32)
        return json.loads(inflated)

    raise ValueError("We can only decompress bytes data, not custom models.")


class KinesisDataStreamModel(BaseModel):
    """Top-level model holding the list of Kinesis Data Stream records."""

    # Each entry is parsed as a KinesisDataStreamRecord.
    Records: List[KinesisDataStreamRecord]


def extract_cloudwatch_logs_from_event(event: KinesisDataStreamModel) -> List[CloudWatchLogsDecode]:
    """Build one ``CloudWatchLogsDecode`` per record in ``event.Records``.

    Each record's data is decompressed and JSON-decoded via
    ``decompress_zlib_record_data_as_json`` and the resulting mapping is
    passed as keyword arguments to ``CloudWatchLogsDecode``.
    """
    decoded_logs = []
    for entry in event.Records:
        fields = entry.decompress_zlib_record_data_as_json()
        decoded_logs.append(CloudWatchLogsDecode(**fields))
    return decoded_logs


def extract_cloudwatch_logs_from_record(record: KinesisDataStreamRecord) -> CloudWatchLogsDecode:
    """Build a ``CloudWatchLogsDecode`` from a single Kinesis record.

    The record's data is decompressed and JSON-decoded via
    ``decompress_zlib_record_data_as_json``; the resulting mapping is passed
    as keyword arguments to ``CloudWatchLogsDecode``.
    """
    fields = record.decompress_zlib_record_data_as_json()
    return CloudWatchLogsDecode(**fields)
40 changes: 40 additions & 0 deletions tests/functional/parser/test_kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from aws_lambda_powertools.utilities.parser import (
BaseModel,
ValidationError,
envelopes,
event_parser,
Expand All @@ -11,6 +12,13 @@
KinesisDataStreamModel,
KinesisDataStreamRecordPayload,
)
from aws_lambda_powertools.utilities.parser.models.cloudwatch import (
CloudWatchLogsDecode,
)
from aws_lambda_powertools.utilities.parser.models.kinesis import (
extract_cloudwatch_logs_from_event,
extract_cloudwatch_logs_from_record,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from tests.functional.parser.schemas import MyKinesisBusiness
from tests.functional.utils import load_event
Expand Down Expand Up @@ -111,3 +119,35 @@ def test_validate_event_does_not_conform_with_model():
event_dict: Any = {"hello": "s"}
with pytest.raises(ValidationError):
handle_kinesis(event_dict, LambdaContext())


def test_kinesis_stream_event_cloudwatch_logs_data_extraction():
    # GIVEN a Kinesis event fixture whose records carry compressed CloudWatch Logs data
    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
    parsed_event = KinesisDataStreamModel(**raw_event)
    first_record = parsed_event.Records[0]

    # WHEN logs are extracted event-wide, record-by-record, and from one record
    logs_from_event = extract_cloudwatch_logs_from_event(parsed_event)
    logs_per_record = []
    for rec in parsed_event.Records:
        logs_per_record.append(extract_cloudwatch_logs_from_record(rec))
    log_from_first = extract_cloudwatch_logs_from_record(first_record)

    # THEN both approaches yield the same number of logs, each a CloudWatchLogsDecode
    assert len(logs_from_event) == len(logs_per_record)
    assert isinstance(log_from_first, CloudWatchLogsDecode)


def test_kinesis_stream_event_cloudwatch_logs_data_extraction_fails_with_custom_model():
    # GIVEN record data that has been swapped from bytes to a custom model
    class DummyModel(BaseModel):
        """Empty stand-in for a user-defined data model."""

    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
    parsed_event = KinesisDataStreamModel(**raw_event)

    # WHEN decompress_zlib_record_data_as_json is called on such a record
    # THEN it must reject the non-bytes data with a ValueError
    with pytest.raises(ValueError, match="We can only decompress bytes data"):
        for rec in parsed_event.Records:
            rec.kinesis.data = DummyModel()
            rec.decompress_zlib_record_data_as_json()

0 comments on commit 2b4740a

Please sign in to comment.