Skip to content

Commit

Permalink
feat: changed readNotifications and readOutboxNotifications to return
Browse files Browse the repository at this point in the history
AsyncGenerators
  • Loading branch information
amydevs committed Apr 18, 2024
1 parent db56c36 commit 77abc68
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 89 deletions.
4 changes: 2 additions & 2 deletions src/client/handlers/NotificationsOutboxRead.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ class NotificationsOutboxRead extends ServerHandler<
): AsyncGenerator<ClientRPCResponseResult<NotificationMessage>> {
if (ctx.signal.aborted) throw ctx.signal.reason;
const { db, notificationsManager } = this.container;
const notifications = await db.withTransactionF((tran) =>
const notifications = await db.withTransactionF(async (tran) =>
notificationsManager.readOutboxNotifications({
number: input.number,
order: input.order,
tran,
}),
);
for (const notification of notifications) {
for await (const notification of notifications) {
if (ctx.signal.aborted) throw ctx.signal.reason;
yield {
notification: notification,
Expand Down
164 changes: 88 additions & 76 deletions src/notifications/NotificationsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,20 +371,36 @@ class NotificationsManager {
};
}

protected async getOutboxNotificationIds(
tran: DBTransaction,
): Promise<Array<NotificationId>> {
const notificationIds: Array<NotificationId> = [];
protected async* getOutboxNotificationIds({
number = 'all',
order = 'newest',
tran,
}: {
number?: number | 'all';
order?: 'newest' | 'oldest';
tran?: DBTransaction;
} = {}): AsyncGenerator<NotificationId> {
if (tran == null) {
const getOutboxNotificationIds = (tran) =>
this.getOutboxNotificationIds({ number, order, tran });
return yield* this.db.withTransactionG(async function* (tran) {
return yield* getOutboxNotificationIds(tran);
});
}
const messageIterator = tran.iterator<NotificationDB>(
this.notificationsManagerOutboxDbPath,
{ valueAsBuffer: false },
{ valueAsBuffer: false, reverse: order === 'newest' },
);
let i = 0;
for await (const [keyPath] of messageIterator) {
if (number !== 'all' && i >= number) {
break;
}
const key = keyPath[0] as Buffer;
const notificationId = IdInternal.fromBuffer<NotificationId>(key);
notificationIds.push(notificationId);
yield notificationId;
i++;
}
return notificationIds;
}

protected async readOutboxNotificationById(
Expand All @@ -409,40 +425,34 @@ class NotificationsManager {
* Read pending notifications in the outbox.
*/
@ready(new notificationsErrors.ErrorNotificationsNotRunning())
public async readOutboxNotifications({
public async *readOutboxNotifications({
number = 'all',
order = 'newest',
tran,
}: {
unread?: boolean;
number?: number | 'all';
order?: 'newest' | 'oldest';
tran?: DBTransaction;
} = {}): Promise<Array<Notification>> {
} = {}): AsyncGenerator<Notification> {
if (tran == null) {
return this.db.withTransactionF((tran) =>
this.readOutboxNotifications({ number, order, tran }),
);
const readOutboxNotifications = (tran) =>
this.readOutboxNotifications({ number, order, tran });
return yield* this.db.withTransactionG(async function* (tran) {
return yield* readOutboxNotifications(tran);
});
}
let outboxIds = await this.getOutboxNotificationIds(tran);

if (order === 'newest') {
outboxIds.reverse();
}

if (number === 'all' || number > outboxIds.length) {
number = outboxIds.length;
}
outboxIds = outboxIds.slice(0, number);
const outboxIds = this.getOutboxNotificationIds({
number,
order,
tran,
});

const notifications: Array<Notification> = [];
for (const id of outboxIds) {
for await (const id of outboxIds) {
const notification = await this.readOutboxNotificationById(id, tran);
if (notification == null) never();
notifications.push(notification);
yield notification;
}

return notifications;
}

/**
Expand All @@ -455,8 +465,8 @@ class NotificationsManager {
this.clearOutboxNotifications(tran),
);
}
const notificationIds = await this.getOutboxNotificationIds(tran);
for (const id of notificationIds) {
const notificationIds = this.getOutboxNotificationIds({ tran });
for await (const id of notificationIds) {
await this.removeOutboxNotification(id, tran);
}
}
Expand Down Expand Up @@ -530,7 +540,7 @@ class NotificationsManager {
* Read a notification
*/
@ready(new notificationsErrors.ErrorNotificationsNotRunning())
public async readNotifications({
public async *readNotifications({
unread = false,
number = 'all',
order = 'newest',
Expand All @@ -540,36 +550,25 @@ class NotificationsManager {
number?: number | 'all';
order?: 'newest' | 'oldest';
tran?: DBTransaction;
} = {}): Promise<Array<Notification>> {
} = {}): AsyncGenerator<Notification> {
if (tran == null) {
return this.db.withTransactionF((tran) =>
this.readNotifications({ unread, number, order, tran }),
);
}
let notificationIds: Array<NotificationId>;
if (unread) {
notificationIds = await this.getNotificationIds('unread', tran);
} else {
notificationIds = await this.getNotificationIds('all', tran);
}

if (order === 'newest') {
notificationIds.reverse();
}

if (number === 'all' || number > notificationIds.length) {
number = notificationIds.length;
const readNotifications = (tran) =>
this.readNotifications({ number, order, tran });
return yield* this.db.withTransactionG(async function* (tran) {
return yield* readNotifications(tran);
});
}
notificationIds = notificationIds.slice(0, number);

const notifications: Array<Notification> = [];
for (const id of notificationIds) {
const notificationIds = this.getNotificationIds({
unread,
number,
order,
tran,
});
for await (const id of notificationIds) {
const notification = await this.readNotificationById(id, tran);
if (notification == null) never();
notifications.push(notification);
yield notification;
}

return notifications;
}

/**
Expand Down Expand Up @@ -605,8 +604,8 @@ class NotificationsManager {
if (tran == null) {
return this.db.withTransactionF((tran) => this.clearNotifications(tran));
}
const notificationIds = await this.getNotificationIds('all', tran);
for (const id of notificationIds) {
const notificationIds = await this.getNotificationIds({ tran });
for await (const id of notificationIds) {
await this.removeNotification(id, tran);
}
}
Expand All @@ -628,33 +627,47 @@ class NotificationsManager {
notificationDb,
);
return {
...notificationDb,
notificationIdEncoded:
notificationsUtils.encodeNotificationId(notificationId),
...notificationDb,
};
}

protected async getNotificationIds(
type: 'unread' | 'all',
tran: DBTransaction,
): Promise<Array<NotificationId>> {
const notificationIds: Array<NotificationId> = [];
protected async* getNotificationIds({
unread = false,
number = 'all',
order = 'newest',
tran,
}: {
unread?: boolean;
number?: number | 'all';
order?: 'newest' | 'oldest';
tran?: DBTransaction;
} = {}): AsyncGenerator<NotificationId> {
if (tran == null) {
const getNotificationIds = (tran) =>
this.getNotificationIds({ number, order, tran });
return yield* this.db.withTransactionG(async function* (tran) {
return yield* getNotificationIds(tran);
});
}
const messageIterator = tran.iterator<NotificationDB>(
this.notificationsManagerInboxDbPath,
{ valueAsBuffer: false },
{ valueAsBuffer: false, reverse: order === 'newest' },
);
let i = 0;
for await (const [keyPath, notification] of messageIterator) {
if (number !== 'all' && i >= number) {
break;
}
if (notification.isRead && unread) {
continue;
}
const key = keyPath[0] as Buffer;
const notificationId = IdInternal.fromBuffer<NotificationId>(key);
if (type === 'all') {
notificationIds.push(notificationId);
} else if (type === 'unread') {
if (!notification.isRead) {
notificationIds.push(notificationId);
}
}
yield notificationId;
i++;
}
return notificationIds;
}

protected async getNotifications(
Expand Down Expand Up @@ -689,11 +702,10 @@ class NotificationsManager {
protected async getOldestNotificationId(
tran: DBTransaction,
): Promise<NotificationId | undefined> {
const notificationIds = await this.getNotificationIds('all', tran);
if (notificationIds.length === 0) {
return undefined;
for await (const notification of this.getNotificationIds({ order: 'oldest', tran })) {
return notification;
}
return notificationIds[0];
return undefined;
}

protected async removeNotification(
Expand Down
20 changes: 9 additions & 11 deletions tests/notifications/NotificationsManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import path from 'path';
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { DB } from '@matrixai/db';
import { IdInternal } from '@matrixai/id';
import { AsyncIterableX as AsyncIterable } from 'ix/asynciterable';
import TaskManager from '@/tasks/TaskManager';
import PolykeyAgent from '@/PolykeyAgent';
import ACL from '@/acl/ACL';
Expand All @@ -28,6 +29,7 @@ import * as keysUtils from '@/keys/utils';
import * as utils from '@/utils';
import * as testUtils from '../utils';
import * as tlsTestsUtils from '../utils/tls';
import 'ix/add/asynciterable-operators/toarray';

describe('NotificationsManager', () => {
const password = 'password';
Expand Down Expand Up @@ -262,8 +264,7 @@ describe('NotificationsManager', () => {
),
])
.then((value) => value.map((value) => value.sendP));
const outboxNotifications =
await notificationsManager.readOutboxNotifications();
const outboxNotifications = await AsyncIterable.as(notificationsManager.readOutboxNotifications()).toArray();
expect(outboxNotifications).toEqual(
expect.arrayContaining([
expect.objectContaining({
Expand All @@ -282,8 +283,7 @@ describe('NotificationsManager', () => {
);
await taskManager.startProcessing();
await Promise.all(sendProms);
const receivedNotifications =
await receiver.notificationsManager.readNotifications();
const receivedNotifications = await AsyncIterable.as(receiver.notificationsManager.readNotifications()).toArray();
expect(receivedNotifications).toHaveLength(3);
expect(receivedNotifications).toEqual(
expect.arrayContaining([
Expand Down Expand Up @@ -1049,12 +1049,12 @@ describe('NotificationsManager', () => {
});
await notificationsManager.receiveNotification(notification1);
await notificationsManager.receiveNotification(notification2);
await notificationsManager.readNotifications({ number: 1 });
await notificationsManager.readNotifications({ number: 1 }).next();
await notificationsManager.stop();
await notificationsManager.start();
const unreadNotifications = await notificationsManager.readNotifications({
const unreadNotifications = await AsyncIterable.as(notificationsManager.readNotifications({
unread: true,
});
})).toArray();
expect(unreadNotifications).toHaveLength(1);
expect(unreadNotifications[0].data).toEqual(notification1.data);
expect(unreadNotifications[0].iss).toBe(notification1.iss);
Expand Down Expand Up @@ -1116,11 +1116,9 @@ describe('NotificationsManager', () => {
});
await notificationsManager.stop();
await notificationsManager.start({ fresh: true });
const receivedNotifications =
await notificationsManager.readNotifications();
const receivedNotifications = await AsyncIterable.as(notificationsManager.readNotifications()).toArray();
expect(receivedNotifications).toHaveLength(0);
const outboxNotifications =
await notificationsManager.readOutboxNotifications();
const outboxNotifications = await AsyncIterable.as(notificationsManager.readOutboxNotifications()).toArray();
expect(outboxNotifications).toHaveLength(0);
// Reverse side-effects
await receiver.notificationsManager.clearNotifications();
Expand Down

0 comments on commit 77abc68

Please sign in to comment.