From 77abc68a2fcbf239652e17b2ef48f5df1bef941e Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Thu, 18 Apr 2024 16:29:55 +1000 Subject: [PATCH] feat: changed readNotifications and readOutboxNotifications to return AsyncGenerators --- .../handlers/NotificationsOutboxRead.ts | 4 +- src/notifications/NotificationsManager.ts | 164 ++++++++++-------- .../NotificationsManager.test.ts | 20 +-- 3 files changed, 99 insertions(+), 89 deletions(-) diff --git a/src/client/handlers/NotificationsOutboxRead.ts b/src/client/handlers/NotificationsOutboxRead.ts index 072e2e863e..34329fa9ab 100644 --- a/src/client/handlers/NotificationsOutboxRead.ts +++ b/src/client/handlers/NotificationsOutboxRead.ts @@ -24,14 +24,14 @@ class NotificationsOutboxRead extends ServerHandler< ): AsyncGenerator> { if (ctx.signal.aborted) throw ctx.signal.reason; const { db, notificationsManager } = this.container; - const notifications = await db.withTransactionF((tran) => + const notifications = await db.withTransactionF(async (tran) => notificationsManager.readOutboxNotifications({ number: input.number, order: input.order, tran, }), ); - for (const notification of notifications) { + for await (const notification of notifications) { if (ctx.signal.aborted) throw ctx.signal.reason; yield { notification: notification, diff --git a/src/notifications/NotificationsManager.ts b/src/notifications/NotificationsManager.ts index d07a6945b2..275c40a9e8 100644 --- a/src/notifications/NotificationsManager.ts +++ b/src/notifications/NotificationsManager.ts @@ -371,20 +371,36 @@ class NotificationsManager { }; } - protected async getOutboxNotificationIds( - tran: DBTransaction, - ): Promise> { - const notificationIds: Array = []; + protected async* getOutboxNotificationIds({ + number = 'all', + order = 'newest', + tran, + }: { + number?: number | 'all'; + order?: 'newest' | 'oldest'; + tran?: DBTransaction; + } = {}): AsyncGenerator { + if (tran == null) { + const getOutboxNotificationIds = (tran) => + this.getOutboxNotificationIds({ number, order, tran }); + return yield* this.db.withTransactionG(async function* (tran) { + return yield* getOutboxNotificationIds(tran); + }); + } const messageIterator = tran.iterator( this.notificationsManagerOutboxDbPath, - { valueAsBuffer: false }, + { valueAsBuffer: false, reverse: order === 'newest' }, ); + let i = 0; for await (const [keyPath] of messageIterator) { + if (number !== 'all' && i >= number) { + break; + } const key = keyPath[0] as Buffer; const notificationId = IdInternal.fromBuffer(key); - notificationIds.push(notificationId); + yield notificationId; + i++; } - return notificationIds; } protected async readOutboxNotificationById( @@ -409,40 +425,34 @@ class NotificationsManager { * Read pending notifications in the outbox. */ @ready(new notificationsErrors.ErrorNotificationsNotRunning()) - public async readOutboxNotifications({ + public async *readOutboxNotifications({ number = 'all', order = 'newest', tran, }: { - unread?: boolean; number?: number | 'all'; order?: 'newest' | 'oldest'; tran?: DBTransaction; - } = {}): Promise> { + } = {}): AsyncGenerator { if (tran == null) { - return this.db.withTransactionF((tran) => - this.readOutboxNotifications({ number, order, tran }), - ); + const readOutboxNotifications = (tran) => + this.readOutboxNotifications({ number, order, tran }); + return yield* this.db.withTransactionG(async function* (tran) { + return yield* readOutboxNotifications(tran); + }); } - let outboxIds = await this.getOutboxNotificationIds(tran); - if (order === 'newest') { - outboxIds.reverse(); - } - - if (number === 'all' || number > outboxIds.length) { - number = outboxIds.length; - } - outboxIds = outboxIds.slice(0, number); + const outboxIds = this.getOutboxNotificationIds({ + number, + order, + tran, + }); - const notifications: Array = []; - for (const id of outboxIds) { + for await (const id of outboxIds) { const notification = await this.readOutboxNotificationById(id, tran); if (notification == null) never(); - notifications.push(notification); + yield notification; } - - return notifications; } /** @@ -455,8 +465,8 @@ class NotificationsManager { this.clearOutboxNotifications(tran), ); } - const notificationIds = await this.getOutboxNotificationIds(tran); - for (const id of notificationIds) { + const notificationIds = this.getOutboxNotificationIds({ tran }); + for await (const id of notificationIds) { await this.removeOutboxNotification(id, tran); } } @@ -530,7 +540,7 @@ class NotificationsManager { * Read a notification */ @ready(new notificationsErrors.ErrorNotificationsNotRunning()) - public async readNotifications({ + public async *readNotifications({ unread = false, number = 'all', order = 'newest', @@ -540,36 +550,25 @@ class NotificationsManager { number?: number | 'all'; order?: 'newest' | 'oldest'; tran?: DBTransaction; - } = {}): Promise> { + } = {}): AsyncGenerator { if (tran == null) { - return this.db.withTransactionF((tran) => - this.readNotifications({ unread, number, order, tran }), - ); - } - let notificationIds: Array; - if (unread) { - notificationIds = await this.getNotificationIds('unread', tran); - } else { - notificationIds = await this.getNotificationIds('all', tran); - } - - if (order === 'newest') { - notificationIds.reverse(); - } - - if (number === 'all' || number > notificationIds.length) { - number = notificationIds.length; + const readNotifications = (tran) => + this.readNotifications({ number, order, tran }); + return yield* this.db.withTransactionG(async function* (tran) { + return yield* readNotifications(tran); + }); } - notificationIds = notificationIds.slice(0, number); - - const notifications: Array = []; - for (const id of notificationIds) { + const notificationIds = this.getNotificationIds({ + unread, + number, + order, + tran, + }); + for await (const id of notificationIds) { const notification = await this.readNotificationById(id, tran); if (notification == null) never(); - notifications.push(notification); + yield notification; } - - return notifications; } /** @@ -605,8 +604,8 @@ class NotificationsManager { if (tran == null) { return this.db.withTransactionF((tran) => this.clearNotifications(tran)); } - const notificationIds = await this.getNotificationIds('all', tran); - for (const id of notificationIds) { + const notificationIds = await this.getNotificationIds({ tran }); + for await (const id of notificationIds) { await this.removeNotification(id, tran); } } @@ -628,33 +627,47 @@ class NotificationsManager { notificationDb, ); return { + ...notificationDb, notificationIdEncoded: notificationsUtils.encodeNotificationId(notificationId), - ...notificationDb, }; } - protected async getNotificationIds( - type: 'unread' | 'all', - tran: DBTransaction, - ): Promise> { - const notificationIds: Array = []; + protected async* getNotificationIds({ + unread = false, + number = 'all', + order = 'newest', + tran, + }: { + unread?: boolean; + number?: number | 'all'; + order?: 'newest' | 'oldest'; + tran?: DBTransaction; + } = {}): AsyncGenerator { + if (tran == null) { + const getNotificationIds = (tran) => + this.getNotificationIds({ number, order, tran }); + return yield* this.db.withTransactionG(async function* (tran) { + return yield* getNotificationIds(tran); + }); + } const messageIterator = tran.iterator( this.notificationsManagerInboxDbPath, - { valueAsBuffer: false }, + { valueAsBuffer: false, reverse: order === 'newest' }, ); + let i = 0; for await (const [keyPath, notification] of messageIterator) { + if (number !== 'all' && i >= number) { + break; + } + if (notification.isRead && unread) { + continue; + } const key = keyPath[0] as Buffer; const notificationId = IdInternal.fromBuffer(key); - if (type === 'all') { - notificationIds.push(notificationId); - } else if (type === 'unread') { - if (!notification.isRead) { - notificationIds.push(notificationId); - } - } + yield notificationId; + i++; } - return notificationIds; } protected async getNotifications( @@ -689,11 +702,10 @@ class NotificationsManager { protected async getOldestNotificationId( tran: DBTransaction, ): Promise { - const notificationIds = await this.getNotificationIds('all', tran); - if (notificationIds.length === 0) { - return undefined; + for await (const notification of this.getNotificationIds({ order: 'oldest', tran })) { + return notification; } - return notificationIds[0]; + return undefined; } protected async removeNotification( diff --git a/tests/notifications/NotificationsManager.test.ts b/tests/notifications/NotificationsManager.test.ts index 20ebc4c55b..7a8293b4c2 100644 --- a/tests/notifications/NotificationsManager.test.ts +++ b/tests/notifications/NotificationsManager.test.ts @@ -11,6 +11,7 @@ import path from 'path'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import { DB } from '@matrixai/db'; import { IdInternal } from '@matrixai/id'; +import { AsyncIterableX as AsyncIterable } from 'ix/asynciterable'; import TaskManager from '@/tasks/TaskManager'; import PolykeyAgent from '@/PolykeyAgent'; import ACL from '@/acl/ACL'; @@ -28,6 +29,7 @@ import * as keysUtils from '@/keys/utils'; import * as utils from '@/utils'; import * as testUtils from '../utils'; import * as tlsTestsUtils from '../utils/tls'; +import 'ix/add/asynciterable-operators/toarray'; describe('NotificationsManager', () => { const password = 'password'; @@ -262,8 +264,7 @@ describe('NotificationsManager', () => { ), ]) .then((value) => value.map((value) => value.sendP)); - const outboxNotifications = - await notificationsManager.readOutboxNotifications(); + const outboxNotifications = await AsyncIterable.as(notificationsManager.readOutboxNotifications()).toArray(); expect(outboxNotifications).toEqual( expect.arrayContaining([ expect.objectContaining({ @@ -282,8 +283,7 @@ describe('NotificationsManager', () => { ); await taskManager.startProcessing(); await Promise.all(sendProms); - const receivedNotifications = - await receiver.notificationsManager.readNotifications(); + const receivedNotifications = await AsyncIterable.as(receiver.notificationsManager.readNotifications()).toArray(); expect(receivedNotifications).toHaveLength(3); expect(receivedNotifications).toEqual( expect.arrayContaining([ @@ -1049,12 +1049,12 @@ describe('NotificationsManager', () => { }); await notificationsManager.receiveNotification(notification1); await notificationsManager.receiveNotification(notification2); - await notificationsManager.readNotifications({ number: 1 }); + await notificationsManager.readNotifications({ number: 1 }).next(); await notificationsManager.stop(); await notificationsManager.start(); - const unreadNotifications = await notificationsManager.readNotifications({ + const unreadNotifications = await AsyncIterable.as(notificationsManager.readNotifications({ unread: true, - }); + })).toArray(); expect(unreadNotifications).toHaveLength(1); expect(unreadNotifications[0].data).toEqual(notification1.data); expect(unreadNotifications[0].iss).toBe(notification1.iss); @@ -1116,11 +1116,9 @@ describe('NotificationsManager', () => { }); await notificationsManager.stop(); await notificationsManager.start({ fresh: true }); - const receivedNotifications = - await notificationsManager.readNotifications(); + const receivedNotifications = await AsyncIterable.as(notificationsManager.readNotifications()).toArray(); expect(receivedNotifications).toHaveLength(0); - const outboxNotifications = - await notificationsManager.readOutboxNotifications(); + const outboxNotifications = await AsyncIterable.as(notificationsManager.readOutboxNotifications()).toArray(); expect(outboxNotifications).toHaveLength(0); // Reverse side-effects await receiver.notificationsManager.clearNotifications();