Skip to content

Commit

Permalink
feat: add cursor pagination support on sync messages (#33810)
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardogarim authored Nov 19, 2024
1 parent c71d5de commit e7edeac
Show file tree
Hide file tree
Showing 8 changed files with 841 additions and 50 deletions.
6 changes: 6 additions & 0 deletions .changeset/mean-cobras-sneeze.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@rocket.chat/rest-typings': minor
'@rocket.chat/meteor': minor
---

Adds cursor pagination on chat.syncMessages endpoint
32 changes: 22 additions & 10 deletions apps/meteor/app/api/server/v1/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
isChatUpdateProps,
isChatGetThreadsListProps,
isChatDeleteProps,
isChatSyncMessagesProps,
} from '@rocket.chat/rest-typings';
import { escapeRegExp } from '@rocket.chat/string-helpers';
import { Meteor } from 'meteor/meteor';
Expand Down Expand Up @@ -74,31 +75,42 @@ API.v1.addRoute(

API.v1.addRoute(
'chat.syncMessages',
{ authRequired: true },
{ authRequired: true, validateParams: isChatSyncMessagesProps },
{
async get() {
const { roomId, lastUpdate } = this.queryParams;
const { roomId, lastUpdate, count, next, previous, type } = this.queryParams;

if (!roomId) {
throw new Meteor.Error('error-roomId-param-not-provided', 'The required "roomId" query param is missing.');
throw new Meteor.Error('error-param-required', 'The required "roomId" query param is missing');
}

if (!lastUpdate && !type) {
throw new Meteor.Error('error-param-required', 'The "type" or "lastUpdate" parameters must be provided');
}

if (!lastUpdate) {
throw new Meteor.Error('error-lastUpdate-param-not-provided', 'The required "lastUpdate" query param is missing.');
} else if (isNaN(Date.parse(lastUpdate))) {
throw new Meteor.Error('error-roomId-param-invalid', 'The "lastUpdate" query parameter must be a valid date.');
if (lastUpdate && isNaN(Date.parse(lastUpdate))) {
throw new Meteor.Error('error-lastUpdate-param-invalid', 'The "lastUpdate" query parameter must be a valid date');
}

const result = await Meteor.callAsync('messages/get', roomId, { lastUpdate: new Date(lastUpdate) });
const getMessagesQuery = {
...(lastUpdate && { lastUpdate: new Date(lastUpdate) }),
...(next && { next }),
...(previous && { previous }),
...(count && { count }),
...(type && { type }),
};

const result = await Meteor.callAsync('messages/get', roomId, getMessagesQuery);

if (!result) {
return API.v1.failure();
}

return API.v1.success({
result: {
updated: await normalizeMessagesForUser(result.updated, this.userId),
deleted: result.deleted,
...(result.updated && { updated: await normalizeMessagesForUser(result.updated, this.userId) }),
...(result.deleted && { deleted: result.deleted }),
...(result.cursor && { cursor: result.cursor }),
},
});
},
Expand Down
11 changes: 4 additions & 7 deletions apps/meteor/server/models/raw/Messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -898,16 +898,13 @@ export class MessagesRaw extends BaseRaw<IMessage> implements IMessagesModel {
return this.find(query, options);
}

findForUpdates(roomId: string, timestamp: Date, options?: FindOptions<IMessage>): FindCursor<IMessage> {
findForUpdates(roomId: IMessage['rid'], timestamp: { $lt: Date } | { $gt: Date }, options?: FindOptions<IMessage>): FindCursor<IMessage> {
const query = {
_hidden: {
$ne: true,
},
rid: roomId,
_updatedAt: {
$gt: timestamp,
},
_hidden: { $ne: true },
_updatedAt: timestamp,
};

return this.find(query, options);
}

Expand Down
236 changes: 206 additions & 30 deletions apps/meteor/server/publications/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,65 +7,241 @@ import type { FindOptions } from 'mongodb';

import { canAccessRoomIdAsync } from '../../app/authorization/server/functions/canAccessRoom';

type CursorPaginationType = 'UPDATED' | 'DELETED';

declare module '@rocket.chat/ddp-client' {
// eslint-disable-next-line @typescript-eslint/naming-convention
interface ServerMethods {
'messages/get': (
rid: IRoom['_id'],
options: { lastUpdate?: Date; latestDate?: Date; oldestDate?: Date; inclusive?: boolean; count?: number; unreads?: boolean },
options: {
lastUpdate?: Date;
latestDate?: Date;
oldestDate?: Date;
inclusive?: boolean;
count?: number;
unreads?: boolean;
next?: string;
previous?: string;
type?: CursorPaginationType;
},
) => Promise<{
updated: IMessage[];
deleted: IMessage[];
cursor: {
next: string | null;
previous: string | null;
};
}>;
}
}

export function extractTimestampFromCursor(cursor: string): Date {
const timestamp = parseInt(cursor, 10);

if (isNaN(timestamp) || new Date(timestamp).toString() === 'Invalid Date') {
throw new Error('Invalid Date');
}

return new Date(timestamp);
}

export function mountCursorQuery({ next, previous, count }: { next?: string; previous?: string; count: number }): {
query: { $gt: Date } | { $lt: Date };
options: FindOptions<IMessage>;
} {
const options: FindOptions<IMessage> = {
sort: { _updatedAt: 1 },
...(next || previous ? { limit: count + 1 } : {}),
};

if (next) {
return { query: { $gt: extractTimestampFromCursor(next) }, options };
}

if (previous) {
return { query: { $lt: extractTimestampFromCursor(previous) }, options: { ...options, sort: { _updatedAt: -1 } } };
}

return { query: { $gt: new Date(0) }, options };
}

export function mountCursorFromMessage(message: IMessage & { _deletedAt?: Date }, type: 'UPDATED' | 'DELETED'): string {
if (type === 'UPDATED' && message._updatedAt) {
return `${message._updatedAt.getTime()}`;
}

if (type === 'DELETED' && message._deletedAt) {
return `${message._deletedAt.getTime()}`;
}

throw new Meteor.Error('error-cursor-not-found', 'Cursor not found', { method: 'messages/get' });
}

export function mountNextCursor(
messages: IMessage[],
count: number,
type: CursorPaginationType,
next?: string,
previous?: string,
): string | null {
if (messages.length === 0) {
return null;
}

if (previous) {
return mountCursorFromMessage(messages[0], type);
}

if (messages.length <= count && next) {
return null;
}

if (messages.length > count && next) {
return mountCursorFromMessage(messages[messages.length - 2], type);
}

return mountCursorFromMessage(messages[messages.length - 1], type);
}

export function mountPreviousCursor(
messages: IMessage[],
count: number,
type: CursorPaginationType,
next?: string,
previous?: string,
): string | null {
if (messages.length === 0) {
return null;
}

if (messages.length <= count && next) {
return mountCursorFromMessage(messages[0], type);
}

if (messages.length > count && next) {
return mountCursorFromMessage(messages[0], type);
}

if (messages.length <= count && previous) {
return null;
}

if (messages.length > count && previous) {
return mountCursorFromMessage(messages[messages.length - 2], type);
}

return mountCursorFromMessage(messages[0], type);
}

export async function handleWithoutPagination(rid: IRoom['_id'], lastUpdate: Date) {
const query = { $gt: lastUpdate };
const options: FindOptions<IMessage> = { sort: { ts: -1 } };

const [updatedMessages, deletedMessages] = await Promise.all([
Messages.findForUpdates(rid, query, options).toArray(),
Messages.trashFindDeletedAfter(lastUpdate, { rid }, { projection: { _id: 1, _deletedAt: 1 }, ...options }).toArray(),
]);

return {
updated: updatedMessages,
deleted: deletedMessages,
};
}

export async function handleCursorPagination(
type: CursorPaginationType,
rid: IRoom['_id'],
count: number,
next?: string,
previous?: string,
) {
const { query, options } = mountCursorQuery({ next, previous, count });

const response =
type === 'UPDATED'
? await Messages.findForUpdates(rid, query, options).toArray()
: ((await Messages.trashFind({ rid, _deletedAt: query }, { projection: { _id: 1, _deletedAt: 1 }, ...options })!.toArray()) ?? []);

const cursor = {
next: mountNextCursor(response, count, type, next, previous),
previous: mountPreviousCursor(response, count, type, next, previous),
};

if (response.length > count) {
response.pop();
}

return {
[type.toLowerCase()]: response,
cursor,
};
}

Meteor.methods<ServerMethods>({
async 'messages/get'(rid, { lastUpdate, latestDate = new Date(), oldestDate, inclusive = false, count = 20, unreads = false }) {
async 'messages/get'(
rid,
{ lastUpdate, latestDate = new Date(), oldestDate, inclusive = false, count = 20, unreads = false, next, previous, type },
) {
check(rid, String);

const fromId = Meteor.userId();

if (!fromId) {
throw new Meteor.Error('error-invalid-user', 'Invalid user', {
method: 'messages/get',
});
throw new Meteor.Error('error-invalid-user', 'Invalid user', { method: 'messages/get' });
}

if (!rid) {
throw new Meteor.Error('error-invalid-room', 'Invalid room', { method: 'messages/get' });
}

if (!(await canAccessRoomIdAsync(rid, fromId))) {
throw new Meteor.Error('error-not-allowed', 'Not allowed', {
method: 'messages/get',
throw new Meteor.Error('error-not-allowed', 'Not allowed', { method: 'messages/get' });
}

if (type && !['UPDATED', 'DELETED'].includes(type)) {
throw new Meteor.Error('error-type-param-not-supported', 'The "type" parameter must be either "UPDATED" or "DELETED"');
}

if ((next || previous) && !type) {
throw new Meteor.Error(
'error-type-param-required',
'The "type" parameter is required when using the "next" or "previous" parameters',
);
}

if (next && previous) {
throw new Meteor.Error('error-cursor-conflict', 'You cannot provide both "next" and "previous" parameters');
}

if ((next || previous) && lastUpdate) {
throw new Meteor.Error(
'error-cursor-and-lastUpdate-conflict',
'The attributes "next", "previous" and "lastUpdate" cannot be used together',
);
}

const hasCursorPagination = !!((next || previous) && count !== null && type);

if (!hasCursorPagination && !lastUpdate) {
return Meteor.callAsync('getChannelHistory', {
rid,
latest: latestDate,
oldest: oldestDate,
inclusive,
count,
unreads,
});
}

const options: FindOptions<IMessage> = {
sort: {
ts: -1,
},
};

if (lastUpdate instanceof Date) {
return {
updated: await Messages.findForUpdates(rid, lastUpdate, {
sort: {
ts: -1,
},
}).toArray(),
deleted: await Messages.trashFindDeletedAfter(lastUpdate, { rid }, { ...options, projection: { _id: 1, _deletedAt: 1 } }).toArray(),
};
if (lastUpdate) {
return handleWithoutPagination(rid, lastUpdate);
}

if (!type) {
throw new Meteor.Error('error-param-required', 'The "type" or "lastUpdate" parameters must be provided');
}

return Meteor.callAsync('getChannelHistory', {
rid,
latest: latestDate,
oldest: oldestDate,
inclusive,
count,
unreads,
});
return handleCursorPagination(type, rid, count, next, previous);
},
});
Loading

0 comments on commit e7edeac

Please sign in to comment.