From 82553acfca0803165726e1b4fee16a12f2041be1 Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Mon, 29 Apr 2024 12:47:17 +1000 Subject: [PATCH] feat: added `seek` and `seekEnd` to `NotificationsManager` --- src/client/handlers/NotificationsInboxRead.ts | 26 ++++- .../handlers/NotificationsOutboxRead.ts | 38 ++++---- src/client/types.ts | 4 +- src/ids/index.ts | 39 ++++++-- src/notifications/NotificationsManager.ts | 73 +++++++++++++- src/notifications/utils.ts | 42 ++++++++- .../NotificationsManager.test.ts | 94 +++++++++++++++++++ 7 files changed, 277 insertions(+), 39 deletions(-) diff --git a/src/client/handlers/NotificationsInboxRead.ts b/src/client/handlers/NotificationsInboxRead.ts index 37c879b972..afd145e225 100644 --- a/src/client/handlers/NotificationsInboxRead.ts +++ b/src/client/handlers/NotificationsInboxRead.ts @@ -5,8 +5,10 @@ import type { NotificationInboxMessage, NotificationReadMessage, } from '../types'; +import type { NotificationId } from '../../ids/types'; import type NotificationsManager from '../../notifications/NotificationsManager'; import { ServerHandler } from '@matrixai/rpc'; +import * as notificationsUtils from '../../notifications/utils'; class NotificationsInboxRead extends ServerHandler< { @@ -24,11 +26,29 @@ class NotificationsInboxRead extends ServerHandler< ): AsyncGenerator> { if (ctx.signal.aborted) throw ctx.signal.reason; const { db, notificationsManager } = this.container; + const { seek, seekEnd, unread, order, limit } = input; + + let seek_: NotificationId | number | undefined; + if (seek != null) { + seek_ = + typeof seek === 'string' + ? notificationsUtils.decodeNotificationId(seek) + : seek; + } + let seekEnd_: NotificationId | number | undefined; + if (seekEnd != null) { + seekEnd_ = + typeof seekEnd === 'string' + ? notificationsUtils.decodeNotificationId(seekEnd) + : seekEnd; + } return db.withTransactionG(async function* (tran) { const notifications = notificationsManager.readInboxNotifications({ - unread: input.unread, - order: input.order, - limit: input.limit, + seek: seek_, + seekEnd: seekEnd_, + unread, + order, + limit, tran, }); for await (const notification of notifications) { diff --git a/src/client/handlers/NotificationsOutboxRead.ts b/src/client/handlers/NotificationsOutboxRead.ts index 52324a918b..56206211c4 100644 --- a/src/client/handlers/NotificationsOutboxRead.ts +++ b/src/client/handlers/NotificationsOutboxRead.ts @@ -5,6 +5,7 @@ import type { NotificationOutboxMessage, NotificationOutboxReadMessage, } from '../types'; +import type { NotificationId } from '../../ids/types'; import type NotificationsManager from '../../notifications/NotificationsManager'; import { ServerHandler } from '@matrixai/rpc'; import * as notificationsUtils from '../../notifications/utils'; @@ -25,31 +26,34 @@ class NotificationsOutboxRead extends ServerHandler< ): AsyncGenerator> { if (ctx.signal.aborted) throw ctx.signal.reason; const { db, notificationsManager } = this.container; + const { seek, seekEnd, order, limit } = input; + + let seek_: NotificationId | number | undefined; + if (seek != null) { + seek_ = + typeof seek === 'string' + ? notificationsUtils.decodeNotificationId(seek) + : seek; + } + let seekEnd_: NotificationId | number | undefined; + if (seekEnd != null) { + seekEnd_ = + typeof seekEnd === 'string' + ? notificationsUtils.decodeNotificationId(seekEnd) + : seekEnd; + } return db.withTransactionG(async function* (tran) { const notifications = notificationsManager.readOutboxNotifications({ - order: input.order, - limit: input.limit, + seek: seek_, + seekEnd: seekEnd_, + order, + limit, tran, }); for await (const notification of notifications) { if (ctx.signal.aborted) throw ctx.signal.reason; - const taskInfo = - await notificationsManager.getOutboxNotificationTaskInfoById( - notificationsUtils.decodeNotificationId( - notification.notificationIdEncoded, - )!, - tran, - ); yield { notification: notification, - taskMetadata: - taskInfo != null - ? { - remainingRetries: taskInfo.parameters[0].retries, - created: taskInfo.created.getTime(), - scheduled: taskInfo.scheduled.getTime(), - } - : undefined, }; } }); diff --git a/src/client/types.ts b/src/client/types.ts index 2ad9882930..6cf7594f06 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -207,12 +207,14 @@ type SuccessMessage = { // Notifications messages type NotificationReadMessage = { + seek?: NotificationIdEncoded | number; + seekEnd?: NotificationIdEncoded | number; unread?: boolean; limit?: number; order?: 'asc' | 'desc'; }; -type NotificationOutboxReadMessage = Omit +type NotificationOutboxReadMessage = Omit; type NotificationInboxMessage = { notification: Notification; diff --git a/src/ids/index.ts b/src/ids/index.ts index 1238a6aa96..3dacdf992f 100644 --- a/src/ids/index.ts +++ b/src/ids/index.ts @@ -27,6 +27,23 @@ import { IdInternal, IdSortable, IdRandom } from '@matrixai/id'; import * as keysUtilsRandom from '../keys/utils/random'; import * as validationErrors from '../validation/errors'; +/** + * Generates an auditId from an epoch timestamp. + * + * @param epoch + * @param randomSource + */ +function generateSortableIdFromTimestamp( + epoch: number, + randomSource: (size: number) => Uint8Array = keysUtilsRandom.getRandomBytes, +): T { + const generator = new IdSortable({ + timeSource: () => () => epoch, + randomSource, + }); + return generator.get(); +} + function createPermIdGenerator(): () => PermissionId { const generator = new IdRandom({ randomSource: keysUtilsRandom.getRandomBytes, @@ -78,16 +95,8 @@ function decodeAuditEventId( * @param epoch * @param randomSource */ -function generateAuditEventIdFromTimestamp( - epoch: number, - randomSource: (size: number) => Uint8Array = keysUtilsRandom.getRandomBytes, -): AuditEventId { - const generator = new IdSortable({ - timeSource: () => () => epoch, - randomSource, - }); - return generator.get(); -} +const generateAuditEventIdFromTimestamp = + generateSortableIdFromTimestamp; /** * Creates a NodeId generator. @@ -518,6 +527,15 @@ function decodeNotificationId( return notificationId; } +/** + * Generates a NotificationId from an epoch timestamp. + * + * @param epoch + * @param randomSource + */ +const generateNotificationIdFromTimestamp = + generateSortableIdFromTimestamp; + export { createPermIdGenerator, createAuditEventIdGenerator, @@ -563,6 +581,7 @@ export { createNotificationIdGenerator, encodeNotificationId, decodeNotificationId, + generateNotificationIdFromTimestamp, }; export * from './types'; diff --git a/src/notifications/NotificationsManager.ts b/src/notifications/NotificationsManager.ts index d0f998d764..0335e23233 100644 --- a/src/notifications/NotificationsManager.ts +++ b/src/notifications/NotificationsManager.ts @@ -397,17 +397,40 @@ class NotificationsManager { } protected async *getOutboxNotificationIds({ + seek, + seekEnd, order = 'asc', limit, tran, }: { + seek?: NotificationId | number | Date; + seekEnd?: NotificationId | number | Date; order?: 'asc' | 'desc'; limit?: number; tran: DBTransaction; }): AsyncGenerator { + const seekId = + seek != null + ? notificationsUtils.extractFromSeek( + seek, + (size) => new Uint8Array(size), + ).notificationId + : undefined; + const seekEndId = + seekEnd != null + ? notificationsUtils.extractFromSeek(seekEnd, (size) => + new Uint8Array(size).fill(0xff), + ).notificationId + : undefined; + const messageIterator = tran.iterator( this.notificationsManagerOutboxDbPath, - { valueAsBuffer: false, reverse: order !== 'asc' }, + { + valueAsBuffer: false, + reverse: order !== 'asc', + gte: seekId?.toBuffer(), + lte: seekEndId?.toBuffer(), + }, ); let i = 0; for await (const [keyPath] of messageIterator) { @@ -444,23 +467,29 @@ class NotificationsManager { */ @ready(new notificationsErrors.ErrorNotificationsNotRunning()) public async *readOutboxNotifications({ + seek, + seekEnd, limit, order = 'asc', tran, }: { + seek?: NotificationId | number | Date; + seekEnd?: NotificationId | number | Date; order?: 'asc' | 'desc'; limit?: number; tran?: DBTransaction; } = {}): AsyncGenerator { if (tran == null) { const readOutboxNotifications = (tran) => - this.readOutboxNotifications({ limit, order, tran }); + this.readOutboxNotifications({ seek, seekEnd, limit, order, tran }); return yield* this.db.withTransactionG(async function* (tran) { return yield* readOutboxNotifications(tran); }); } const notificationIds = this.getOutboxNotificationIds({ + seek, + seekEnd, limit, order, tran, @@ -618,11 +647,15 @@ class NotificationsManager { */ @ready(new notificationsErrors.ErrorNotificationsNotRunning()) public async *readInboxNotifications({ + seek, + seekEnd, unread = false, order = 'asc', limit, tran, }: { + seek?: NotificationId | number | Date; + seekEnd?: NotificationId | number | Date; unread?: boolean; order?: 'asc' | 'desc'; limit?: number; @@ -630,12 +663,21 @@ class NotificationsManager { } = {}): AsyncGenerator { if (tran == null) { const readNotifications = (tran) => - this.readInboxNotifications({ unread, limit, order, tran }); + this.readInboxNotifications({ + seek, + seekEnd, + unread, + limit, + order, + tran, + }); return yield* this.db.withTransactionG(async function* (tran) { return yield* readNotifications(tran); }); } const notificationIds = this.getInboxNotificationIds({ + seek, + seekEnd, unread, limit, order, @@ -713,19 +755,42 @@ class NotificationsManager { } protected async *getInboxNotificationIds({ + seek, + seekEnd, unread = false, order = 'asc', limit, tran, }: { + seek?: NotificationId | number | Date; + seekEnd?: NotificationId | number | Date; unread?: boolean; order?: 'asc' | 'desc'; limit?: number; tran: DBTransaction; }): AsyncGenerator { + const seekId = + seek != null + ? notificationsUtils.extractFromSeek( + seek, + (size) => new Uint8Array(size), + ).notificationId + : undefined; + const seekEndId = + seekEnd != null + ? notificationsUtils.extractFromSeek(seekEnd, (size) => + new Uint8Array(size).fill(0xff), + ).notificationId + : undefined; + const messageIterator = tran.iterator( this.notificationsManagerInboxDbPath, - { valueAsBuffer: false, reverse: order !== 'asc' }, + { + valueAsBuffer: false, + reverse: order !== 'asc', + gte: seekId?.toBuffer(), + lte: seekEndId?.toBuffer(), + }, ); let i = 0; for await (const [keyPath, notificationDb] of messageIterator) { diff --git a/src/notifications/utils.ts b/src/notifications/utils.ts index b61a9ffaa3..cab46f9569 100644 --- a/src/notifications/utils.ts +++ b/src/notifications/utils.ts @@ -5,18 +5,20 @@ import type { Notification, SignedNotification, } from './types'; -import type { NodeId, VaultId } from '../ids/types'; +import type { NodeId, VaultId, NotificationId } from '../ids/types'; import type { KeyPairLocked } from '../keys/types'; +import { IdInternal } from '@matrixai/id'; +import * as sortableIdUtils from '@matrixai/id/dist/IdSortable'; import * as notificationsErrors from './errors'; import { createNotificationIdGenerator } from '../ids'; +import { vaultActions } from '../vaults/types'; +import { never } from '../utils'; +import Token from '../tokens/Token'; import * as nodesUtils from '../nodes/utils'; import * as keysUtils from '../keys/utils'; -import Token from '../tokens/Token'; import * as validationErrors from '../validation/errors'; import * as utils from '../utils'; import * as ids from '../ids'; -import { vaultActions } from '../vaults/types'; -import { never } from '../utils'; function constructGestaltInviteMessage(nodeId: NodeId): string { return `Keynode with ID ${nodeId} has invited this Keynode to join their Gestalt. Accept this invitation by typing the command: xxx`; @@ -206,6 +208,37 @@ function assertVaultShare( } } +function extractFromSeek( + seek: NotificationId | number | Date, + randomSource?: (size: number) => Uint8Array, +): { + notificationId: NotificationId; + timestamp: number; +} { + let notificationId: NotificationId; + let timestamp: number | undefined; + if (seek instanceof IdInternal) { + notificationId = seek; + timestamp = sortableIdUtils.extractTs(seek.toBuffer()) * 1000; + } else if (typeof seek === 'number') { + timestamp = seek; + notificationId = ids.generateNotificationIdFromTimestamp( + seek, + randomSource, + ); + } else { + timestamp = seek.getTime(); + notificationId = ids.generateNotificationIdFromTimestamp( + timestamp, + randomSource, + ); + } + return { + notificationId: notificationId, + timestamp, + }; +} + export { createNotificationIdGenerator, generateNotification, @@ -217,6 +250,7 @@ export { assertGeneral, assertGestaltInvite, assertVaultShare, + extractFromSeek, }; export { encodeNotificationId, decodeNotificationId } from '../ids'; diff --git a/tests/notifications/NotificationsManager.test.ts b/tests/notifications/NotificationsManager.test.ts index 87263f8452..a095adf2f4 100644 --- a/tests/notifications/NotificationsManager.test.ts +++ b/tests/notifications/NotificationsManager.test.ts @@ -882,6 +882,100 @@ describe('NotificationsManager', () => { await acl.unsetNodePerm(senderId); await notificationsManager.stop(); }); + test('can read notifications with seek', async () => { + const generateNotificationId = + notificationsUtils.createNotificationIdGenerator(); + const notificationsManager = + await NotificationsManager.createNotificationsManager({ + acl, + db, + nodeManager, + taskManager, + keyRing, + logger, + }); + const notification1: Notification = { + notificationIdEncoded: notificationsUtils.encodeNotificationId( + generateNotificationId(), + ), + typ: 'notification', + data: { + type: 'General', + message: 'msg1', + }, + iss: senderIdEncoded, + sub: targetIdEncoded, + isRead: false, + }; + const notification2: Notification = { + notificationIdEncoded: notificationsUtils.encodeNotificationId( + generateNotificationId(), + ), + typ: 'notification', + data: { + type: 'General', + message: 'msg2', + }, + iss: senderIdEncoded, + sub: targetIdEncoded, + isRead: false, + }; + const notification3: Notification = { + notificationIdEncoded: notificationsUtils.encodeNotificationId( + generateNotificationId(), + ), + typ: 'notification', + data: { + type: 'General', + message: 'msg3', + }, + iss: senderIdEncoded, + sub: targetIdEncoded, + isRead: false, + }; + await acl.setNodePerm(senderId, { + gestalt: { + notify: null, + }, + vaults: {}, + }); + await notificationsManager.receiveNotification(notification1); + const beforeNotification2Timestamp = Date.now(); + await notificationsManager.receiveNotification(notification2); + const afterNotification2Timestamp = Date.now(); + await notificationsManager.receiveNotification(notification3); + let notifications = await AsyncIterable.as( + notificationsManager.readInboxNotifications({ + order: 'asc', + seek: beforeNotification2Timestamp, + }), + ).toArray(); + expect(notifications).toHaveLength(2); + expect(notifications[0].data['message']).toBe('msg2'); + expect(notifications[1].data['message']).toBe('msg3'); + notifications = await AsyncIterable.as( + notificationsManager.readInboxNotifications({ + order: 'asc', + seekEnd: afterNotification2Timestamp, + }), + ).toArray(); + expect(notifications).toHaveLength(2); + expect(notifications[0].data['message']).toBe('msg1'); + expect(notifications[1].data['message']).toBe('msg2'); + notifications = await AsyncIterable.as( + notificationsManager.readInboxNotifications({ + order: 'asc', + seek: beforeNotification2Timestamp, + seekEnd: afterNotification2Timestamp, + }), + ).toArray(); + expect(notifications).toHaveLength(1); + expect(notifications[0].data['message']).toBe('msg2'); + // Reverse side-effects + await notificationsManager.clearInboxNotifications(); + await acl.unsetNodePerm(senderId); + await notificationsManager.stop(); + }); test('notifications can be capped and oldest notifications deleted', async () => { const generateNotificationId = notificationsUtils.createNotificationIdGenerator();