Skip to content

Commit

Permalink
fix: NotificationsManager sendHandler doesn't need transaction block
Browse files Browse the repository at this point in the history
  • Loading branch information
amydevs committed Apr 22, 2024
1 parent db58000 commit f310eb3
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 60 deletions.
118 changes: 58 additions & 60 deletions src/notifications/NotificationsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,70 +146,68 @@ class NotificationsManager {
retryIntervalTimeMax: number;
},
) => {
return await this.db.withTransactionF(async (tran) => {
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded)!;
const notificationId = notificationsUtils.decodeNotificationId(
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded)!;
const notificationId = notificationsUtils.decodeNotificationId(
notificationIdEncoded,
)!;
const notificationKeyPath = [
...this.notificationsManagerOutboxDbPath,
notificationId.toBuffer(),
];
const notificationDb =
(await this.db.get<NotificationDB>(notificationKeyPath))!;
const signedNotification = await notificationsUtils.generateNotification(
{
notificationIdEncoded,
)!;
const notificationKeyPath = [
...this.notificationsManagerOutboxDbPath,
notificationId.toBuffer(),
];
const notificationDb =
(await tran.get<NotificationDB>(notificationKeyPath))!;
const signedNotification = await notificationsUtils.generateNotification(
{
notificationIdEncoded,
...notificationDb,
},
this.keyRing.keyPair,
);
// The task id if a new task has been scheduled for a retry.
try {
await this.nodeManager.withConnF(nodeId, async (connection) => {
const client = connection.getClient();
await client.methods.notificationsSend({
signedNotificationEncoded: signedNotification,
});
...notificationDb,
},
this.keyRing.keyPair,
);
// The task id if a new task has been scheduled for a retry.
try {
await this.nodeManager.withConnF(nodeId, async (connection) => {
const client = connection.getClient();
await client.methods.notificationsSend({
signedNotificationEncoded: signedNotification,
});
} catch (e) {
this.logger.warn(
`Could not send to ${
notificationDb.data.type
} notification to ${nodesUtils.encodeNodeId(
nodeId,
)}: ${e.toString()}`,
});
await this.db.del(notificationKeyPath);
} catch (e) {
this.logger.warn(
`Could not send ${
notificationDb.data.type
} notification to ${nodesUtils.encodeNodeId(
nodeId,
)}: ${e.toString()}`,
);
if (nodesUtils.isConnectionError(e) && 0 < retries) {
const delay =
taskInfo.delay === 0
? retryIntervalTimeMin
: Math.min(taskInfo.delay * 2, retryIntervalTimeMax);
// Recursively return inner task, so that the handler can process them.
const newTask = await this.taskManager.scheduleTask(
{
handlerId: this.sendNotificationHandlerId,
path: [this.sendNotificationHandlerId, notificationIdEncoded],
parameters: [
{
nodeIdEncoded,
notificationIdEncoded,
retries: retries - 1,
retryIntervalTimeMin,
retryIntervalTimeMax,
},
],
delay: delay,
lazy: false,
},
);
if (nodesUtils.isConnectionError(e) && 0 < retries) {
const delay =
taskInfo.delay === 0
? retryIntervalTimeMin
: Math.min(taskInfo.delay * 2, retryIntervalTimeMax);
// Recursively return inner task, so that the handler can process them.
const newTask = await this.taskManager.scheduleTask(
{
handlerId: this.sendNotificationHandlerId,
path: [this.sendNotificationHandlerId, notificationIdEncoded],
parameters: [
{
nodeIdEncoded,
notificationIdEncoded,
retries: retries - 1,
retryIntervalTimeMin,
retryIntervalTimeMax,
},
],
delay: delay,
lazy: false,
},
tran,
);
return newTask;
}
await this.db.del(notificationKeyPath);
throw e;
return newTask;
}
});
await this.db.del(notificationKeyPath);
throw e;
}
};
public readonly sendNotificationHandlerId =
`${this.constructor.name}.sendNotificationHandler` as TaskHandlerId;
Expand Down
12 changes: 12 additions & 0 deletions tests/notifications/NotificationsManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ describe('NotificationsManager', () => {
);
await taskManager.startProcessing();
await Promise.all(sendProms);
await expect(
notificationsManager
.readOutboxNotifications()
.next()
.then((data) => data.done),
).resolves.toBe(true);
const receivedNotifications = await AsyncIterable.as(
receiver.notificationsManager.readInboxNotifications(),
).toArray();
Expand Down Expand Up @@ -369,6 +375,12 @@ describe('NotificationsManager', () => {
.then((value) => value.sendP),
notificationsErrors.ErrorNotificationsPermissionsNotFound,
);
await expect(
notificationsManager
.readOutboxNotifications()
.next()
.then((data) => data.done),
).resolves.toBe(true);
const receivedNotifications = await AsyncIterable.as(
receiver.notificationsManager.readInboxNotifications(),
).toArray();
Expand Down

0 comments on commit f310eb3

Please sign in to comment.