Skip to content

Commit

Permalink
[CHK-2154] fix(retry): catch uncaught errors in retry queue listener (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
giovanniberti authored Nov 29, 2023
1 parent 5f973b1 commit e0aa897
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 67 deletions.
4 changes: 2 additions & 2 deletions jest.setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ process.env.CLIENT_ECOMMERCE='{"TEMPLATE_IDS":["success","ko"]}'
process.env.CLIENT_ECOMMERCE_TEST='{"TEMPLATE_IDS":["success","ko","poc-1","poc-2"]}'
process.env.STORAGE_TRANSIENT_CONNECTION_STRING="AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;DefaultEndpointsProtocol=http;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;"
process.env.STORAGE_DEADLETTER_CONNECTION_STRING="AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;DefaultEndpointsProtocol=http;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;"
process.env.RETRY_QUEUE_NAME="retry-queue"
process.env.ERROR_QUEUE_NAME="error-queue"
process.env.RETRY_QUEUE_NAME="pagopa-ecommerce-notifications-service-retry-queue"
process.env.ERROR_QUEUE_NAME="pagopa-ecommerce-notifications-service-errors-queue"
process.env.INITIAL_RETRY_TIMEOUT_SECONDS="120"
process.env.MAX_RETRY_ATTEMPTS="3"
process.env.AI_SAMPLING_PERCENTAGE="30"
Expand Down
2 changes: 1 addition & 1 deletion src/__tests__/ErrorQueue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ describe("error queue", () => {
expect(spySendMessages).toBeCalled();
jest.useRealTimers();
});
});
});
67 changes: 59 additions & 8 deletions src/__tests__/RetryQueueListener.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ describe("retry queue", () => {
lang: {language: "IT" }
} as any;

const mockDeleteMessage = jest.fn((messageId, popReceipt, options) => {
return Promise.resolve(messageId);
});

retryQueueClient.deleteMessage = mockDeleteMessage;

it("sendMessageToRetryQueue", async () => {
jest.useFakeTimers();
const emailMockedFunction = jest.fn();
Expand All @@ -53,9 +59,14 @@ describe("retry queue", () => {
args: ["--no-sandbox"],
headless: true
});

retryQueueClient.createIfNotExists = jest.fn().mockResolvedValue({});


const mockedMailFunction = jest.fn().mockResolvedValueOnce(sentMessageMock(1)).mockResolvedValueOnce(sentMessageMock(3)).mockResolvedValueOnce(sentMessageMock(3));
const mailTrasporterMock = {
sendMail: mockedMailFunction
} as unknown as Transporter<SentMessageInfo>;

const mockReceiveMessages = jest.fn().mockResolvedValueOnce({receivedMessageItems:
[{
messageId: "1",
Expand All @@ -76,22 +87,62 @@ describe("retry queue", () => {

retryQueueClient.receiveMessages = mockReceiveMessages;

const mockDeleteMessage = jest.fn().mockResolvedValueOnce("1").mockResolvedValueOnce("2").mockResolvedValueOnce("3");
retryQueueClient.deleteMessage = mockDeleteMessage;
const spySendMail = jest.spyOn(mailTrasporterMock,'sendMail');
retryQueueClient.createIfNotExists();
const spyDecrypt = jest.spyOn(apiPdvClient, 'findPiiUsingGET').mockResolvedValue({_tag: "Right", right:{ status:200 , value: {pii: JSON.stringify(requestMock.body)}, headers: "" as any} });

const mockedMailFunction = jest.fn().mockResolvedValueOnce(sentMessageMock(1)).mockResolvedValueOnce(sentMessageMock(3)).mockResolvedValueOnce(sentMessageMock(3));
addRetryQueueListener(config,mailTrasporterMock,browser);

expect(setInterval).toHaveBeenCalledTimes(1);
expect(setInterval).toHaveBeenLastCalledWith(expect.any(Function), 1000);
jest.advanceTimersByTime(1000);
expect(mockReceiveMessages).toHaveBeenCalledTimes(1);

console.log(mockedMailFunction.mock.calls.length);

await browser?.close();
jest.useRealTimers();
});

it("retry queue event listener catches errors", async () => {
jest.useFakeTimers();
const emailMockedFunction = jest.fn();
registerHelpers();

jest.spyOn(global, 'setInterval');

const browser = await puppeteer.launch({
args: ["--no-sandbox"],
headless: true
});

retryQueueClient.createIfNotExists = jest.fn().mockResolvedValue({});

const mockReceiveMessages = jest.fn().mockResolvedValueOnce({receivedMessageItems:
[{
messageId: "1",
popReceipt: "1PR",
messageText: JSON.stringify(requestMock)
}
]} as QueueReceiveMessageResponse);

retryQueueClient.receiveMessages = mockReceiveMessages;

const mockedMailFunction = jest.fn().mockResolvedValueOnce(sentMessageMock(1));
const mailTrasporterMock = {
sendMail: mockedMailFunction
} as unknown as Transporter<SentMessageInfo>;
const spySendMail = jest.spyOn(mailTrasporterMock,'sendMail');
retryQueueClient.createIfNotExists();
const spyDecrypt = jest.spyOn(apiPdvClient, 'findPiiUsingGET').mockResolvedValue({_tag: "Right", right:{ status:200 , value: {pii: JSON.stringify(requestMock.body)}, headers: "" as any} });
addRetryQueueListener(config,mailTrasporterMock,browser);
const spyDecrypt = jest.spyOn(apiPdvClient, 'findPiiUsingGET').mockResolvedValue({_tag: "Right", right:{ status: 400 , value: { status: 400, title: ""}, headers: "" as any} });

addRetryQueueListener(config, mailTrasporterMock, browser);

expect(setInterval).toHaveBeenCalledTimes(1);
expect(setInterval).toHaveBeenLastCalledWith(expect.any(Function), 1000);
jest.advanceTimersByTime(1000);
expect(mockReceiveMessages.mock.calls.length).toBe(1);

expect(mockReceiveMessages).toHaveBeenCalledTimes(1);

await browser?.close();
jest.useRealTimers();
Expand Down
115 changes: 65 additions & 50 deletions src/queues/RetryQueueListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,75 @@ export const addRetryQueueListener = (
browserEngine: Browser
): void => {
const retrieveMessage = async (): Promise<void> => {
const messages = await retryQueueClient.receiveMessages({
numberOfMessages: 14
});
try {
const messages = await retryQueueClient.receiveMessages({
numberOfMessages: 14
});

if (messages?.receivedMessageItems.length > 0) {
logger.info(
`Retrying ${messages.receivedMessageItems.length} enqueued messages`
);
for (const message of messages.receivedMessageItems) {
await retryQueueClient.deleteMessage(
message.messageId,
message.popReceipt
);
const { clientId, bodyEncrypted, retryCount } = JSON.parse(
message.messageText
if (messages?.receivedMessageItems.length > 0) {
logger.info(
`Retrying ${messages.receivedMessageItems.length} enqueued messages`
);
logger.info(bodyEncrypted);
await pipe(
decryptBody(bodyEncrypted),
TE.bimap(
e => {
logger.error(`Error while invoke PDV while decrypt body: ${e} `);
// Error case: we fail to decrypt the request body -> we write the same event on the retry queque with a decremented retryCount
writeMessageIntoQueue(
bodyEncrypted,
clientId,
retryCount - 1,
config
);
},
// Happy path: we successfully decrypted the request body and can retry sending the email
async bodyDecrypted => {
const bodyRequest = JSON.parse(
bodyDecrypted
) as NotificationEmailRequest;
const templateId = bodyRequest.templateId;
const schema = await import(
`../generated/templates/${templateId}/schema.js`
);
void sendEmail(
{
"X-Client-Id": clientId,
body: bodyRequest
for (const message of messages.receivedMessageItems) {
try {
await retryQueueClient.deleteMessage(
message.messageId,
message.popReceipt
);

const { clientId, bodyEncrypted, retryCount } = JSON.parse(
message.messageText
);
logger.info(bodyEncrypted);
await pipe(
decryptBody(bodyEncrypted),
TE.bimap(
e => {
logger.error(
`Error while invoke PDV while decrypt body: ${e} `
);
// Error case: we fail to decrypt the request body -> we write the same event on the retry queque with a decremented retryCount
writeMessageIntoQueue(
bodyEncrypted,
clientId,
retryCount - 1,
config
);
},
schema,
browserEngine,
mailTrasporter,
config,
retryCount - 1
);
}
)
)();
// Happy path: we successfully decrypted the request body and can retry sending the email
async bodyDecrypted => {
const bodyRequest = JSON.parse(
bodyDecrypted
) as NotificationEmailRequest;
const templateId = bodyRequest.templateId;
const schema = await import(
`../generated/templates/${templateId}/schema.js`
);
void sendEmail(
{
"X-Client-Id": clientId,
body: bodyRequest
},
schema,
browserEngine,
mailTrasporter,
config,
retryCount - 1
);
}
)
)();
} catch (e) {
logger.error(
`Caught exception while processing message from retry queue with messageId ${message.messageId}`
);
}
}
}
} catch (e) {
logger.error(
`Caught exception while retrieving messages from queue: ${e}`
);
}
};

Expand Down
40 changes: 34 additions & 6 deletions src/util/confidentialDataManager.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { pipe } from "fp-ts/lib/function";
import * as TE from "fp-ts/lib/TaskEither";
import * as E from "fp-ts/lib/Either";
import * as t from "io-ts";
import nodeFetch from "node-fetch";
import { TypeofApiResponse } from "@pagopa/ts-commons/lib/requests";
import { FindPiiUsingGETT } from "@src/generated/personal-data-vault/requestTypes";
import { createClient } from "../generated/personal-data-vault/client";
import { getConfigOrThrow } from "./config";
import { logger } from "./logger";

const config = getConfigOrThrow();

Expand Down Expand Up @@ -37,13 +42,36 @@ export const encryptBody = (body: string): TE.TaskEither<Error, string> =>

export const decryptBody = (opaqueData: string): TE.TaskEither<Error, string> =>
pipe(
() =>
apiPdvClient.findPiiUsingGET({
api_key: config.PERSONAL_DATA_VAULT_API_KEY,
token: opaqueData
}),
() => {
try {
return apiPdvClient.findPiiUsingGET({
api_key: config.PERSONAL_DATA_VAULT_API_KEY,
token: opaqueData
});
} catch (e) {
logger.error(
`Got unexpected error while invoking PDV for decrypting body: ${e}`
);
return new Promise(
(
res: (
value: t.Validation<TypeofApiResponse<FindPiiUsingGETT>>
) => void
) => res(E.left([]))
);
}
},
TE.fold(
errs => TE.left(Error(`Got error: ${errs}`)),
errs => {
// eslint-disable-next-line functional/no-let
let logMessageDetail = JSON.stringify(errs);

if (errs.length === 0) {
logMessageDetail = "";
}

return TE.left(Error(`Got error: ${logMessageDetail}`));
},
res => {
if (res.status === 200) {
return TE.right(res.value.pii);
Expand Down

0 comments on commit e0aa897

Please sign in to comment.