Skip to content

Commit

Permalink
feat(parser): add schema envelopes (#1815)
Browse files Browse the repository at this point in the history
* first envelope

* add abstract class

* add tests

* add more tests

* fix tests

* add envelopes

* add middy parser

* minor schema changes

* add more envelopes and tests, refactored utils to autocomplete event files

* simplified check

* remove middleware from this branch

* refactored from class to function envelopes

* removed parser tests, should be in another branch

* add parser to pre push

* consistent naming
  • Loading branch information
am29d authored Dec 19, 2023
1 parent 20cde95 commit ce9f8a1
Show file tree
Hide file tree
Showing 53 changed files with 1,241 additions and 173 deletions.
3 changes: 2 additions & 1 deletion .husky/pre-push
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ npm t \
-w packages/metrics \
-w packages/tracer \
-w packages/idempotency \
-w packages/parameters
-w packages/parameters \
-w packages/parser
66 changes: 65 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions packages/parser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@
"serverless",
"nodejs"
],

"peerDependencies": {
"zod": ">=3.x"
},
"devDependencies": {
"@anatine/zod-mock": "^3.13.3",
"@faker-js/faker": "^8.3.1"
}
}
}
18 changes: 18 additions & 0 deletions packages/parser/src/envelopes/apigw.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { parse } from './envelope.js';
import { z, ZodSchema } from 'zod';
import { APIGatewayProxyEventSchema } from '../schemas/apigw.js';

/**
* API Gateway envelope to extract data within body key
*/
export const apiGatewayEnvelope = <T extends ZodSchema>(
data: unknown,
schema: T
): z.infer<T> => {
const parsedEnvelope = APIGatewayProxyEventSchema.parse(data);
if (!parsedEnvelope.body) {
throw new Error('Body field of API Gateway event is undefined');
}

return parse(parsedEnvelope.body, schema);
};
18 changes: 18 additions & 0 deletions packages/parser/src/envelopes/apigwv2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { parse } from './envelope.js';
import { z, ZodSchema } from 'zod';
import { APIGatewayProxyEventV2Schema } from '../schemas/apigwv2.js';

/**
* API Gateway V2 envelope to extract data within body key
*/
export const apiGatewayV2Envelope = <T extends ZodSchema>(
data: unknown,
schema: T
): z.infer<T> => {
const parsedEnvelope = APIGatewayProxyEventV2Schema.parse(data);
if (!parsedEnvelope.body) {
throw new Error('Body field of API Gateway event is undefined');
}

return parse(parsedEnvelope.body, schema);
};
23 changes: 23 additions & 0 deletions packages/parser/src/envelopes/cloudwatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { parse } from './envelope.js';
import { z, ZodSchema } from 'zod';
import { CloudWatchLogsSchema } from '../schemas/cloudwatch.js';

/**
* CloudWatch Envelope to extract a List of log records.
*
* The record's body parameter is a string (after being base64 decoded and gzipped),
* though it can also be a JSON encoded string.
* Regardless of its type it'll be parsed into a BaseModel object.
*
* Note: The record will be parsed the same way so if model is str
*/
export const cloudWatchEnvelope = <T extends ZodSchema>(
data: unknown,
schema: T
): z.infer<T> => {
const parsedEnvelope = CloudWatchLogsSchema.parse(data);

return parsedEnvelope.awslogs.data.logEvents.map((record) => {
return parse(record.message, schema);
});
};
28 changes: 28 additions & 0 deletions packages/parser/src/envelopes/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { parse } from './envelope.js';
import { z, ZodSchema } from 'zod';
import { DynamoDBStreamSchema } from '../schemas/dynamodb.js';

type DynamoDBStreamEnvelopeResponse<T extends ZodSchema> = {
NewImage: z.infer<T>;
OldImage: z.infer<T>;
};

/**
* DynamoDB Stream Envelope to extract data within NewImage/OldImage
*
* Note: Values are the parsed models. Images' values can also be None, and
* length of the list is the record's amount in the original event.
*/
export const dynamoDDStreamEnvelope = <T extends ZodSchema>(
data: unknown,
schema: T
): DynamoDBStreamEnvelopeResponse<T>[] => {
const parsedEnvelope = DynamoDBStreamSchema.parse(data);

return parsedEnvelope.Records.map((record) => {
return {
NewImage: parse(record.dynamodb.NewImage, schema),
OldImage: parse(record.dynamodb.OldImage, schema),
};
});
};
23 changes: 23 additions & 0 deletions packages/parser/src/envelopes/envelope.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { z, ZodSchema } from 'zod';

/**
* Abstract function to parse the content of the envelope using provided schema.
* Both inputs are provided as unknown by the user.
* We expect the data to be either string that can be parsed to json or object.
* @internal
* @param data data to parse
* @param schema schema
*/
export const parse = <T extends ZodSchema>(
data: unknown,
schema: T
): z.infer<T>[] => {
if (typeof data === 'string') {
return schema.parse(JSON.parse(data));
} else if (typeof data === 'object') {
return schema.parse(data);
} else
throw new Error(
`Invalid data type for envelope. Expected string or object, got ${typeof data}`
);
};
13 changes: 13 additions & 0 deletions packages/parser/src/envelopes/eventbridge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { parse } from './envelope.js';
import { z, ZodSchema } from 'zod';
import { EventBridgeSchema } from '../schemas/eventbridge.js';

/**
* Envelope for EventBridge schema that extracts and parses data from the `detail` key.
*/
export const eventBridgeEnvelope = <T extends ZodSchema>(
data: unknown,
schema: T
): z.infer<T> => {
return parse(EventBridgeSchema.parse(data).detail, schema);
};
39 changes: 39 additions & 0 deletions packages/parser/src/envelopes/kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { z, ZodSchema } from 'zod';
import { parse } from './envelope.js';
import {
KafkaMskEventSchema,
KafkaSelfManagedEventSchema,
} from '../schemas/kafka.js';
import { type KafkaRecord } from '../types/schema.js';

/**
* Kafka event envelope to extract data within body key
* The record's body parameter is a string, though it can also be a JSON encoded string.
* Regardless of its type it'll be parsed into a BaseModel object.
*
* Note: Records will be parsed the same way so if model is str,
* all items in the list will be parsed as str and not as JSON (and vice versa)
*/
export const kafkaEnvelope = <T extends ZodSchema>(
data: unknown,
schema: T
): z.infer<T> => {
// manually fetch event source to deside between Msk or SelfManaged

// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const eventSource = data['eventSource'];

const parsedEnvelope:
| z.infer<typeof KafkaMskEventSchema>
| z.infer<typeof KafkaSelfManagedEventSchema> =
eventSource === 'aws:kafka'
? KafkaMskEventSchema.parse(data)
: KafkaSelfManagedEventSchema.parse(data);

return Object.values(parsedEnvelope.records).map((topicRecord) => {
return topicRecord.map((record: KafkaRecord) => {
return parse(record.value, schema);
});
});
};
26 changes: 26 additions & 0 deletions packages/parser/src/envelopes/kinesis-firehose.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { parse } from './envelope.js';
import { z, ZodSchema } from 'zod';
import { KinesisFirehoseSchema } from '../schemas/kinesis-firehose.js';

/**
* Kinesis Firehose Envelope to extract array of Records
*
* The record's data parameter is a base64 encoded string which is parsed into a bytes array,
* though it can also be a JSON encoded string.
* Regardless of its type it'll be parsed into a BaseModel object.
*
* Note: Records will be parsed the same way so if model is str,
* all items in the list will be parsed as str and not as JSON (and vice versa)
*
* https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
*/
export const kinesisFirehoseEnvelope = <T extends ZodSchema>(
data: unknown,
schema: T
): z.infer<T> => {
const parsedEnvelope = KinesisFirehoseSchema.parse(data);

return parsedEnvelope.records.map((record) => {
return parse(record.data, schema);
});
};
24 changes: 24 additions & 0 deletions packages/parser/src/envelopes/kinesis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { parse } from './envelope.js';
import { z, ZodSchema } from 'zod';
import { KinesisDataStreamSchema } from '../schemas/kinesis.js';

/**
* Kinesis Data Stream Envelope to extract array of Records
*
* The record's data parameter is a base64 encoded string which is parsed into a bytes array,
* though it can also be a JSON encoded string.
* Regardless of its type it'll be parsed into a BaseModel object.
*
* Note: Records will be parsed the same way so if model is str,
* all items in the list will be parsed as str and not as JSON (and vice versa)
*/
export const kinesisEnvelope = <T extends ZodSchema>(
data: unknown,
schema: T
): z.infer<T> => {
const parsedEnvelope = KinesisDataStreamSchema.parse(data);

return parsedEnvelope.Records.map((record) => {
return parse(record.kinesis.data, schema);
});
};
18 changes: 18 additions & 0 deletions packages/parser/src/envelopes/lambda.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { parse } from './envelope.js';
import { z, ZodSchema } from 'zod';
import { LambdaFunctionUrlSchema } from '../schemas/lambda.js';

/**
* Lambda function URL envelope to extract data within body key
*/
export const lambdaFunctionUrlEnvelope = <T extends ZodSchema>(
data: unknown,
schema: T
): z.infer<T> => {
const parsedEnvelope = LambdaFunctionUrlSchema.parse(data);
if (!parsedEnvelope.body) {
throw new Error('Body field of Lambda function URL event is undefined');
}

return parse(parsedEnvelope.body, schema);
};
Loading

0 comments on commit ce9f8a1

Please sign in to comment.