This repository has been archived by the owner on Apr 13, 2023. It is now read-only.

fix: improve error logging for ddbToEs sync #68

Merged · 2 commits · Apr 26, 2021
4 changes: 2 additions & 2 deletions src/AWS.ts
@@ -12,13 +12,13 @@ const AWSWithXray = AWSXRay.captureAWS(AWS);
 const { IS_OFFLINE } = process.env;
 
 if (IS_OFFLINE === 'true') {
-    AWS.config.update({
+    AWSWithXray.config.update({
        region: process.env.AWS_REGION || 'us-west-2',
        accessKeyId: process.env.ACCESS_KEY,
        secretAccessKey: process.env.SECRET_KEY,
    });
 } else {
-    AWS.config.update({
+    AWSWithXray.config.update({
        customUserAgent: process.env.CUSTOM_USER_AGENT,
    });
 }
85 changes: 40 additions & 45 deletions src/ddbToEs/ddbToEsHelper.ts
@@ -72,7 +72,8 @@ export default class DdbToEsHelper {
                 await this.ElasticSearch.indices.create(params);
             }
         } catch (error) {
-            console.log('Failed to check if index exist or create index', error);
+            console.error(`Failed to check if index: ${indexName} exist or create index`);
+            throw error;
         }
     }

@@ -140,64 +141,58 @@ export default class DdbToEsHelper {
 
     // eslint-disable-next-line class-methods-use-this
     async logAndExecutePromises(promiseParamAndIds: PromiseParamAndId[]) {
-        const upsertAvailablePromiseParamAndIds = promiseParamAndIds.filter(paramAndId => {
-            return paramAndId.type === 'upsert-AVAILABLE';
-        });
-
-        const upsertDeletedPromiseParamAndIds = promiseParamAndIds.filter(paramAndId => {
-            return paramAndId.type === 'upsert-DELETED';
-        });
-
-        const deletePromiseParamAndIds = promiseParamAndIds.filter(paramAndId => {
-            return paramAndId.type === 'delete';
-        });
-
-        console.log(
-            `Operation: upsert-AVAILABLE on resource Ids `,
-            upsertAvailablePromiseParamAndIds.map(paramAndId => {
-                return paramAndId.id;
-            }),
-        );
-
         // We're using allSettled-shim because as of 7/21/2020 'serverless-plugin-typescript' does not support
         // Promise.allSettled.
         allSettled.shim();
 
         // We need to execute creation of a resource before execute deleting of a resource,
         // because a resource can be created and deleted, but not deleted then restored to AVAILABLE
-        // @ts-ignore
-        await Promise.allSettled(
-            upsertAvailablePromiseParamAndIds.map(paramAndId => {
-                return this.ElasticSearch.update(paramAndId.promiseParam);
-            }),
-        );
+        await this.executePromiseBlock('upsert-AVAILABLE', promiseParamAndIds);
+        await this.executePromiseBlock('upsert-DELETED', promiseParamAndIds);
+        await this.executePromiseBlock('delete', promiseParamAndIds);
+    }
 
-        console.log(
-            `Operation: upsert-DELETED on resource Ids `,
-            upsertDeletedPromiseParamAndIds.map(paramAndId => {
-                return paramAndId.id;
-            }),
-        );
+    // eslint-disable-next-line class-methods-use-this
+    private async executePromiseBlock(type: PromiseType, promiseParamAndIds: PromiseParamAndId[]) {
+        const filteredPromiseParamAndIds = promiseParamAndIds.filter(paramAndId => {
+            return paramAndId.type === type;
+        });
 
-        // @ts-ignore
-        await Promise.allSettled(
-            upsertDeletedPromiseParamAndIds.map(paramAndId => {
-                return this.ElasticSearch.update(paramAndId.promiseParam);
-            }),
-        );
+        if (filteredPromiseParamAndIds.length === 0) {
+            return;
+        }
 
         console.log(
-            `Operation: delete on resource Ids `,
-            deletePromiseParamAndIds.map(paramAndId => {
+            `Starting operation "${type}" on resource Ids: `,
+            filteredPromiseParamAndIds.map(paramAndId => {
                 return paramAndId.id;
             }),
         );
 
         // @ts-ignore
-        await Promise.allSettled(
-            deletePromiseParamAndIds.map(paramAndId => {
-                return this.ElasticSearch.delete(paramAndId.promiseParam);
+        const results = await Promise.allSettled(
+            filteredPromiseParamAndIds.map(async paramAndId => {
+                try {
+                    let response;
+                    if (type === 'upsert-AVAILABLE' || type === 'upsert-DELETED') {
+                        response = await this.ElasticSearch.update(paramAndId.promiseParam);
+                    } else if (type === 'delete') {
+                        response = await this.ElasticSearch.delete(paramAndId.promiseParam);
+                    } else {
+                        throw new Error(`unknown type: ${type}`);
+                    }
+                    return response;
+                } catch (e) {
+                    console.error(`${type} failed on id: ${paramAndId.id}, due to error:\n${e}`);
+                    throw e;
+                }
             }),
         );
+
+        // Throw rejected promises
+        const rejected = results
+            .filter((result: { status: string }) => result.status === 'rejected')
+            .map((result: { reason: string }) => result.reason);
+        if (rejected.length > 0) {
+            throw new Error(rejected);
+        }
     }
 }
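
For readers following the new error-handling flow outside the diff, here is a small self-contained sketch of the same settle-log-rethrow pattern: run every write, log each failure with its id, then fail the whole batch if anything was rejected. The task shape and names are illustrative assumptions, not types from this repository, and it assumes Promise.allSettled is available (Node 12.9+ / an ES2020 lib, which is why the PR keeps the allSettled-shim):

type WriteTask = { id: string; execute: () => Promise<unknown> };

async function executeAll(tasks: WriteTask[]): Promise<void> {
    // allSettled never short-circuits, so one failing write cannot hide the others.
    const results = await Promise.allSettled(
        tasks.map(async task => {
            try {
                return await task.execute();
            } catch (e) {
                // Log the failing id before rethrowing so every rejection stays attributable.
                console.error(`write failed on id: ${task.id}`, e);
                throw e;
            }
        }),
    );

    // Gather the rejections and fail the whole batch if any write failed.
    const rejected = results.filter(result => result.status === 'rejected');
    if (rejected.length > 0) {
        throw new Error(`${rejected.length} of ${tasks.length} writes failed`);
    }
}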
17 changes: 15 additions & 2 deletions src/ddbToEs/index.ts
@@ -25,7 +25,6 @@ export async function handleDdbToEsEvent(event: any) {
             const image = AWS.DynamoDB.Converter.unmarshall(ddbJsonImage);
             // Don't index binary files
             if (ddbToEsHelper.isBinaryResource(image)) {
-                console.log('This is a Binary resource. These are not searchable');
                 // eslint-disable-next-line no-continue
                 continue;
             }
@@ -47,6 +46,20 @@
 
         await ddbToEsHelper.logAndExecutePromises(promiseParamAndIds);
     } catch (e) {
-        console.log('Failed to update ES records', e);
+        console.error(
+            'Synchonization failed! The resources that could be effected are: ',
+            event.Records.map(
+                (record: {
+                    eventName: string;
+                    dynamodb: { OldImage: AWS.DynamoDB.AttributeMap; NewImage: AWS.DynamoDB.AttributeMap };
+                }) => {
+                    const image = record.eventName === REMOVE ? record.dynamodb.OldImage : record.dynamodb.NewImage;
+                    return `{id: ${image.id.S}, vid: ${image.vid.N}}`;
+                },
+            ),
+        );
+
+        console.error('Failed to update ES records', e);
         throw e;
Contributor

I think that this will result in some messages that were processed successfully being sent to the DLQ, since a single message makes the batch fail and retrying the same batch will continue to fail.

It is common to use a batch size of 1 to work around this issue. An alternative is to enable BisectBatchOnFunctionError, although I haven't used that setting before and I'm not sure about how it interacts with MaximumRetryAttempts.

https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror
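
For reference, a minimal sketch of how those settings could be tried out on the stream's event source mapping, using the AWS SDK v2 that this project already depends on. The mapping UUID, queue ARN, and the specific values are illustrative assumptions, not configuration from this repository:

import AWS from 'aws-sdk';

// Illustrative placeholders; the real values would come from the deployed stack.
const EVENT_SOURCE_MAPPING_UUID = '00000000-0000-0000-0000-000000000000';
const DLQ_ARN = 'arn:aws:sqs:us-west-2:123456789012:ddbToEsDLQ';

const lambda = new AWS.Lambda();

async function tuneDdbToEsRetries(): Promise<void> {
    // BisectBatchOnFunctionError splits a failing batch in half and retries each half,
    // progressively narrowing the failure to fewer records. MaximumRetryAttempts bounds
    // how many retries happen before records go to the on-failure destination.
    await lambda
        .updateEventSourceMapping({
            UUID: EVENT_SOURCE_MAPPING_UUID,
            BatchSize: 15,
            BisectBatchOnFunctionError: true,
            MaximumRetryAttempts: 4,
            DestinationConfig: {
                OnFailure: { Destination: DLQ_ARN },
            },
        })
        .promise();
}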

@rsmayda (Contributor, Author) · Apr 22, 2021

Yep, that is exactly the case: some messages will succeed, but the batch will fail if a single message fails.

I worry a batch size of 1 may slow down our sync too much. I looked into BisectBatch, but as you mentioned I was not sure how it works with MaxRetry, and I wasn't able to find documentation around it either. I suspect that it will bisect at most MaxRetry times.

These writes are mostly idempotent, but there could be a use case where a resource's availability switches because of this, i.e. 1) the "AVAILABLE" write fails and goes to the DLQ, 2) the "DELETE" write passes, 3) a DLQ redrive changes the ES doc from DELETED -> AVAILABLE.

A thing to note: this DLQ redrive is a manual process, and in reality I suspect that this operation would need a runbook laying out when to 'redrive' the DLQ and when not to.

Contributor

I think that guaranteeing that only the failed messages go to the DLQ is a very desirable property of the system. Otherwise ops become harder for customers for no good reason (why are there so many DLQ messages? How come only 6% of them actually failed? How can I know which of them actually failed?).

Another desirable property is handling out-of-order messages. Our current implementation does not do that (not the same as idempotency). It could be achieved by updating ES only if the vid of the incoming message is higher than the vid of the document in ES. This would make it safe to redrive DLQ messages. I think we can tackle this later as a separate issue.
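
As a rough illustration of that vid guard, here is a minimal sketch using a scripted Elasticsearch update that skips the write when the stored document already has an equal or higher vid. The client setup, index name, and document shape are assumptions for the example, not code from this PR:

import { Client } from '@elastic/elasticsearch';

const client = new Client({ node: process.env.ELASTICSEARCH_DOMAIN_ENDPOINT || 'http://localhost:9200' });

// Apply an incoming change only if its vid is newer than what is already stored, so an
// out-of-order or redriven message cannot overwrite a more recent document state.
async function updateIfNewer(index: string, id: string, vid: number, doc: Record<string, any>): Promise<void> {
    await client.update({
        index,
        id,
        body: {
            script: {
                lang: 'painless',
                // Merge the new fields only when the incoming vid wins; otherwise make the update a no-op.
                source:
                    "if (ctx._source.vid == null || ctx._source.vid < params.vid) { ctx._source.putAll(params.doc) } else { ctx.op = 'none' }",
                params: { vid, doc: { ...doc, vid } },
            },
            // First write for this id: just index the document as-is.
            upsert: { ...doc, vid },
        },
    });
}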

IMO sending only the failed messages to the DLQ should be done now (it can still be a different PR). I agree that BisectBatch has scarce documentation, but it is worth testing out. Maybe MaxRetry=4 and BisectBatch=true with our BatchSize=15 will effectively isolate the error to a single record.

The cheap alternative is MaxRetry=1.

> I worry a batch size of 1 may slow down our sync too much

My intuition tells me the same, but we need data in order to discard that approach.

     }
 }