diff --git a/packages/api-dynamodb-to-elasticsearch/__tests__/Decompressor.test.ts b/packages/api-dynamodb-to-elasticsearch/__tests__/Decompressor.test.ts
new file mode 100644
index 00000000000..32cea0106bc
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/__tests__/Decompressor.test.ts
@@ -0,0 +1,65 @@
+import { createGzipCompression } from "@webiny/api-elasticsearch";
+import { Decompressor } from "~/Decompressor";
+import { createPlugins } from "~tests/plugins";
+import { FaultyDecompressorPlugin } from "~tests/mocks/FaultyDecompressorPlugin";
+import { PluginsContainer } from "@webiny/plugins";
+
+const compressor = createGzipCompression();
+
+describe("Decompressor", () => {
+    it("should not do anything with the data because it is not compressed", async () => {
+        const decompressor = new Decompressor({
+            plugins: createPlugins()
+        });
+
+        const data = {
+            title: "Hello World"
+        };
+
+        const result = await decompressor.decompress(data);
+        expect(result).toEqual(data);
+    });
+
+    it("should decompress the data", async () => {
+        const decompressor = new Decompressor({
+            plugins: createPlugins()
+        });
+
+        const input = Object.freeze({
+            title: "Hello World"
+        });
+
+        const data = Object.freeze(await compressor.compress(input));
+
+        const result = await decompressor.decompress(data);
+        expect(result).toEqual(input);
+    });
+
+    it("should return null because something is wrong with the compressed data", async () => {
+        const decompressor = new Decompressor({
+            plugins: createPlugins()
+        });
+
+        const data = {
+            value: "some wrong value which cannot be decompressed",
+            compression: "gzip"
+        };
+
+        const result = await decompressor.decompress(data);
+        expect(result).toEqual(null);
+    });
+
+    it("should return null even if decompress throws an error", async () => {
+        const decompressor = new Decompressor({
+            plugins: new PluginsContainer([new FaultyDecompressorPlugin()])
+        });
+
+        const data = {
+            value: "some wrong value which cannot be decompressed",
+            compression: "gzip"
+        };
+
+        const result = await decompressor.decompress(data);
+        expect(result).toEqual(null);
+    });
+});
diff --git a/packages/api-dynamodb-to-elasticsearch/__tests__/Operations.test.ts b/packages/api-dynamodb-to-elasticsearch/__tests__/Operations.test.ts
new file mode 100644
index 00000000000..4b478e13701
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/__tests__/Operations.test.ts
@@ -0,0 +1,129 @@
+import { Operations } from "~/Operations";
+
+describe("Operations", () => {
+    it("should insert an item", async () => {
+        const operations = new Operations();
+
+        operations.insert({
+            id: "1",
+            index: "test-index",
+            data: {
+                title: "Hello World"
+            }
+        });
+
+        expect(operations.items).toEqual([
+            {
+                index: {
+                    _id: "1",
+                    _index: "test-index"
+                }
+            },
+            {
+                title: "Hello World"
+            }
+        ]);
+
+        expect(operations.total).toBe(2);
+    });
+
+    it("should modify an item", async () => {
+        const operations = new Operations();
+
+        operations.modify({
+            id: "1",
+            index: "test-index",
+            data: {
+                title: "Hello World"
+            }
+        });
+
+        expect(operations.items).toEqual([
+            {
+                index: {
+                    _id: "1",
+                    _index: "test-index"
+                }
+            },
+            {
+                title: "Hello World"
+            }
+        ]);
+
+        expect(operations.total).toBe(2);
+    });
+
+    it("should delete an item", async () => {
+        const operations = new Operations();
+
+        operations.delete({
+            id: "1",
+            index: "test-index"
+        });
+
+        expect(operations.items).toEqual([
+            {
+                delete: {
+                    _id: "1",
+                    _index: "test-index"
+                }
+            }
+        ]);
+
+        expect(operations.total).toBe(1);
+    });
+
+    it("should insert, update and delete items", async () => {
+        const operations = new Operations();
+
+        operations.insert({
+            id: "1",
+            index: "test-index",
+            data: {
+                title: "Hello World"
+            }
+        });
+
+        operations.modify({
+            id: "2",
+            index: "test-index-2",
+            data: {
+                title: "Hello World 2"
+            }
+        });
+
+        operations.delete({
+            id: "1",
+            index: "test-index"
+        });
+
+        expect(operations.items).toEqual([
+            {
+                index: {
+                    _id: "1",
+                    _index: "test-index"
+                }
+            },
+            {
+                title: "Hello World"
+            },
+            {
+                index: {
+                    _id: "2",
+                    _index: "test-index-2"
+                }
+            },
+            {
+                title: "Hello World 2"
+            },
+            {
+                delete: {
+                    _id: "1",
+                    _index: "test-index"
+                }
+            }
+        ]);
+
+        expect(operations.total).toBe(5);
+    });
+});
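Reviewer note: the item counts asserted above (2 per insert/modify, 1 per delete) follow from the Elasticsearch bulk API wire format — `Operations.items` is shaped exactly like the `body` argument of a bulk call, where an index action contributes an action-metadata entry plus the document source, while a delete action is a single entry. A minimal sketch of feeding it to a client; the local node URL and client construction are illustrative assumptions, not part of this PR:

```ts
import { Client } from "@elastic/elasticsearch";
import { Operations } from "@webiny/api-dynamodb-to-elasticsearch";

// Assumed local cluster, for illustration only. In Webiny the client
// comes from context.elasticsearch.
const client = new Client({ node: "http://localhost:9200" });

const run = async (): Promise<void> => {
    const operations = new Operations();
    // One insert -> two bulk items (action metadata + document source).
    operations.insert({ id: "pk:sk", index: "my-index", data: { title: "Hello" } });
    // One delete -> a single bulk item.
    operations.delete({ id: "pk2:sk2", index: "my-index" });

    // items is already in bulk format:
    // [{ index: {...} }, { title: "Hello" }, { delete: {...} }]
    await client.bulk({ body: operations.items });
};
```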
diff --git a/packages/api-dynamodb-to-elasticsearch/__tests__/OperationsBuilder.test.ts b/packages/api-dynamodb-to-elasticsearch/__tests__/OperationsBuilder.test.ts
new file mode 100644
index 00000000000..89ecf5755df
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/__tests__/OperationsBuilder.test.ts
@@ -0,0 +1,287 @@
+import { OperationsBuilder } from "~/OperationsBuilder";
+import { Decompressor } from "~/Decompressor";
+import { createPlugins } from "~tests/plugins";
+import { DynamoDBRecord } from "@webiny/handler-aws/types";
+import { marshall } from "~/marshall";
+import { OperationType } from "~/Operations";
+
+describe("OperationsBuilder", () => {
+    const decompressor = new Decompressor({
+        plugins: createPlugins()
+    });
+
+    it("should build an insert operation", async () => {
+        const builder = new OperationsBuilder({
+            decompressor
+        });
+
+        const records: DynamoDBRecord[] = [
+            {
+                eventID: "123",
+                eventName: OperationType.INSERT,
+                dynamodb: {
+                    Keys: marshall({
+                        PK: "insertPk",
+                        SK: "insertSk"
+                    }),
+                    NewImage: marshall({
+                        index: "a-test-index",
+                        data: {
+                            id: "123",
+                            title: "Test"
+                        }
+                    })
+                }
+            }
+        ];
+
+        const operations = await builder.build({
+            records
+        });
+        expect(operations.total).toBe(2);
+
+        expect(operations.items).toEqual([
+            {
+                index: {
+                    _id: "insertPk:insertSk",
+                    _index: "a-test-index"
+                }
+            },
+            {
+                id: "123",
+                title: "Test"
+            }
+        ]);
+    });
+
+    it("should build a delete operation", async () => {
+        const builder = new OperationsBuilder({
+            decompressor
+        });
+
+        const records: DynamoDBRecord[] = [
+            {
+                eventID: "123",
+                eventName: OperationType.REMOVE,
+                dynamodb: {
+                    Keys: marshall({
+                        PK: "deletePk",
+                        SK: "deleteSk"
+                    }),
+                    OldImage: marshall({
+                        index: "a-test-index-for-delete"
+                    })
+                }
+            }
+        ];
+
+        const operations = await builder.build({
+            records
+        });
+        expect(operations.total).toBe(1);
+
+        expect(operations.items).toEqual([
+            {
+                delete: {
+                    _id: "deletePk:deleteSk",
+                    _index: "a-test-index-for-delete"
+                }
+            }
+        ]);
+    });
+
+    it("should skip record if there are no keys", async () => {
+        const builder = new OperationsBuilder({
+            decompressor
+        });
+
+        const records: DynamoDBRecord[] = [
+            {
+                eventID: "123",
+                eventName: OperationType.INSERT,
+                dynamodb: {
+                    NewImage: marshall({
+                        index: "a-test-index",
+                        data: {
+                            id: "123",
+                            title: "Test"
+                        }
+                    })
+                }
+            }
+        ];
+
+        const operations = await builder.build({
+            records
+        });
+        expect(operations.total).toBe(0);
+
+        expect(operations.items).toEqual([]);
+    });
+
+    it("should skip record if there is a missing dynamodb property", async () => {
+        const builder = new OperationsBuilder({
+            decompressor
+        });
+
+        const records: DynamoDBRecord[] = [
+            {
+                eventID: "123",
+                eventName: OperationType.INSERT
+            }
+        ];
+
+        const operations = await builder.build({
+            records
+        });
+        expect(operations.total).toBe(0);
+
+        expect(operations.items).toEqual([]);
+    });
+
+    it("should skip record if newImage is marked as ignored", async () => {
+        const builder = new OperationsBuilder({
+            decompressor
+        });
+
+        const records: DynamoDBRecord[] = [
+            {
+                eventID: "123",
+                eventName: OperationType.INSERT,
+                dynamodb: {
+                    Keys: marshall({
+                        PK: "insertPk",
+                        SK: "insertSk"
+                    }),
+                    NewImage: marshall({
+                        ignore: true
+                    })
+                }
+            }
+        ];
+
+        const operations = await builder.build({
+            records
+        });
+        expect(operations.total).toBe(0);
+
+        expect(operations.items).toEqual([]);
+    });
+
+    it("should skip record if there is nothing in the newImage", async () => {
+        const builder = new OperationsBuilder({
+            decompressor
+        });
+
+        const records: DynamoDBRecord[] = [
+            {
+                eventID: "123",
+                eventName: OperationType.INSERT,
+                dynamodb: {
+                    Keys: marshall({
+                        PK: "insertPk",
+                        SK: "insertSk"
+                    }),
+                    NewImage: marshall({})
+                }
+            },
+            {
+                eventID: "123",
+                eventName: OperationType.INSERT,
+                dynamodb: {
+                    Keys: marshall({
+                        PK: "insertPk",
+                        SK: "insertSk"
+                    }),
+                    // @ts-expect-error
+                    NewImage: null
+                }
+            }
+        ];
+
+        const operations = await builder.build({
+            records
+        });
+        expect(operations.total).toBe(0);
+
+        expect(operations.items).toEqual([]);
+    });
+
+    it("should skip record if there is no data in the newImage.data", async () => {
+        const builder = new OperationsBuilder({
+            decompressor
+        });
+
+        const records: DynamoDBRecord[] = [
+            {
+                eventID: "123",
+                eventName: OperationType.INSERT,
+                dynamodb: {
+                    Keys: marshall({
+                        PK: "insertPk",
+                        SK: "insertSk"
+                    }),
+                    NewImage: marshall({
+                        index: "a-test-index"
+                    })
+                }
+            }
+        ];
+
+        const operations = await builder.build({
+            records
+        });
+        expect(operations.total).toBe(0);
+
+        expect(operations.items).toEqual([]);
+    });
+
+    it("should skip record if there is no index in the oldImage", async () => {
+        const builder = new OperationsBuilder({
+            decompressor
+        });
+
+        const records: DynamoDBRecord[] = [
+            {
+                eventID: "123",
+                eventName: OperationType.REMOVE,
+                dynamodb: {
+                    Keys: marshall({
+                        PK: "deletePk",
+                        SK: "deleteSk"
+                    }),
+                    OldImage: marshall({})
+                }
+            },
+            {
+                eventID: "1234",
+                eventName: OperationType.REMOVE,
+                dynamodb: {
+                    Keys: marshall({
+                        PK: "deletePk",
+                        SK: "deleteSk"
+                    }),
+                    // @ts-expect-error
+                    OldImage: null
+                }
+            },
+            {
+                eventID: "12345",
+                eventName: OperationType.REMOVE,
+                dynamodb: {
+                    Keys: marshall({
+                        PK: "deletePk",
+                        SK: "deleteSk"
+                    })
+                }
+            }
+        ];
+
+        const operations = await builder.build({
+            records
+        });
+        expect(operations.total).toBe(0);
+
+        expect(operations.items).toEqual([]);
+    });
+});
diff --git a/packages/api-dynamodb-to-elasticsearch/__tests__/event.test.ts b/packages/api-dynamodb-to-elasticsearch/__tests__/event.test.ts
index 7896a1a35ba..97616dc7377 100644
--- a/packages/api-dynamodb-to-elasticsearch/__tests__/event.test.ts
+++ b/packages/api-dynamodb-to-elasticsearch/__tests__/event.test.ts
@@ -1,7 +1,7 @@
 import { createEventHandler } from "~/index";
 import { PluginsContainer } from "@webiny/plugins";
 import { marshall as baseMarshall } from "@webiny/aws-sdk/client-dynamodb";
-import { DynamoDBRecord } from "aws-lambda";
+import { DynamoDBRecord } from "@webiny/handler-aws/types";
 
 interface Event {
     Records: DynamoDBRecord[];
 }
@@ -93,4 +93,22 @@ describe("event", () => {
 
         expect(result).toEqual(null);
     });
+
+    it("should just skip because of no elasticsearch", async () => {
+        const eventHandler = createEventHandler();
+
+        const result = await eventHandler.cb({
+            context: {
+                ...context,
+                elasticsearch: undefined
+            },
+            event,
+            lambdaContext,
+            request,
+            reply,
+            next: jest.fn()
+        });
+
+        expect(result).toEqual(null);
+    });
 });
diff --git a/packages/api-dynamodb-to-elasticsearch/__tests__/mocks/FaultyDecompressorPlugin.ts b/packages/api-dynamodb-to-elasticsearch/__tests__/mocks/FaultyDecompressorPlugin.ts
new file mode 100644
index 00000000000..86f151f1492
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/__tests__/mocks/FaultyDecompressorPlugin.ts
@@ -0,0 +1,16 @@
+import { CompressionPlugin } from "@webiny/api-elasticsearch";
+
+export class FaultyDecompressorPlugin extends CompressionPlugin {
+    public canCompress(): boolean {
+        return true;
+    }
+    public compress(): Promise<any> {
+        throw new Error("Throwing an error on purpose - compress.");
+    }
+    public canDecompress(): boolean {
+        return true;
+    }
+    public decompress(): Promise<any> {
+        throw new Error("Throwing an error on purpose - decompress.");
+    }
+}
diff --git a/packages/api-dynamodb-to-elasticsearch/__tests__/plugins.ts b/packages/api-dynamodb-to-elasticsearch/__tests__/plugins.ts
new file mode 100644
index 00000000000..bd941b6ca73
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/__tests__/plugins.ts
@@ -0,0 +1,6 @@
+import { PluginsContainer } from "@webiny/plugins";
+import { createGzipCompression } from "@webiny/api-elasticsearch";
+
+export const createPlugins = () => {
+    return new PluginsContainer([createGzipCompression()]);
+};
diff --git a/packages/api-dynamodb-to-elasticsearch/__tests__/transfer.test.ts b/packages/api-dynamodb-to-elasticsearch/__tests__/transfer.test.ts
index 3f30a387c25..daff9ce40eb 100644
--- a/packages/api-dynamodb-to-elasticsearch/__tests__/transfer.test.ts
+++ b/packages/api-dynamodb-to-elasticsearch/__tests__/transfer.test.ts
@@ -1,4 +1,4 @@
-import { createEventHandler, Operations } from "~/index";
+import { createEventHandler, OperationType } from "~/index";
 import { createElasticsearchClient } from "@webiny/project-utils/testing/elasticsearch/createClient";
 import { ElasticsearchContext } from "@webiny/api-elasticsearch/types";
 import { Context, LambdaContext, Reply, Request } from "@webiny/handler-aws/types";
@@ -28,7 +28,7 @@ describe("transfer data", () => {
             event: {
                 Records: [
                     {
-                        eventName: Operations.INSERT,
+                        eventName: OperationType.INSERT,
                         dynamodb: {
                             Keys: marshall({
                                 PK: "PK_TEST",
diff --git a/packages/api-dynamodb-to-elasticsearch/src/Decompressor.ts b/packages/api-dynamodb-to-elasticsearch/src/Decompressor.ts
new file mode 100644
index 00000000000..498d75029ec
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/src/Decompressor.ts
@@ -0,0 +1,24 @@
+import { IDecompressor } from "./types";
+import { decompress } from "@webiny/api-elasticsearch";
+import { GenericRecord } from "@webiny/cli/types";
+import { PluginsContainer } from "@webiny/plugins";
+
+export interface IDecompressorParams {
+    plugins: PluginsContainer;
+}
+
+export class Decompressor implements IDecompressor {
+    private readonly plugins: PluginsContainer;
+
+    public constructor(params: IDecompressorParams) {
+        this.plugins = params.plugins;
+    }
+
+    public async decompress(data: GenericRecord): Promise<GenericRecord | null> {
+        try {
+            return await decompress(this.plugins, data);
+        } catch (ex) {
+            return null;
+        }
+    }
+}
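Taken together with the tests above, `Decompressor` is a thin, non-throwing wrapper around the package's compression plugins. A short round-trip sketch, mirroring the test setup (the gzip plugin and `PluginsContainer` wiring come straight from the tests):

```ts
import { createGzipCompression } from "@webiny/api-elasticsearch";
import { PluginsContainer } from "@webiny/plugins";
import { Decompressor } from "@webiny/api-dynamodb-to-elasticsearch";

const compressor = createGzipCompression();
const decompressor = new Decompressor({
    plugins: new PluginsContainer([createGzipCompression()])
});

const roundTrip = async (): Promise<void> => {
    // Uncompressed objects pass through unchanged.
    const plain = await decompressor.decompress({ title: "Plain" });

    // Compressed payloads (carrying a `compression` marker) are restored.
    const compressed = await compressor.compress({ title: "Hello World" });
    const restored = await decompressor.decompress(compressed);

    // A broken payload resolves to null instead of throwing.
    const broken = await decompressor.decompress({
        value: "not really gzip",
        compression: "gzip"
    });

    console.log({ plain, restored, broken });
};
```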
diff --git a/packages/api-dynamodb-to-elasticsearch/src/Operations.ts b/packages/api-dynamodb-to-elasticsearch/src/Operations.ts
new file mode 100644
index 00000000000..d69ec29a1ed
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/src/Operations.ts
@@ -0,0 +1,46 @@
+import { GenericRecord } from "@webiny/cli/types";
+import {
+    IDeleteOperationParams,
+    IInsertOperationParams,
+    IModifyOperationParams,
+    IOperations
+} from "~/types";
+
+export enum OperationType {
+    INSERT = "INSERT",
+    MODIFY = "MODIFY",
+    REMOVE = "REMOVE"
+}
+
+export class Operations implements IOperations {
+    public readonly items: GenericRecord[] = [];
+
+    public get total(): number {
+        return this.items.length;
+    }
+
+    public insert(params: IInsertOperationParams): void {
+        this.items.push(
+            {
+                index: {
+                    _id: params.id,
+                    _index: params.index
+                }
+            },
+            params.data
+        );
+    }
+
+    public modify(params: IModifyOperationParams): void {
+        this.insert(params);
+    }
+
+    public delete(params: IDeleteOperationParams): void {
+        this.items.push({
+            delete: {
+                _id: params.id,
+                _index: params.index
+            }
+        });
+    }
+}
diff --git a/packages/api-dynamodb-to-elasticsearch/src/OperationsBuilder.ts b/packages/api-dynamodb-to-elasticsearch/src/OperationsBuilder.ts
new file mode 100644
index 00000000000..69be6e36a56
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/src/OperationsBuilder.ts
@@ -0,0 +1,133 @@
+import {
+    IDecompressor,
+    IOperations,
+    IOperationsBuilder,
+    IOperationsBuilderBuildParams
+} from "./types";
+import { Operations, OperationType } from "~/Operations";
+import { unmarshall } from "~/marshall";
+
+interface RecordDynamoDbImage {
+    data: Record<string, any>;
+    ignore?: boolean;
+    index: string;
+}
+
+interface RecordDynamoDbKeys {
+    PK: string;
+    SK: string;
+}
+
+export interface IOperationsBuilderParams {
+    decompressor: IDecompressor;
+}
+
+export class OperationsBuilder implements IOperationsBuilder {
+    private readonly decompressor: IDecompressor;
+
+    public constructor(params: IOperationsBuilderParams) {
+        this.decompressor = params.decompressor;
+    }
+
+    public async build(params: IOperationsBuilderBuildParams): Promise<IOperations> {
+        const operations = new Operations();
+        for (const record of params.records) {
+            if (!record.dynamodb) {
+                continue;
+            } else if (!record.eventName) {
+                console.error(
+                    `Could not get operation from the record, skipping event "${record.eventID}".`
+                );
+                continue;
+            }
+
+            const keys = unmarshall<RecordDynamoDbKeys>(record.dynamodb.Keys);
+            if (!keys?.PK || !keys.SK) {
+                console.error(
+                    `Could not get keys from the record, skipping event "${record.eventID}".`
+                );
+                continue;
+            }
+
+            const id = `${keys.PK}:${keys.SK}`;
+
+            /**
+             * On operations other than REMOVE we decompress the data and store it into Elasticsearch.
+             * There is no need to decompress on REMOVE since no data is sent with that operation.
+             */
+            if (
+                record.eventName === OperationType.INSERT ||
+                record.eventName === OperationType.MODIFY
+            ) {
+                const newImage = unmarshall<RecordDynamoDbImage>(record.dynamodb.NewImage);
+                /**
+                 * If there is no newImage, silently continue to the next operation.
+                 */
+                if (
+                    !newImage ||
+                    typeof newImage !== "object" ||
+                    Object.keys(newImage).length === 0
+                ) {
+                    continue;
+                }
+                /**
+                 * Records can be flagged with `ignore: true` when they must never be
+                 * pushed to Elasticsearch.
+                 */
+                //
+                else if (newImage.ignore === true) {
+                    // Nothing to log here, we are skipping the record intentionally.
+                    continue;
+                }
+                /**
+                 * The new image may also be missing the target index.
+                 */
+                //
+                else if (!newImage.index) {
+                    console.error(
+                        `Could not get index from the new image, skipping event "${record.eventID}".`
+                    );
+                    console.log({ newImage });
+                    continue;
+                }
+                /**
+                 * We must decompress the data that is going into Elasticsearch.
+                 */
+                const data = await this.decompressor.decompress(newImage.data);
+                /**
+                 * No point in writing null or undefined data into Elasticsearch.
+                 * This might happen on some error while decompressing. We will log it.
+                 *
+                 * Data should NEVER be null or undefined in the Elasticsearch DynamoDB table,
+                 * unless it is a delete operation. If it is - it is a bug.
+                 */
+                if (data === undefined || data === null) {
+                    console.error(
+                        `Could not get decompressed data, skipping ES operation "${record.eventName}", ID ${id}.`
+                    );
+                    continue;
+                }
+
+                operations.insert({
+                    id,
+                    index: newImage.index,
+                    data
+                });
+            } else if (record.eventName === OperationType.REMOVE) {
+                const oldImage = unmarshall<RecordDynamoDbImage>(record.dynamodb.OldImage);
+                /**
+                 * If there is no index found, silently continue to the next operation.
+                 */
+                if (!oldImage?.index) {
+                    continue;
+                }
+                operations.delete({
+                    id,
+                    index: oldImage.index
+                });
+            }
+        }
+        return operations;
+    }
+}
diff --git a/packages/api-dynamodb-to-elasticsearch/src/eventHandler.ts b/packages/api-dynamodb-to-elasticsearch/src/eventHandler.ts
new file mode 100644
index 00000000000..ef4b1e0f58f
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/src/eventHandler.ts
@@ -0,0 +1,59 @@
+import { getNumberEnvVariable } from "~/helpers/getNumberEnvVariable";
+import { createDynamoDBEventHandler, timerFactory } from "@webiny/handler-aws";
+import { ElasticsearchContext } from "@webiny/api-elasticsearch/types";
+import { Decompressor } from "~/Decompressor";
+import { OperationsBuilder } from "~/OperationsBuilder";
+import { executeWithRetry } from "~/executeWithRetry";
+
+const MAX_PROCESSOR_PERCENT = getNumberEnvVariable(
+    "MAX_ES_PROCESSOR",
+    process.env.NODE_ENV === "test" ? 101 : 98
+);
+
+/**
+ * We also need to cap the maximum running time of the Lambda Function.
+ * https://github.com/webiny/webiny-js/blob/f7352d418da2b5ae0b781376be46785aa7ac6ae0/packages/pulumi-aws/src/apps/core/CoreOpenSearch.ts#L232
+ * https://github.com/webiny/webiny-js/blob/f7352d418da2b5ae0b781376be46785aa7ac6ae0/packages/pulumi-aws/src/apps/core/CoreElasticSearch.ts#L218
+ */
+const MAX_RUNNING_TIME = 900;
+
+export const createEventHandler = () => {
+    return createDynamoDBEventHandler(async ({ event, context: ctx, lambdaContext }) => {
+        const timer = timerFactory(lambdaContext);
+        const context = ctx as unknown as ElasticsearchContext;
+        if (!context.elasticsearch) {
+            console.error("Missing elasticsearch definition on context.");
+            return null;
+        }
+
+        const decompressor = new Decompressor({
+            plugins: context.plugins
+        });
+
+        const builder = new OperationsBuilder({
+            decompressor
+        });
+
+        const operations = await builder.build({
+            records: event.Records
+        });
+        /**
+         * No need to do anything if there are no operations.
+         */
+        if (operations.total === 0) {
+            return null;
+        }
+        /**
+         * Execute the operations with retry.
+         */
+        await executeWithRetry({
+            timer,
+            maxRunningTime: MAX_RUNNING_TIME,
+            maxProcessorPercent: MAX_PROCESSOR_PERCENT,
+            context,
+            operations
+        });
+
+        return null;
+    });
+};
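As the event tests above demonstrate, the handler returned by `createEventHandler()` can be exercised directly through its `cb` property, which is convenient for unit testing without a live Lambda runtime. A sketch mirroring `event.test.ts`; the `declare const` stubs stand in for the mocks that the test file assembles:

```ts
import { createEventHandler } from "@webiny/api-dynamodb-to-elasticsearch";
import { Context, LambdaContext, Reply, Request } from "@webiny/handler-aws/types";

// Placeholders for the mocks built in event.test.ts.
declare const context: Context; // must expose `elasticsearch` and `plugins`
declare const lambdaContext: LambdaContext;
declare const request: Request;
declare const reply: Reply;

const invoke = async (): Promise<void> => {
    const eventHandler = createEventHandler();

    const result = await eventHandler.cb({
        context,
        event: { Records: [] }, // an empty batch short-circuits before the bulk call
        lambdaContext,
        request,
        reply,
        next: jest.fn()
    });

    // The handler always resolves to null; its observable effect is the bulk write.
    console.assert(result === null);
};
```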
diff --git a/packages/api-dynamodb-to-elasticsearch/src/execute.ts b/packages/api-dynamodb-to-elasticsearch/src/execute.ts
new file mode 100644
index 00000000000..2bbcd6b5569
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/src/execute.ts
@@ -0,0 +1,136 @@
+import {
+    createWaitUntilHealthy,
+    ElasticsearchCatClusterHealthStatus,
+    UnhealthyClusterError,
+    WaitingHealthyClusterAbortedError
+} from "@webiny/api-elasticsearch";
+import { ITimer } from "@webiny/handler-aws";
+import { ApiResponse, ElasticsearchContext } from "@webiny/api-elasticsearch/types";
+import { WebinyError } from "@webiny/error";
+import { IOperations } from "./types";
+
+export interface BulkOperationsResponseBodyItemIndexError {
+    reason?: string;
+}
+
+export interface BulkOperationsResponseBodyItemIndex {
+    error?: BulkOperationsResponseBodyItemIndexError;
+}
+
+export interface BulkOperationsResponseBodyItem {
+    index?: BulkOperationsResponseBodyItemIndex;
+    error?: string;
+}
+
+export interface BulkOperationsResponseBody {
+    items: BulkOperationsResponseBodyItem[];
+}
+
+export interface IExecuteParams {
+    timer: ITimer;
+    maxRunningTime: number;
+    maxProcessorPercent: number;
+    context: Pick<ElasticsearchContext, "elasticsearch">;
+    operations: IOperations;
+}
+
+const getError = (item: BulkOperationsResponseBodyItem): string | null => {
+    if (!item.index?.error?.reason) {
+        return null;
+    }
+    const reason = item.index.error.reason;
+    if (reason.match(/no such index \[([a-zA-Z0-9_-]+)\]/) !== null) {
+        return "index";
+    }
+    return reason;
+};
+
+const checkErrors = (result?: ApiResponse<BulkOperationsResponseBody>): void => {
+    if (!result || !result.body || !result.body.items) {
+        return;
+    }
+    for (const item of result.body.items) {
+        const err = getError(item);
+        if (!err) {
+            continue;
+        } else if (err === "index") {
+            if (process.env.DEBUG === "true") {
+                console.error("Bulk response", JSON.stringify(result, null, 2));
+            }
+            continue;
+        }
+        console.error(item.error);
+        throw new WebinyError(err, "DYNAMODB_TO_ELASTICSEARCH_ERROR", item);
+    }
+};
+
+export const execute = (params: IExecuteParams) => {
+    return async (): Promise<void> => {
+        const { context, timer, maxRunningTime, maxProcessorPercent, operations } = params;
+        const remainingTime = timer.getRemainingSeconds();
+        const runningTime = maxRunningTime - remainingTime;
+        const maxWaitingTime = remainingTime - 90;
+
+        if (process.env.DEBUG === "true") {
+            console.debug(
+                `The Lambda has already been running for ${runningTime}s. Setting health check max waiting time: ${maxWaitingTime}s.`
+            );
+        }
+
+        const healthCheck = createWaitUntilHealthy(context.elasticsearch, {
+            minClusterHealthStatus: ElasticsearchCatClusterHealthStatus.Yellow,
+            waitingTimeStep: 30,
+            maxProcessorPercent,
+            maxWaitingTime
+        });
+
+        try {
+            await healthCheck.wait({
+                async onUnhealthy({ startedAt, runs, mustEndAt, waitingTimeStep, waitingReason }) {
+                    console.debug(`Cluster is unhealthy on run #${runs}.`, {
+                        startedAt,
+                        mustEndAt,
+                        waitingTimeStep,
+                        waitingReason
+                    });
+                },
+                async onTimeout({ startedAt, runs, waitingTimeStep, mustEndAt, waitingReason }) {
+                    console.error(`Cluster health check timeout on run #${runs}.`, {
+                        startedAt,
+                        mustEndAt,
+                        waitingTimeStep,
+                        waitingReason
+                    });
+                }
+            });
+        } catch (ex) {
+            if (
+                ex instanceof UnhealthyClusterError ||
+                ex instanceof WaitingHealthyClusterAbortedError
+            ) {
+                throw ex;
+            }
+            console.error(`Cluster health check failed.`, ex);
+            throw ex;
+        }
+
+        try {
+            const res = await context.elasticsearch.bulk({
+                body: operations.items
+            });
+            checkErrors(res);
+        } catch (error) {
+            if (process.env.DEBUG !== "true") {
+                throw error;
+            }
+            const meta = error?.meta || {};
+            delete meta["meta"];
+            console.error("Bulk error", JSON.stringify(error, null, 2));
+            throw error;
+        }
+        if (process.env.DEBUG !== "true") {
+            return;
+        }
+        console.info(`Transferred ${operations.total} record operations to Elasticsearch.`);
+    };
+};
diff --git a/packages/api-dynamodb-to-elasticsearch/src/executeWithRetry.ts b/packages/api-dynamodb-to-elasticsearch/src/executeWithRetry.ts
new file mode 100644
index 00000000000..f9e442fe08d
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/src/executeWithRetry.ts
@@ -0,0 +1,66 @@
+import { execute, IExecuteParams } from "~/execute";
+import { NotEnoughRemainingTimeError } from "~/NotEnoughRemainingTimeError";
+import pRetry from "p-retry";
+import { getNumberEnvVariable } from "./helpers/getNumberEnvVariable";
+
+const minRemainingSecondsToTimeout = 120;
+
+export interface IExecuteWithRetryParams extends IExecuteParams {
+    maxRetryTime?: number;
+    retries?: number;
+    minTimeout?: number;
+    maxTimeout?: number;
+}
+
+export const executeWithRetry = async (params: IExecuteWithRetryParams) => {
+    const maxRetryTime = getNumberEnvVariable(
+        "WEBINY_DYNAMODB_TO_ELASTICSEARCH_MAX_RETRY_TIME",
+        params.maxRetryTime || 300000
+    );
+    const retries = getNumberEnvVariable(
+        "WEBINY_DYNAMODB_TO_ELASTICSEARCH_RETRIES",
+        params.retries || 20
+    );
+    const minTimeout = getNumberEnvVariable(
+        "WEBINY_DYNAMODB_TO_ELASTICSEARCH_MIN_TIMEOUT",
+        params.minTimeout || 1500
+    );
+    const maxTimeout = getNumberEnvVariable(
+        "WEBINY_DYNAMODB_TO_ELASTICSEARCH_MAX_TIMEOUT",
+        params.maxTimeout || 30000
+    );
+
+    try {
+        await pRetry(
+            execute({
+                timer: params.timer,
+                maxRunningTime: params.maxRunningTime,
+                maxProcessorPercent: params.maxProcessorPercent,
+                context: params.context,
+                operations: params.operations
+            }),
+            {
+                maxRetryTime,
+                retries,
+                minTimeout,
+                maxTimeout,
+                onFailedAttempt: error => {
+                    if (params.timer.getRemainingSeconds() < minRemainingSecondsToTimeout) {
+                        throw new NotEnoughRemainingTimeError(error);
+                    }
+                    /**
+                     * We only log attempts that occur in the last quarter of the total retry count.
+                     */
+                    if (error.attemptNumber < retries * 0.75) {
+                        return;
+                    }
+                    console.error(`Attempt #${error.attemptNumber} failed.`);
+                    console.error(error);
+                }
+            }
+        );
+    } catch (ex) {
+        // TODO implement storing of failed operations
+        throw ex;
+    }
+};
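One consequence of routing every knob through `getNumberEnvVariable` is that a valid (positive, numeric) environment variable takes precedence over both the parameters passed to `executeWithRetry` and the built-in defaults. An illustrative way to tighten the retry window, e.g. in a test bootstrap (the values are arbitrary):

```ts
// Arbitrary example values; any unset, non-numeric, or <= 0 value falls back
// to the parameter/default (see the helper below).
process.env.WEBINY_DYNAMODB_TO_ELASTICSEARCH_RETRIES = "5";
process.env.WEBINY_DYNAMODB_TO_ELASTICSEARCH_MIN_TIMEOUT = "100";
process.env.WEBINY_DYNAMODB_TO_ELASTICSEARCH_MAX_TIMEOUT = "1000";
process.env.WEBINY_DYNAMODB_TO_ELASTICSEARCH_MAX_RETRY_TIME = "15000";
```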
diff --git a/packages/api-dynamodb-to-elasticsearch/src/helpers/getNumberEnvVariable.ts b/packages/api-dynamodb-to-elasticsearch/src/helpers/getNumberEnvVariable.ts
new file mode 100644
index 00000000000..43df24b6f5e
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/src/helpers/getNumberEnvVariable.ts
@@ -0,0 +1,10 @@
+export const getNumberEnvVariable = (name: string, def: number): number => {
+    const input = process.env[name];
+    const value = Number(input);
+    if (isNaN(value)) {
+        return def;
+    } else if (value <= 0) {
+        return def;
+    }
+    return value;
+};
diff --git a/packages/api-dynamodb-to-elasticsearch/src/index.ts b/packages/api-dynamodb-to-elasticsearch/src/index.ts
index 212dec152c0..99a39edec60 100644
--- a/packages/api-dynamodb-to-elasticsearch/src/index.ts
+++ b/packages/api-dynamodb-to-elasticsearch/src/index.ts
@@ -1,343 +1,9 @@
-import WebinyError from "@webiny/error";
-import { AttributeValue, unmarshall as baseUnmarshall } from "@webiny/aws-sdk/client-dynamodb";
-import {
-    createWaitUntilHealthy,
-    decompress,
-    UnhealthyClusterError,
-    WaitingHealthyClusterAbortedError
-} from "@webiny/api-elasticsearch";
-import { ApiResponse, ElasticsearchContext } from "@webiny/api-elasticsearch/types";
-import { createDynamoDBEventHandler, timerFactory } from "@webiny/handler-aws";
-import { ElasticsearchCatClusterHealthStatus } from "@webiny/api-elasticsearch/operations/types";
-import pRetry from "p-retry";
-import { NotEnoughRemainingTimeError } from "./NotEnoughRemainingTimeError";
-
-export enum Operations {
-    INSERT = "INSERT",
-    MODIFY = "MODIFY",
-    REMOVE = "REMOVE"
-}
-
-interface BulkOperationsResponseBodyItemIndexError {
-    reason?: string;
-}
-
-interface BulkOperationsResponseBodyItemIndex {
-    error?: BulkOperationsResponseBodyItemIndexError;
-}
-
-interface BulkOperationsResponseBodyItem {
-    index?: BulkOperationsResponseBodyItemIndex;
-    error?: string;
-}
-
-interface BulkOperationsResponseBody {
-    items: BulkOperationsResponseBodyItem[];
-}
-
-const getError = (item: BulkOperationsResponseBodyItem): string | null => {
-    if (!item.index?.error?.reason) {
-        return null;
-    }
-    const reason = item.index.error.reason;
-    if (reason.match(/no such index \[([a-zA-Z0-9_-]+)\]/) !== null) {
-        return "index";
-    }
-    return reason;
-};
-
-const checkErrors = (result?: ApiResponse<BulkOperationsResponseBody>): void => {
-    if (!result || !result.body || !result.body.items) {
-        return;
-    }
-    for (const item of result.body.items) {
-        const err = getError(item);
-        if (!err) {
-            continue;
-        } else if (err === "index") {
-            if (process.env.DEBUG === "true") {
-                console.error("Bulk response", JSON.stringify(result, null, 2));
-            }
-            continue;
-        }
-        console.error(item.error);
-        throw new WebinyError(err, "DYNAMODB_TO_ELASTICSEARCH_ERROR", item);
-    }
-};
-
-const getNumberEnvVariable = (name: string, def: number): number => {
-    const input = process.env[name];
-    const value = Number(input);
-    if (isNaN(value)) {
-        return def;
-    } else if (value <= 0) {
-        return def;
-    }
-    return value;
-};
-
-const MAX_PROCESSOR_PERCENT = getNumberEnvVariable(
-    "MAX_ES_PROCESSOR",
-    process.env.NODE_ENV === "test" ? 101 : 98
-);
-
-interface RecordDynamoDbImage {
-    data: Record<string, any>;
-    ignore?: boolean;
-    index: string;
-}
-
-interface RecordDynamoDbKeys {
-    PK: string;
-    SK: string;
-}
-
-const unmarshall = <T>(value?: Record<string, AttributeValue>): T | undefined => {
-    if (!value) {
-        return undefined;
-    }
-    return baseUnmarshall(value) as T;
-};
-
-const minRemainingSecondsToTimeout = 120;
-
-/**
- * Also, we need to set the maximum running time for the Lambda Function.
- * https://github.com/webiny/webiny-js/blob/f7352d418da2b5ae0b781376be46785aa7ac6ae0/packages/pulumi-aws/src/apps/core/CoreOpenSearch.ts#L232
- * https://github.com/webiny/webiny-js/blob/f7352d418da2b5ae0b781376be46785aa7ac6ae0/packages/pulumi-aws/src/apps/core/CoreElasticSearch.ts#L218
- */
-const MAX_RUNNING_TIME = 900;
-
-export const createEventHandler = () => {
-    return createDynamoDBEventHandler(async ({ event, context: ctx, lambdaContext }) => {
-        const timer = timerFactory(lambdaContext);
-        const context = ctx as unknown as ElasticsearchContext;
-        if (!context.elasticsearch) {
-            console.error("Missing elasticsearch definition on context.");
-            return null;
-        }
-
-        const operations: Record<string, any>[] = [];
-
-        const operationIdList: string[] = [];
-
-        for (const record of event.Records) {
-            const dynamodb = record.dynamodb;
-            if (!dynamodb) {
-                continue;
-            }
-            /**
-             * TODO: figure out correct types
-             */
-            // @ts-expect-error
-            const newImage = unmarshall<RecordDynamoDbImage>(dynamodb.NewImage);
-
-            // Note that with the `REMOVE` event, there is no `NewImage` property. Which means,
-            // if the `newImage` is `undefined`, we are dealing with a `REMOVE` event and we still
-            // need to process it.
-            if (newImage && newImage.ignore === true) {
-                continue;
-            }
-            /**
-             * TODO: figure out correct types
-             */
-            // @ts-expect-error
-            const keys = unmarshall<RecordDynamoDbKeys>(dynamodb.Keys);
-            if (!keys?.PK || !keys.SK) {
-                continue;
-            }
-            const _id = `${keys.PK}:${keys.SK}`;
-            /**
-             * TODO: figure out correct types
-             */
-            // @ts-expect-error
-            const oldImage = unmarshall<RecordDynamoDbImage>(dynamodb.OldImage);
-            const operation = record.eventName;
-
-            /**
-             * On operations other than REMOVE we decompress the data and store it into the Elasticsearch.
-             * No need to try to decompress if operation is REMOVE since there is no data sent into that operation.
-             */
-            let data: any = undefined;
-            if (newImage && operation !== Operations.REMOVE) {
-                /**
-                 * We must decompress the data that is going into the Elasticsearch.
-                 */
-                data = await decompress(context.plugins, newImage.data);
-                /**
-                 * No point in writing null or undefined data into the Elasticsearch.
-                 * This might happen on some error while decompressing. We will log it.
-                 *
-                 * Data should NEVER be null or undefined in the Elasticsearch DynamoDB table, unless it is a delete operations.
-                 * If it is - it is a bug.
-                 */
-                if (data === undefined || data === null) {
-                    console.error(
-                        `Could not get decompressed data, skipping ES operation "${operation}", ID ${_id}`
-                    );
-                    continue;
-                }
-            }
-
-            operationIdList.push(_id);
-
-            switch (record.eventName) {
-                case Operations.INSERT:
-                case Operations.MODIFY:
-                    if (newImage) {
-                        operations.push(
-                            {
-                                index: {
-                                    _id,
-                                    _index: newImage.index
-                                }
-                            },
-                            data
-                        );
-                    }
-                    break;
-                case Operations.REMOVE:
-                    operations.push({
-                        delete: {
-                            _id,
-                            _index: oldImage?.index || "unknown"
-                        }
-                    });
-                    break;
-                default:
-                    break;
-            }
-        }
-        /**
-         * No need to do anything if there are no operations.
-         */
-        if (operations.length === 0) {
-            return null;
-        }
-        /**
-         * Wrap the code we need to run into the function, so it can be called within itself.
-         */
-        const execute = async (): Promise<void> => {
-            const remainingTime = timer.getRemainingSeconds();
-            const runningTime = MAX_RUNNING_TIME - remainingTime;
-            const maxWaitingTime = remainingTime - 90;
-
-            if (process.env.DEBUG === "true") {
-                console.debug(
-                    `The Lambda is already running for ${runningTime}s. Setting Health Check max waiting time: ${maxWaitingTime}s`
-                );
-            }
-
-            const healthCheck = createWaitUntilHealthy(context.elasticsearch, {
-                minClusterHealthStatus: ElasticsearchCatClusterHealthStatus.Yellow,
-                waitingTimeStep: 30,
-                maxProcessorPercent: MAX_PROCESSOR_PERCENT,
-                maxWaitingTime
-            });
-
-            try {
-                await healthCheck.wait({
-                    async onUnhealthy({
-                        startedAt,
-                        runs,
-                        mustEndAt,
-                        waitingTimeStep,
-                        waitingReason
-                    }) {
-                        console.debug(`Cluster is unhealthy on run #${runs}.`, {
-                            startedAt,
-                            mustEndAt,
-                            waitingTimeStep,
-                            waitingReason
-                        });
-                    },
-                    async onTimeout({
-                        startedAt,
-                        runs,
-                        waitingTimeStep,
-                        mustEndAt,
-                        waitingReason
-                    }) {
-                        console.error(`Cluster health check timeout on run #${runs}.`, {
-                            startedAt,
-                            mustEndAt,
-                            waitingTimeStep,
-                            waitingReason
-                        });
-                    }
-                });
-            } catch (ex) {
-                if (
-                    ex instanceof UnhealthyClusterError ||
-                    ex instanceof WaitingHealthyClusterAbortedError
-                ) {
-                    throw ex;
-                }
-                console.error(`Cluster health check failed.`, ex);
-                throw ex;
-            }
-
-            try {
-                const res = await context.elasticsearch.bulk({
-                    body: operations
-                });
-                checkErrors(res);
-            } catch (error) {
-                if (process.env.DEBUG !== "true") {
-                    throw error;
-                }
-                const meta = error?.meta || {};
-                delete meta["meta"];
-                console.error("Bulk error", JSON.stringify(error, null, 2));
-                throw error;
-            }
-            if (process.env.DEBUG !== "true") {
-                return;
-            }
-            console.info(
-                `Transferred ${operations.length / 2} record operations to Elasticsearch.`
-            );
-        };
-
-        const maxRetryTime = getNumberEnvVariable(
-            "WEBINY_DYNAMODB_TO_ELASTICSEARCH_MAX_RETRY_TIME",
-            300000
-        );
-        const retries = getNumberEnvVariable("WEBINY_DYNAMODB_TO_ELASTICSEARCH_RETRIES", 20);
-        const minTimeout = getNumberEnvVariable(
-            "WEBINY_DYNAMODB_TO_ELASTICSEARCH_MIN_TIMEOUT",
-            1500
-        );
-        const maxTimeout = getNumberEnvVariable(
-            "WEBINY_DYNAMODB_TO_ELASTICSEARCH_MAX_TIMEOUT",
-            30000
-        );
-
-        try {
-            await pRetry(execute, {
-                maxRetryTime,
-                retries,
-                minTimeout,
-                maxTimeout,
-                onFailedAttempt: error => {
-                    if (timer.getRemainingSeconds() < minRemainingSecondsToTimeout) {
-                        throw new NotEnoughRemainingTimeError(error);
-                    }
-                    /**
-                     * We will only log attempts which are after 3/4 of total attempts.
-                     */
-                    if (error.attemptNumber < retries * 0.75) {
-                        return;
-                    }
-                    console.error(`Attempt #${error.attemptNumber} failed.`);
-                    console.error(error);
-                }
-            });
-        } catch (ex) {
-            // TODO implement storing of failed operations
-            throw ex;
-        }
-
-        return null;
-    });
-};
+export * from "./Decompressor";
+export * from "./eventHandler";
+export * from "./execute";
+export * from "./executeWithRetry";
+export * from "./marshall";
+export * from "./NotEnoughRemainingTimeError";
+export * from "./Operations";
+export * from "./OperationsBuilder";
+export * from "./types";
diff --git a/packages/api-dynamodb-to-elasticsearch/src/marshall.ts b/packages/api-dynamodb-to-elasticsearch/src/marshall.ts
new file mode 100644
index 00000000000..40ac68e1d72
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/src/marshall.ts
@@ -0,0 +1,29 @@
+import {
+    marshall as baseMarshall,
+    unmarshall as baseUnmarshall
+} from "@webiny/aws-sdk/client-dynamodb";
+import { GenericRecord } from "@webiny/cli/types";
+
+import { AttributeValue } from "@webiny/handler-aws/types";
+
+export interface MarshalledValue {
+    [key: string]: AttributeValue;
+}
+
+export const marshall = (value: GenericRecord): MarshalledValue => {
+    if (!value) {
+        return value;
+    }
+    return baseMarshall(value) as MarshalledValue;
+};
+
+export const unmarshall = <T>(value?: MarshalledValue): T | undefined => {
+    if (!value) {
+        return undefined;
+    }
+    /**
+     * We can safely cast the return value to `T` because we are 100% positive that this is correct.
+     */
+    // @ts-expect-error
+    return baseUnmarshall(value) as T;
+};
diff --git a/packages/api-dynamodb-to-elasticsearch/src/types.ts b/packages/api-dynamodb-to-elasticsearch/src/types.ts
new file mode 100644
index 00000000000..2707505d00e
--- /dev/null
+++ b/packages/api-dynamodb-to-elasticsearch/src/types.ts
@@ -0,0 +1,35 @@
+import { GenericRecord } from "@webiny/cli/types";
+import { DynamoDBRecord } from "@webiny/handler-aws/types";
+
+export interface IOperationsBuilderBuildParams {
+    records: DynamoDBRecord[];
+}
+
+export interface IOperationsBuilder {
+    build(params: IOperationsBuilderBuildParams): Promise<IOperations>;
+}
+
+export interface IInsertOperationParams {
+    id: string;
+    index: string;
+    data: GenericRecord;
+}
+
+export type IModifyOperationParams = IInsertOperationParams;
+
+export interface IDeleteOperationParams {
+    id: string;
+    index: string;
+}
+
+export interface IOperations {
+    items: GenericRecord[];
+    total: number;
+    insert(params: IInsertOperationParams): void;
+    modify(params: IModifyOperationParams): void;
+    delete(params: IDeleteOperationParams): void;
+}
+
+export interface IDecompressor {
+    decompress(data: GenericRecord): Promise<GenericRecord | null>;
+}
diff --git a/packages/handler-aws/package.json b/packages/handler-aws/package.json
index c63b112fcc5..25dff515fce 100644
--- a/packages/handler-aws/package.json
+++ b/packages/handler-aws/package.json
@@ -13,12 +13,12 @@
     "license": "MIT",
     "dependencies": {
         "@fastify/aws-lambda": "3.1.3",
-        "@types/aws-lambda": "^8.10.134",
         "@webiny/aws-sdk": "0.0.0",
         "@webiny/handler": "0.0.0",
         "@webiny/handler-client": "0.0.0",
         "@webiny/plugins": "0.0.0",
         "@webiny/utils": "0.0.0",
+        "aws-lambda": "^1.0.7",
         "fastify": "4.15.0"
     },
     "devDependencies": {
@@ -36,15 +36,5 @@
         "build": "yarn webiny run build",
         "watch": "yarn webiny run watch"
     },
-    "gitHead": "8476da73b653c89cc1474d968baf55c1b0ae0e5f",
-    "adio": {
-        "ignore": {
-            "src": [
-                "aws-lambda"
-            ],
-            "dependencies": [
-                "@types/aws-lambda"
-            ]
-        }
-    }
+    "gitHead": "8476da73b653c89cc1474d968baf55c1b0ae0e5f"
 }
diff --git a/packages/handler-aws/src/types.ts b/packages/handler-aws/src/types.ts
index 079957f9140..aa944ca6369 100644
--- a/packages/handler-aws/src/types.ts
+++ b/packages/handler-aws/src/types.ts
@@ -1,18 +1,19 @@
 import type {
     APIGatewayEvent,
-    Context as LambdaContext,
     APIGatewayEventRequestContextWithAuthorizer,
+    Context as LambdaContext,
     DynamoDBStreamEvent,
     EventBridgeEvent,
     S3Event,
-    SQSEvent,
-    SNSEvent
+    SNSEvent,
+    SQSEvent
 } from "aws-lambda";
-
 import "fastify";
 import { CreateHandlerParams as BaseCreateHandlerParams } from "@webiny/handler";
 import { LambdaFastifyOptions as LambdaOptions } from "@fastify/aws-lambda";
 
+export { AttributeValue, DynamoDBRecord } from "aws-lambda";
+
 export { HandlerRegistry } from "~/registry";
 export * from "@webiny/handler/types";
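With `aws-lambda` now a regular dependency and its types re-exported from `@webiny/handler-aws/types`, downstream packages can drop their direct `aws-lambda` imports — exactly the change applied to `event.test.ts` above. Presumably the intended consumer-side usage looks like this:

```ts
// Instead of: import { DynamoDBRecord } from "aws-lambda";
import { AttributeValue, DynamoDBRecord } from "@webiny/handler-aws/types";

const records: DynamoDBRecord[] = [];
const image: Record<string, AttributeValue> = {};
```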