Skip to content

Commit

Permalink
feat: added seek and seekEnd to NotificationsManager
Browse files Browse the repository at this point in the history
  • Loading branch information
amydevs committed Apr 29, 2024
1 parent b236805 commit 82553ac
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 39 deletions.
26 changes: 23 additions & 3 deletions src/client/handlers/NotificationsInboxRead.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import type {
NotificationInboxMessage,
NotificationReadMessage,
} from '../types';
import type { NotificationId } from '../../ids/types';
import type NotificationsManager from '../../notifications/NotificationsManager';
import { ServerHandler } from '@matrixai/rpc';
import * as notificationsUtils from '../../notifications/utils';

class NotificationsInboxRead extends ServerHandler<
{
Expand All @@ -24,11 +26,29 @@ class NotificationsInboxRead extends ServerHandler<
): AsyncGenerator<ClientRPCResponseResult<NotificationInboxMessage>> {
if (ctx.signal.aborted) throw ctx.signal.reason;
const { db, notificationsManager } = this.container;
const { seek, seekEnd, unread, order, limit } = input;

let seek_: NotificationId | number | undefined;
if (seek != null) {
seek_ =
typeof seek === 'string'
? notificationsUtils.decodeNotificationId(seek)
: seek;
}
let seekEnd_: NotificationId | number | undefined;
if (seekEnd != null) {
seekEnd_ =
typeof seekEnd === 'string'
? notificationsUtils.decodeNotificationId(seekEnd)
: seekEnd;
}
return db.withTransactionG(async function* (tran) {
const notifications = notificationsManager.readInboxNotifications({
unread: input.unread,
order: input.order,
limit: input.limit,
seek: seek_,
seekEnd: seekEnd_,
unread,
order,
limit,
tran,
});
for await (const notification of notifications) {
Expand Down
38 changes: 21 additions & 17 deletions src/client/handlers/NotificationsOutboxRead.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
NotificationOutboxMessage,
NotificationOutboxReadMessage,
} from '../types';
import type { NotificationId } from '../../ids/types';
import type NotificationsManager from '../../notifications/NotificationsManager';
import { ServerHandler } from '@matrixai/rpc';
import * as notificationsUtils from '../../notifications/utils';
Expand All @@ -25,31 +26,34 @@ class NotificationsOutboxRead extends ServerHandler<
): AsyncGenerator<ClientRPCResponseResult<NotificationOutboxMessage>> {
if (ctx.signal.aborted) throw ctx.signal.reason;
const { db, notificationsManager } = this.container;
const { seek, seekEnd, order, limit } = input;

let seek_: NotificationId | number | undefined;
if (seek != null) {
seek_ =
typeof seek === 'string'
? notificationsUtils.decodeNotificationId(seek)
: seek;
}
let seekEnd_: NotificationId | number | undefined;
if (seekEnd != null) {
seekEnd_ =
typeof seekEnd === 'string'
? notificationsUtils.decodeNotificationId(seekEnd)
: seekEnd;
}
return db.withTransactionG(async function* (tran) {
const notifications = notificationsManager.readOutboxNotifications({
order: input.order,
limit: input.limit,
seek: seek_,
seekEnd: seekEnd_,
order,
limit,
tran,
});
for await (const notification of notifications) {
if (ctx.signal.aborted) throw ctx.signal.reason;
const taskInfo =
await notificationsManager.getOutboxNotificationTaskInfoById(
notificationsUtils.decodeNotificationId(
notification.notificationIdEncoded,
)!,
tran,
);
yield {
notification: notification,
taskMetadata:
taskInfo != null
? {
remainingRetries: taskInfo.parameters[0].retries,
created: taskInfo.created.getTime(),
scheduled: taskInfo.scheduled.getTime(),
}
: undefined,
};
}
});
Expand Down
4 changes: 3 additions & 1 deletion src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,14 @@ type SuccessMessage = {
// Notifications messages

type NotificationReadMessage = {
seek?: NotificationIdEncoded | number;
seekEnd?: NotificationIdEncoded | number;
unread?: boolean;
limit?: number;
order?: 'asc' | 'desc';
};

type NotificationOutboxReadMessage = Omit<NotificationReadMessage, 'unread'>
type NotificationOutboxReadMessage = Omit<NotificationReadMessage, 'unread'>;

type NotificationInboxMessage = {
notification: Notification;
Expand Down
39 changes: 29 additions & 10 deletions src/ids/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@ import { IdInternal, IdSortable, IdRandom } from '@matrixai/id';
import * as keysUtilsRandom from '../keys/utils/random';
import * as validationErrors from '../validation/errors';

/**
* Generates an auditId from an epoch timestamp.
*
* @param epoch
* @param randomSource
*/
function generateSortableIdFromTimestamp<T extends IdInternal & number>(
epoch: number,
randomSource: (size: number) => Uint8Array = keysUtilsRandom.getRandomBytes,
): T {
const generator = new IdSortable<T>({
timeSource: () => () => epoch,
randomSource,
});
return generator.get();
}

function createPermIdGenerator(): () => PermissionId {
const generator = new IdRandom<PermissionId>({
randomSource: keysUtilsRandom.getRandomBytes,
Expand Down Expand Up @@ -78,16 +95,8 @@ function decodeAuditEventId(
* @param epoch
* @param randomSource
*/
function generateAuditEventIdFromTimestamp(
epoch: number,
randomSource: (size: number) => Uint8Array = keysUtilsRandom.getRandomBytes,
): AuditEventId {
const generator = new IdSortable<AuditEventId>({
timeSource: () => () => epoch,
randomSource,
});
return generator.get();
}
const generateAuditEventIdFromTimestamp =
generateSortableIdFromTimestamp<AuditEventId>;

/**
* Creates a NodeId generator.
Expand Down Expand Up @@ -518,6 +527,15 @@ function decodeNotificationId(
return notificationId;
}

/**
* Generates a NotificationId from an epoch timestamp.
*
* @param epoch
* @param randomSource
*/
const generateNotificationIdFromTimestamp =
generateSortableIdFromTimestamp<NotificationId>;

export {
createPermIdGenerator,
createAuditEventIdGenerator,
Expand Down Expand Up @@ -563,6 +581,7 @@ export {
createNotificationIdGenerator,
encodeNotificationId,
decodeNotificationId,
generateNotificationIdFromTimestamp,
};

export * from './types';
73 changes: 69 additions & 4 deletions src/notifications/NotificationsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,17 +397,40 @@ class NotificationsManager {
}

protected async *getOutboxNotificationIds({
seek,
seekEnd,
order = 'asc',
limit,
tran,
}: {
seek?: NotificationId | number | Date;
seekEnd?: NotificationId | number | Date;
order?: 'asc' | 'desc';
limit?: number;
tran: DBTransaction;
}): AsyncGenerator<NotificationId> {
const seekId =
seek != null
? notificationsUtils.extractFromSeek(
seek,
(size) => new Uint8Array(size),
).notificationId
: undefined;
const seekEndId =
seekEnd != null
? notificationsUtils.extractFromSeek(seekEnd, (size) =>
new Uint8Array(size).fill(0xff),
).notificationId
: undefined;

const messageIterator = tran.iterator<NotificationDB>(
this.notificationsManagerOutboxDbPath,
{ valueAsBuffer: false, reverse: order !== 'asc' },
{
valueAsBuffer: false,
reverse: order !== 'asc',
gte: seekId?.toBuffer(),
lte: seekEndId?.toBuffer(),
},
);
let i = 0;
for await (const [keyPath] of messageIterator) {
Expand Down Expand Up @@ -444,23 +467,29 @@ class NotificationsManager {
*/
@ready(new notificationsErrors.ErrorNotificationsNotRunning())
public async *readOutboxNotifications({
seek,
seekEnd,
limit,
order = 'asc',
tran,
}: {
seek?: NotificationId | number | Date;
seekEnd?: NotificationId | number | Date;
order?: 'asc' | 'desc';
limit?: number;
tran?: DBTransaction;
} = {}): AsyncGenerator<Notification> {
if (tran == null) {
const readOutboxNotifications = (tran) =>
this.readOutboxNotifications({ limit, order, tran });
this.readOutboxNotifications({ seek, seekEnd, limit, order, tran });
return yield* this.db.withTransactionG(async function* (tran) {
return yield* readOutboxNotifications(tran);
});
}

const notificationIds = this.getOutboxNotificationIds({
seek,
seekEnd,
limit,
order,
tran,
Expand Down Expand Up @@ -618,24 +647,37 @@ class NotificationsManager {
*/
@ready(new notificationsErrors.ErrorNotificationsNotRunning())
public async *readInboxNotifications({
seek,
seekEnd,
unread = false,
order = 'asc',
limit,
tran,
}: {
seek?: NotificationId | number | Date;
seekEnd?: NotificationId | number | Date;
unread?: boolean;
order?: 'asc' | 'desc';
limit?: number;
tran?: DBTransaction;
} = {}): AsyncGenerator<Notification> {
if (tran == null) {
const readNotifications = (tran) =>
this.readInboxNotifications({ unread, limit, order, tran });
this.readInboxNotifications({
seek,
seekEnd,
unread,
limit,
order,
tran,
});
return yield* this.db.withTransactionG(async function* (tran) {
return yield* readNotifications(tran);
});
}
const notificationIds = this.getInboxNotificationIds({
seek,
seekEnd,
unread,
limit,
order,
Expand Down Expand Up @@ -713,19 +755,42 @@ class NotificationsManager {
}

protected async *getInboxNotificationIds({
seek,
seekEnd,
unread = false,
order = 'asc',
limit,
tran,
}: {
seek?: NotificationId | number | Date;
seekEnd?: NotificationId | number | Date;
unread?: boolean;
order?: 'asc' | 'desc';
limit?: number;
tran: DBTransaction;
}): AsyncGenerator<NotificationId> {
const seekId =
seek != null
? notificationsUtils.extractFromSeek(
seek,
(size) => new Uint8Array(size),
).notificationId
: undefined;
const seekEndId =
seekEnd != null
? notificationsUtils.extractFromSeek(seekEnd, (size) =>
new Uint8Array(size).fill(0xff),
).notificationId
: undefined;

const messageIterator = tran.iterator<NotificationDB>(
this.notificationsManagerInboxDbPath,
{ valueAsBuffer: false, reverse: order !== 'asc' },
{
valueAsBuffer: false,
reverse: order !== 'asc',
gte: seekId?.toBuffer(),
lte: seekEndId?.toBuffer(),
},
);
let i = 0;
for await (const [keyPath, notificationDb] of messageIterator) {
Expand Down
Loading

0 comments on commit 82553ac

Please sign in to comment.