Skip to content

Commit

Permalink
add sink change log
Browse files Browse the repository at this point in the history
  • Loading branch information
tiffanyvu committed Nov 26, 2024
1 parent 89520bf commit 21f1968
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
29 changes: 25 additions & 4 deletions lib/lambda/sinkChangelog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
logError,
} from "../libs/sink-lib";
import { Index } from "shared-types/opensearch";
import { z } from "zod";
const osDomain = process.env.osDomain;
if (!osDomain) {
throw new Error("Missing required environment variable(s)");
Expand Down Expand Up @@ -61,8 +62,7 @@ const processAndIndex = async ({
transforms: any;
topicPartition: string;
}) => {
const docs: Array<(typeof transforms)[keyof typeof transforms]["Schema"]> =
[];
const docs: Array<(typeof transforms)[keyof typeof transforms]["Schema"]> = [];
for (const kafkaRecord of kafkaRecords) {
console.log(JSON.stringify(kafkaRecord, null, 2));
const { value, offset } = kafkaRecord;
Expand All @@ -85,9 +85,30 @@ const processAndIndex = async ({
console.log("event below");
console.log(record.event);

if (record.isAdminChange) {
const deletedPackageSchema = z.object({
id: z.string(),
deleted: z.boolean(),
});

deletedPackageSchema.transform((schema) => ({
...schema,
event: "soft-delete",
packageId: schema.id,
id: `${schema.id}-${offset}`,
}));

const result = deletedPackageSchema.safeParse(record);

if (result.success) {
docs.push(result.data);
} else {
console.log("Skipping package with invalid format", result.error.message);
}
}

if (record.event in transforms) {
const transformForEvent =
transforms[record.event as keyof typeof transforms];
const transformForEvent = transforms[record.event as keyof typeof transforms];

const result = transformForEvent.transform(offset).safeParse(record);

Expand Down
2 changes: 2 additions & 0 deletions lib/lambda/update/updatePackage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ describe("Update package", () => {
{
key: "test-id",
value: JSON.stringify({
id: "test-id",
deleted: true,
isAdminChange: true,
origin: "mako",
Expand Down Expand Up @@ -119,6 +120,7 @@ describe("Update package", () => {
value: JSON.stringify({
state: "MD",
initialIntakeNeeded: true,
id: "test-id",
isAdminChange: true,
origin: "mako",
}),
Expand Down

0 comments on commit 21f1968

Please sign in to comment.