From e7edeac3bdd22da0a04b8e873d5a008e249fb4be Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Tue, 19 Nov 2024 18:29:09 -0300 Subject: [PATCH] feat: add cursor pagination support on sync messages (#33810) --- .changeset/mean-cobras-sneeze.md | 6 + apps/meteor/app/api/server/v1/chat.ts | 32 +- apps/meteor/server/models/raw/Messages.ts | 11 +- apps/meteor/server/publications/messages.ts | 236 ++++++++++++-- apps/meteor/tests/end-to-end/api/chat.ts | 299 ++++++++++++++++++ .../unit/server/publications/messages.spec.ts | 275 ++++++++++++++++ .../src/models/IMessagesModel.ts | 2 +- packages/rest-typings/src/v1/chat.ts | 30 +- 8 files changed, 841 insertions(+), 50 deletions(-) create mode 100644 .changeset/mean-cobras-sneeze.md create mode 100644 apps/meteor/tests/unit/server/publications/messages.spec.ts diff --git a/.changeset/mean-cobras-sneeze.md b/.changeset/mean-cobras-sneeze.md new file mode 100644 index 000000000000..39717f0c0d89 --- /dev/null +++ b/.changeset/mean-cobras-sneeze.md @@ -0,0 +1,6 @@ +--- +'@rocket.chat/rest-typings': minor +'@rocket.chat/meteor': minor +--- + +Adds cursor pagination on chat.syncMessages endpoint diff --git a/apps/meteor/app/api/server/v1/chat.ts b/apps/meteor/app/api/server/v1/chat.ts index 05eb7208c9b4..3eb819070cd6 100644 --- a/apps/meteor/app/api/server/v1/chat.ts +++ b/apps/meteor/app/api/server/v1/chat.ts @@ -7,6 +7,7 @@ import { isChatUpdateProps, isChatGetThreadsListProps, isChatDeleteProps, + isChatSyncMessagesProps, } from '@rocket.chat/rest-typings'; import { escapeRegExp } from '@rocket.chat/string-helpers'; import { Meteor } from 'meteor/meteor'; @@ -74,22 +75,32 @@ API.v1.addRoute( API.v1.addRoute( 'chat.syncMessages', - { authRequired: true }, + { authRequired: true, validateParams: isChatSyncMessagesProps }, { async get() { - const { roomId, lastUpdate } = this.queryParams; + const { roomId, lastUpdate, count, next, previous, type } = this.queryParams; if (!roomId) { - throw new Meteor.Error('error-roomId-param-not-provided', 'The required "roomId" query param is missing.'); + throw new Meteor.Error('error-param-required', 'The required "roomId" query param is missing'); + } + + if (!lastUpdate && !type) { + throw new Meteor.Error('error-param-required', 'The "type" or "lastUpdate" parameters must be provided'); } - if (!lastUpdate) { - throw new Meteor.Error('error-lastUpdate-param-not-provided', 'The required "lastUpdate" query param is missing.'); - } else if (isNaN(Date.parse(lastUpdate))) { - throw new Meteor.Error('error-roomId-param-invalid', 'The "lastUpdate" query parameter must be a valid date.'); + if (lastUpdate && isNaN(Date.parse(lastUpdate))) { + throw new Meteor.Error('error-lastUpdate-param-invalid', 'The "lastUpdate" query parameter must be a valid date'); } - const result = await Meteor.callAsync('messages/get', roomId, { lastUpdate: new Date(lastUpdate) }); + const getMessagesQuery = { + ...(lastUpdate && { lastUpdate: new Date(lastUpdate) }), + ...(next && { next }), + ...(previous && { previous }), + ...(count && { count }), + ...(type && { type }), + }; + + const result = await Meteor.callAsync('messages/get', roomId, getMessagesQuery); if (!result) { return API.v1.failure(); @@ -97,8 +108,9 @@ API.v1.addRoute( return API.v1.success({ result: { - updated: await normalizeMessagesForUser(result.updated, this.userId), - deleted: result.deleted, + ...(result.updated && { updated: await normalizeMessagesForUser(result.updated, this.userId) }), + ...(result.deleted && { deleted: result.deleted }), + ...(result.cursor && { cursor: result.cursor }), }, }); }, diff --git a/apps/meteor/server/models/raw/Messages.ts b/apps/meteor/server/models/raw/Messages.ts index 252d07af754d..e84abc3ed481 100644 --- a/apps/meteor/server/models/raw/Messages.ts +++ b/apps/meteor/server/models/raw/Messages.ts @@ -898,16 +898,13 @@ export class MessagesRaw extends BaseRaw implements IMessagesModel { return this.find(query, options); } - findForUpdates(roomId: string, timestamp: Date, options?: FindOptions): FindCursor { + findForUpdates(roomId: IMessage['rid'], timestamp: { $lt: Date } | { $gt: Date }, options?: FindOptions): FindCursor { const query = { - _hidden: { - $ne: true, - }, rid: roomId, - _updatedAt: { - $gt: timestamp, - }, + _hidden: { $ne: true }, + _updatedAt: timestamp, }; + return this.find(query, options); } diff --git a/apps/meteor/server/publications/messages.ts b/apps/meteor/server/publications/messages.ts index ebf813a637d0..21fcfd3524c0 100644 --- a/apps/meteor/server/publications/messages.ts +++ b/apps/meteor/server/publications/messages.ts @@ -7,29 +7,188 @@ import type { FindOptions } from 'mongodb'; import { canAccessRoomIdAsync } from '../../app/authorization/server/functions/canAccessRoom'; +type CursorPaginationType = 'UPDATED' | 'DELETED'; + declare module '@rocket.chat/ddp-client' { // eslint-disable-next-line @typescript-eslint/naming-convention interface ServerMethods { 'messages/get': ( rid: IRoom['_id'], - options: { lastUpdate?: Date; latestDate?: Date; oldestDate?: Date; inclusive?: boolean; count?: number; unreads?: boolean }, + options: { + lastUpdate?: Date; + latestDate?: Date; + oldestDate?: Date; + inclusive?: boolean; + count?: number; + unreads?: boolean; + next?: string; + previous?: string; + type?: CursorPaginationType; + }, ) => Promise<{ updated: IMessage[]; deleted: IMessage[]; + cursor: { + next: string | null; + previous: string | null; + }; }>; } } +export function extractTimestampFromCursor(cursor: string): Date { + const timestamp = parseInt(cursor, 10); + + if (isNaN(timestamp) || new Date(timestamp).toString() === 'Invalid Date') { + throw new Error('Invalid Date'); + } + + return new Date(timestamp); +} + +export function mountCursorQuery({ next, previous, count }: { next?: string; previous?: string; count: number }): { + query: { $gt: Date } | { $lt: Date }; + options: FindOptions; +} { + const options: FindOptions = { + sort: { _updatedAt: 1 }, + ...(next || previous ? { limit: count + 1 } : {}), + }; + + if (next) { + return { query: { $gt: extractTimestampFromCursor(next) }, options }; + } + + if (previous) { + return { query: { $lt: extractTimestampFromCursor(previous) }, options: { ...options, sort: { _updatedAt: -1 } } }; + } + + return { query: { $gt: new Date(0) }, options }; +} + +export function mountCursorFromMessage(message: IMessage & { _deletedAt?: Date }, type: 'UPDATED' | 'DELETED'): string { + if (type === 'UPDATED' && message._updatedAt) { + return `${message._updatedAt.getTime()}`; + } + + if (type === 'DELETED' && message._deletedAt) { + return `${message._deletedAt.getTime()}`; + } + + throw new Meteor.Error('error-cursor-not-found', 'Cursor not found', { method: 'messages/get' }); +} + +export function mountNextCursor( + messages: IMessage[], + count: number, + type: CursorPaginationType, + next?: string, + previous?: string, +): string | null { + if (messages.length === 0) { + return null; + } + + if (previous) { + return mountCursorFromMessage(messages[0], type); + } + + if (messages.length <= count && next) { + return null; + } + + if (messages.length > count && next) { + return mountCursorFromMessage(messages[messages.length - 2], type); + } + + return mountCursorFromMessage(messages[messages.length - 1], type); +} + +export function mountPreviousCursor( + messages: IMessage[], + count: number, + type: CursorPaginationType, + next?: string, + previous?: string, +): string | null { + if (messages.length === 0) { + return null; + } + + if (messages.length <= count && next) { + return mountCursorFromMessage(messages[0], type); + } + + if (messages.length > count && next) { + return mountCursorFromMessage(messages[0], type); + } + + if (messages.length <= count && previous) { + return null; + } + + if (messages.length > count && previous) { + return mountCursorFromMessage(messages[messages.length - 2], type); + } + + return mountCursorFromMessage(messages[0], type); +} + +export async function handleWithoutPagination(rid: IRoom['_id'], lastUpdate: Date) { + const query = { $gt: lastUpdate }; + const options: FindOptions = { sort: { ts: -1 } }; + + const [updatedMessages, deletedMessages] = await Promise.all([ + Messages.findForUpdates(rid, query, options).toArray(), + Messages.trashFindDeletedAfter(lastUpdate, { rid }, { projection: { _id: 1, _deletedAt: 1 }, ...options }).toArray(), + ]); + + return { + updated: updatedMessages, + deleted: deletedMessages, + }; +} + +export async function handleCursorPagination( + type: CursorPaginationType, + rid: IRoom['_id'], + count: number, + next?: string, + previous?: string, +) { + const { query, options } = mountCursorQuery({ next, previous, count }); + + const response = + type === 'UPDATED' + ? await Messages.findForUpdates(rid, query, options).toArray() + : ((await Messages.trashFind({ rid, _deletedAt: query }, { projection: { _id: 1, _deletedAt: 1 }, ...options })!.toArray()) ?? []); + + const cursor = { + next: mountNextCursor(response, count, type, next, previous), + previous: mountPreviousCursor(response, count, type, next, previous), + }; + + if (response.length > count) { + response.pop(); + } + + return { + [type.toLowerCase()]: response, + cursor, + }; +} + Meteor.methods({ - async 'messages/get'(rid, { lastUpdate, latestDate = new Date(), oldestDate, inclusive = false, count = 20, unreads = false }) { + async 'messages/get'( + rid, + { lastUpdate, latestDate = new Date(), oldestDate, inclusive = false, count = 20, unreads = false, next, previous, type }, + ) { check(rid, String); const fromId = Meteor.userId(); if (!fromId) { - throw new Meteor.Error('error-invalid-user', 'Invalid user', { - method: 'messages/get', - }); + throw new Meteor.Error('error-invalid-user', 'Invalid user', { method: 'messages/get' }); } if (!rid) { @@ -37,35 +196,52 @@ Meteor.methods({ } if (!(await canAccessRoomIdAsync(rid, fromId))) { - throw new Meteor.Error('error-not-allowed', 'Not allowed', { - method: 'messages/get', + throw new Meteor.Error('error-not-allowed', 'Not allowed', { method: 'messages/get' }); + } + + if (type && !['UPDATED', 'DELETED'].includes(type)) { + throw new Meteor.Error('error-type-param-not-supported', 'The "type" parameter must be either "UPDATED" or "DELETED"'); + } + + if ((next || previous) && !type) { + throw new Meteor.Error( + 'error-type-param-required', + 'The "type" parameter is required when using the "next" or "previous" parameters', + ); + } + + if (next && previous) { + throw new Meteor.Error('error-cursor-conflict', 'You cannot provide both "next" and "previous" parameters'); + } + + if ((next || previous) && lastUpdate) { + throw new Meteor.Error( + 'error-cursor-and-lastUpdate-conflict', + 'The attributes "next", "previous" and "lastUpdate" cannot be used together', + ); + } + + const hasCursorPagination = !!((next || previous) && count !== null && type); + + if (!hasCursorPagination && !lastUpdate) { + return Meteor.callAsync('getChannelHistory', { + rid, + latest: latestDate, + oldest: oldestDate, + inclusive, + count, + unreads, }); } - const options: FindOptions = { - sort: { - ts: -1, - }, - }; - - if (lastUpdate instanceof Date) { - return { - updated: await Messages.findForUpdates(rid, lastUpdate, { - sort: { - ts: -1, - }, - }).toArray(), - deleted: await Messages.trashFindDeletedAfter(lastUpdate, { rid }, { ...options, projection: { _id: 1, _deletedAt: 1 } }).toArray(), - }; + if (lastUpdate) { + return handleWithoutPagination(rid, lastUpdate); + } + + if (!type) { + throw new Meteor.Error('error-param-required', 'The "type" or "lastUpdate" parameters must be provided'); } - return Meteor.callAsync('getChannelHistory', { - rid, - latest: latestDate, - oldest: oldestDate, - inclusive, - count, - unreads, - }); + return handleCursorPagination(type, rid, count, next, previous); }, }); diff --git a/apps/meteor/tests/end-to-end/api/chat.ts b/apps/meteor/tests/end-to-end/api/chat.ts index 8633b92f6ddc..2e23e2613779 100644 --- a/apps/meteor/tests/end-to-end/api/chat.ts +++ b/apps/meteor/tests/end-to-end/api/chat.ts @@ -3149,6 +3149,305 @@ describe('[Chat]', () => { filterDiscussionsByText(text); }); }); + + describe('[/chat.syncMessages]', () => { + let testChannel: IRoom; + + before(async () => { + testChannel = (await createRoom({ type: 'c', name: `channel.test.syncMessages.${Date.now()}` })).body.channel; + }); + + after(() => deleteRoom({ type: 'c', roomId: testChannel._id })); + + it('should return an error when the required "roomId" parameter is not sent', (done) => { + void request + .get(api('chat.syncMessages')) + .set(credentials) + .expect('Content-Type', 'application/json') + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.errorType).to.be.equal('invalid-params'); + expect(res.body.error).to.include(`must have required property 'roomId'`); + }) + .end(done); + }); + + it('should return an error when the neither "lastUpdate" or "type" parameter is sent', (done) => { + void request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: testChannel._id }) + .expect('Content-Type', 'application/json') + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.errorType).to.be.equal('error-param-required'); + expect(res.body.error).to.include('The "type" or "lastUpdate" parameters must be provided'); + }) + .end(done); + }); + + it('should return an error when the "lastUpdate" parameter is invalid', (done) => { + void request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: 'invalid-room', lastUpdate: 'invalid-date' }) + .expect('Content-Type', 'application/json') + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.errorType).to.be.equal('error-lastUpdate-param-invalid'); + expect(res.body.error).to.include('The "lastUpdate" query parameter must be a valid date'); + }) + .end(done); + }); + + it('should return an error when user provides an invalid roomId', (done) => { + void request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: 'invalid-room', lastUpdate: new Date().toISOString() }) + .expect('Content-Type', 'application/json') + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.errorType).to.be.equal('error-not-allowed'); + expect(res.body.error).to.include('Not allowed'); + }) + .end(done); + }); + + it('should return an error when the "type" parameter is not supported', (done) => { + void request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: testChannel._id, type: 'invalid-type', next: new Date().toISOString() }) + .expect('Content-Type', 'application/json') + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.errorType).to.be.equal('invalid-params'); + expect(res.body.error).to.include('must be equal to one of the allowed values'); + }) + .end(done); + }); + + it('should return an error when the "next" or "previous" parameter is sent without the "type" parameter', async () => { + const nextResponse = await request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: testChannel._id, next: new Date().toISOString() }); + + const previousResponse = await request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: testChannel._id, previous: new Date().toISOString() }); + + expect(nextResponse.statusCode).to.equal(400); + expect(nextResponse.body).to.have.property('success', false); + expect(nextResponse.body.errorType).to.be.equal('error-param-required'); + expect(nextResponse.body.error).to.include('The "type" or "lastUpdate" parameters must be provided'); + + expect(previousResponse.statusCode).to.equal(400); + expect(previousResponse.body).to.have.property('success', false); + expect(previousResponse.body.errorType).to.be.equal('error-param-required'); + expect(previousResponse.body.error).to.include('The "type" or "lastUpdate" parameters must be provided'); + }); + + it('should return an error when both "next" and "previous" are sent', (done) => { + void request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: testChannel._id, type: 'UPDATED', next: new Date().toISOString(), previous: new Date().toISOString() }) + .expect('Content-Type', 'application/json') + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.errorType).to.be.equal('error-cursor-conflict'); + expect(res.body.error).to.include('You cannot provide both "next" and "previous" parameters'); + }) + .end(done); + }); + + it('should return an error when both "next" or "previous" and "lastUpdate" are sent', (done) => { + void request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: testChannel._id, type: 'UPDATED', next: new Date().toISOString(), lastUpdate: new Date().toISOString() }) + .expect('Content-Type', 'application/json') + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.errorType).to.be.equal('error-cursor-and-lastUpdate-conflict'); + expect(res.body.error).to.include('The attributes "next", "previous" and "lastUpdate" cannot be used together'); + }) + .end(done); + }); + + it('should return an error when neither "type" or "lastUpdate" are sent', (done) => { + void request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: testChannel._id }) + .expect('Content-Type', 'application/json') + .expect(400) + .expect((res) => { + expect(res.body).to.have.property('success', false); + expect(res.body.errorType).to.be.equal('error-param-required'); + expect(res.body.error).to.include('The "type" or "lastUpdate" parameters must be provided'); + }) + .end(done); + }); + + it('should return an empty response when there are no messages to sync', (done) => { + void request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: testChannel._id, lastUpdate: new Date().toISOString() }) + .expect('Content-Type', 'application/json') + .expect(200) + .expect((res) => { + expect(res.body).to.have.property('success', true); + expect(res.body.result).to.have.property('updated').and.to.be.an('array'); + expect(res.body.result).to.have.property('deleted').and.to.be.an('array'); + expect(res.body.result.updated).to.have.lengthOf(0); + expect(res.body.result.deleted).to.have.lengthOf(0); + }) + .end(done); + }); + + it('should return all updated and deleted messages since "lastUpdate" parameter date', async () => { + const lastUpdate = new Date().toISOString(); + + // Create two messages isolated to avoid ts conflict + const firstMessage = await sendSimpleMessage({ roomId: testChannel._id, text: 'First Message' }); + const secondMessage = await sendSimpleMessage({ roomId: testChannel._id, text: 'Second Message' }); + + const response = await request.get(api('chat.syncMessages')).set(credentials).query({ roomId: testChannel._id, lastUpdate }); + + expect(response.body.result.updated).to.have.lengthOf(2); + expect(response.body.result.updated[0]._id).to.be.equal(secondMessage.body.message._id); + expect(response.body.result.updated[1]._id).to.be.equal(firstMessage.body.message._id); + expect(response.body.result.deleted).to.have.lengthOf(0); + + await deleteMessage({ roomId: testChannel._id, msgId: firstMessage.body.message._id }); + + const responseAfterDelete = await request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: testChannel._id, lastUpdate }); + + expect(responseAfterDelete.body.result.updated).to.have.lengthOf(1); + expect(responseAfterDelete.body.result.updated[0]._id).to.be.equal(secondMessage.body.message._id); + expect(responseAfterDelete.body.result.deleted).to.have.lengthOf(1); + expect(responseAfterDelete.body.result.deleted[0]._id).to.be.equal(firstMessage.body.message._id); + + await deleteMessage({ roomId: testChannel._id, msgId: secondMessage.body.message._id }); + }); + + it('should return all updated messages with a cursor when "type" parameter is "UPDATED"', async () => { + const lastUpdate = new Date(); + const firstMessage = await sendSimpleMessage({ roomId: testChannel._id, text: 'First Message' }); + const secondMessage = await sendSimpleMessage({ roomId: testChannel._id, text: 'Second Message' }); + const thirdMessage = await sendSimpleMessage({ roomId: testChannel._id, text: 'Third Message' }); + + const response = await request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ + roomId: testChannel._id, + next: new Date(lastUpdate).getTime().toString(), + type: 'UPDATED', + count: 2, + }); + + expect(response.body.result.updated).to.have.lengthOf(2); + expect(response.body.result.updated[0]._id).to.be.equal(firstMessage.body.message._id); + expect(response.body.result.updated[1]._id).to.be.equal(secondMessage.body.message._id); + expect(response.body.result.cursor) + .to.have.property('next') + .and.to.equal(new Date(response.body.result.updated[response.body.result.updated.length - 1]._updatedAt).getTime().toString()); + expect(response.body.result.cursor) + .to.have.property('previous') + .and.to.equal(new Date(response.body.result.updated[0]._updatedAt).getTime().toString()); + expect(response.body.result.cursor) + .to.have.property('previous') + .and.to.equal(new Date(firstMessage.body.message._updatedAt).getTime().toString()); + + const responseWithNext = await request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: testChannel._id, next: response.body.result.cursor.next, type: 'UPDATED', count: 2 }); + + expect(responseWithNext.body.result.updated).to.have.lengthOf(1); + expect(responseWithNext.body.result.updated[0]._id).to.be.equal(thirdMessage.body.message._id); + expect(responseWithNext.body.result.cursor).to.have.property('next').and.to.be.null; + expect(responseWithNext.body.result.cursor) + .to.have.property('previous') + .and.to.equal(new Date(thirdMessage.body.message._updatedAt).getTime().toString()); + + await Promise.all([ + deleteMessage({ roomId: testChannel._id, msgId: firstMessage.body.message._id }), + deleteMessage({ roomId: testChannel._id, msgId: secondMessage.body.message._id }), + deleteMessage({ roomId: testChannel._id, msgId: thirdMessage.body.message._id }), + ]); + }); + + it('should return all deleted messages with a cursor when "type" parameter is "DELETED"', async () => { + const newChannel = (await createRoom({ type: 'c', name: `channel.test.syncMessages.${Date.now()}` })).body.channel; + const lastUpdate = new Date(); + const firstMessage = (await sendSimpleMessage({ roomId: newChannel._id, text: 'First Message' })).body.message; + const secondMessage = (await sendSimpleMessage({ roomId: newChannel._id, text: 'Second Message' })).body.message; + const thirdMessage = (await sendSimpleMessage({ roomId: newChannel._id, text: 'Third Message' })).body.message; + + const response = await request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: newChannel._id, next: lastUpdate.getTime().toString(), type: 'DELETED', count: 2 }); + + expect(response.body.result.deleted).to.have.lengthOf(0); + expect(response.body.result.cursor).to.have.property('next').and.to.be.null; + expect(response.body.result.cursor).to.have.property('previous').and.to.be.null; + + const firstDeletedMessage = (await deleteMessage({ roomId: newChannel._id, msgId: firstMessage._id })).body.message; + const secondDeletedMessage = (await deleteMessage({ roomId: newChannel._id, msgId: secondMessage._id })).body.message; + const thirdDeletedMessage = (await deleteMessage({ roomId: newChannel._id, msgId: thirdMessage._id })).body.message; + + const responseAfterDelete = await request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: newChannel._id, next: lastUpdate.getTime().toString(), type: 'DELETED', count: 2 }); + + expect(responseAfterDelete.body.result.deleted).to.have.lengthOf(2); + expect(responseAfterDelete.body.result.deleted[0]._id).to.be.equal(firstDeletedMessage._id); + expect(responseAfterDelete.body.result.deleted[1]._id).to.be.equal(secondDeletedMessage._id); + expect(responseAfterDelete.body.result.cursor) + .to.have.property('next') + .and.to.equal( + new Date(responseAfterDelete.body.result.deleted[responseAfterDelete.body.result.deleted.length - 1]._deletedAt) + .getTime() + .toString(), + ); + expect(responseAfterDelete.body.result.cursor) + .to.have.property('previous') + .and.to.equal(new Date(responseAfterDelete.body.result.deleted[0]._deletedAt).getTime().toString()); + + const responseAfterDeleteWithPrevious = await request + .get(api('chat.syncMessages')) + .set(credentials) + .query({ roomId: newChannel._id, next: responseAfterDelete.body.result.cursor.next, type: 'DELETED', count: 2 }); + + expect(responseAfterDeleteWithPrevious.body.result.deleted).to.have.lengthOf(1); + expect(responseAfterDeleteWithPrevious.body.result.deleted[0]._id).to.be.equal(thirdDeletedMessage._id); + expect(responseAfterDeleteWithPrevious.body.result.cursor).to.have.property('next').and.to.be.null; + expect(responseAfterDeleteWithPrevious.body.result.cursor) + .to.have.property('previous') + .and.to.equal(new Date(responseAfterDeleteWithPrevious.body.result.deleted[0]._deletedAt).getTime().toString()); + + await deleteRoom({ type: 'c', roomId: newChannel._id }); + }); + }); }); describe('Threads', () => { diff --git a/apps/meteor/tests/unit/server/publications/messages.spec.ts b/apps/meteor/tests/unit/server/publications/messages.spec.ts new file mode 100644 index 000000000000..cb4a86b6cb55 --- /dev/null +++ b/apps/meteor/tests/unit/server/publications/messages.spec.ts @@ -0,0 +1,275 @@ +import type { IMessage } from '@rocket.chat/core-typings'; +import { expect } from 'chai'; +import { describe, it } from 'mocha'; +import proxyquire from 'proxyquire'; +import sinon from 'sinon'; + +const mockMeteorError = class extends Error { + constructor( + public error: string, + public reason: string, + public details: any, + ) { + super(reason); + this.name = 'Meteor.Error'; + } +}; + +const messagesMock = { + findForUpdates: sinon.stub(), + trashFindDeletedAfter: sinon.stub(), + trashFind: sinon.stub(), +}; + +const { + extractTimestampFromCursor, + mountCursorQuery, + mountCursorFromMessage, + mountNextCursor, + mountPreviousCursor, + handleWithoutPagination, + handleCursorPagination, +} = proxyquire.noCallThru().load('../../../../server/publications/messages', { + 'meteor/check': { + check: sinon.stub(), + }, + 'meteor/meteor': { + 'Meteor': { + methods: sinon.stub(), + Error: mockMeteorError, + }, + '@global': true, + }, + '@rocket.chat/models': { + Messages: messagesMock, + }, +}); + +describe('extractTimestampFromCursor', () => { + it('should return the correct timestamp', () => { + const cursor = new Date('2024-01-01T00:00:00.000Z').getTime().toString(); + const timestamp = extractTimestampFromCursor(cursor); + expect(timestamp).to.be.an.instanceOf(Date); + expect(timestamp.getTime()).to.equal(parseInt(cursor, 10)); + }); + + it('should handle non-date compliant string', () => { + expect(() => extractTimestampFromCursor('not-a-date')).to.throw(Error, 'Invalid Date'); + }); +}); + +describe('mountCursorQuery', () => { + const mockDate = new Date('2024-01-01T00:00:00.000Z').getTime(); + + it('should return a query with $gt when next is provided', () => { + const result = mountCursorQuery({ next: mockDate.toString() }); + expect(result.query).to.deep.equal({ $gt: new Date(mockDate) }); + }); + + it('should return a query with $lt when previous is provided', () => { + const result = mountCursorQuery({ previous: mockDate.toString() }); + expect(result.query).to.deep.equal({ $lt: new Date(mockDate) }); + }); + + it('should return a query with $gt and sort when neither next nor previous is provided', () => { + const result = mountCursorQuery({ count: 10 }); + expect(result.query).to.deep.equal({ $gt: new Date(0) }); + expect(result.options).to.deep.equal({ sort: { _updatedAt: 1 } }); + }); + + it('should return a query with $gt and ascending sort when next is provided', () => { + const result = mountCursorQuery({ next: mockDate.toString(), count: 10 }); + expect(result.query).to.deep.equal({ $gt: new Date(mockDate) }); + expect(result.options).to.deep.equal({ sort: { _updatedAt: 1 }, limit: 10 + 1 }); + }); + + it('should return a query with $gt and descending sort when previous is provided', () => { + const result = mountCursorQuery({ previous: mockDate.toString(), count: 10 }); + expect(result.query).to.deep.equal({ $lt: new Date(mockDate) }); + expect(result.options).to.deep.equal({ sort: { _updatedAt: -1 }, limit: 10 + 1 }); + }); +}); + +describe('mountCursorFromMessage', () => { + it('should return the updated timestamp for UPDATED type', () => { + const message: Pick = { + _updatedAt: new Date('2024-01-01T00:00:00Z'), + }; + + const result = mountCursorFromMessage(message, 'UPDATED'); + expect(result).to.equal(`${message._updatedAt.getTime()}`); + }); + + it('should return the deleted timestamp for DELETED type', () => { + const message: Partial & { _deletedAt: Date } = { + _deletedAt: new Date('2024-01-01T00:00:00Z'), + }; + + const result = mountCursorFromMessage(message, 'DELETED'); + expect(result).to.equal(`${message._deletedAt.getTime()}`); + }); + + it('should throw an error if DELETED type and _deletedAt is not present', () => { + const message: Pick = { + _updatedAt: new Date('2024-01-01T00:00:00Z'), + }; + + expect(() => mountCursorFromMessage(message, 'DELETED')).to.throw(mockMeteorError, 'Cursor not found'); + }); +}); + +describe('mountNextCursor', () => { + const mockMessage = (timestamp: number): Pick => ({ + _id: '1', + ts: new Date(timestamp), + _updatedAt: new Date(timestamp), + }); + + it('should return null if messages array is empty', () => { + expect(mountNextCursor([], 10, 'UPDATED')).to.be.null; + }); + + it('should return the first message cursor if previous is provided', () => { + const messages = [mockMessage(1000), mockMessage(2000)]; + expect(mountNextCursor(messages, 10, 'UPDATED', undefined, 'prev')).to.equal('1000'); + }); + + it('should return null if messages length is less than or equal to count and next is provided', () => { + const messages = [mockMessage(1000), mockMessage(2000)]; + expect(mountNextCursor(messages, 2, 'UPDATED', 'next')).to.be.null; + }); + + it('should return the second last message cursor if messages length is greater than count and next is provided', () => { + const messages = [mockMessage(1000), mockMessage(2000), mockMessage(3000)]; + expect(mountNextCursor(messages, 2, 'UPDATED', 'next')).to.equal('2000'); + }); + + it('should return the last message cursor if no next or previous is provided', () => { + const messages = [mockMessage(1000), mockMessage(2000)]; + expect(mountNextCursor(messages, 10, 'UPDATED')).to.equal('2000'); + }); +}); + +describe('mountPreviousCursor', () => { + const mockMessage = (timestamp: number): Pick => ({ + _id: '1', + ts: new Date(timestamp), + _updatedAt: new Date(timestamp), + }); + + it('should return null if messages array is empty', () => { + expect(mountPreviousCursor([], 10, 'UPDATED')).to.be.null; + }); + + it('should return the first message cursor if messages length is less than or equal to count and next is provided', () => { + const messages = [mockMessage(1000)]; + expect(mountPreviousCursor(messages, 1, 'UPDATED', 'nextCursor')).to.equal('1000'); + }); + + it('should return the first message cursor if messages length is greater than count and next is provided', () => { + const messages = [mockMessage(1000), mockMessage(2000)]; + expect(mountPreviousCursor(messages, 1, 'UPDATED', 'nextCursor')).to.equal('1000'); + }); + + it('should return null if messages length is less than or equal to count and previous is provided', () => { + const messages = [mockMessage(1000)]; + expect(mountPreviousCursor(messages, 1, 'UPDATED', undefined, 'previousCursor')).to.be.null; + }); + + it('should return the second last message cursor if messages length is greater than count and previous is provided', () => { + const messages = [mockMessage(1000), mockMessage(2000), mockMessage(3000)]; + expect(mountPreviousCursor(messages, 2, 'UPDATED', undefined, 'previousCursor')).to.equal('2000'); + }); + + it('should return the first message cursor if no next or previous is provided', () => { + const messages = [mockMessage(1000)]; + expect(mountPreviousCursor(messages, 1, 'UPDATED')).to.equal('1000'); + }); +}); + +describe('handleWithoutPagination', () => { + afterEach(() => { + messagesMock.findForUpdates.reset(); + messagesMock.trashFindDeletedAfter.reset(); + }); + + it('should return updated and deleted messages', async () => { + const rid = 'roomId'; + const lastUpdate = new Date(); + + const updatedMessages = [{ _id: '1', text: 'Hello' }]; + const deletedMessages = [{ _id: '2', _deletedAt: new Date() }]; + + messagesMock.findForUpdates.returns({ toArray: sinon.stub().resolves(updatedMessages) }); + messagesMock.trashFindDeletedAfter.returns({ toArray: sinon.stub().resolves(deletedMessages) }); + + const result = await handleWithoutPagination(rid, lastUpdate); + + expect(result).to.deep.equal({ + updated: updatedMessages, + deleted: deletedMessages, + }); + + sinon.assert.calledWith(messagesMock.findForUpdates, rid, { $gt: lastUpdate }, { sort: { ts: -1 } }); + sinon.assert.calledWith( + messagesMock.trashFindDeletedAfter, + lastUpdate, + { rid }, + { projection: { _id: 1, _deletedAt: 1 }, sort: { ts: -1 } }, + ); + }); + + it('should handle empty results', async () => { + const rid = 'roomId'; + const lastUpdate = new Date(); + + messagesMock.findForUpdates.returns({ toArray: sinon.stub().resolves([]) }); + messagesMock.trashFindDeletedAfter.returns({ toArray: sinon.stub().resolves([]) }); + + const result = await handleWithoutPagination(rid, lastUpdate); + + expect(result).to.deep.equal({ + updated: [], + deleted: [], + }); + }); +}); + +describe('handleCursorPagination', () => { + it('should return updated messages and cursor when type is UPDATED', async () => { + const rid = 'roomId'; + const count = 10; + const messages = [{ _id: 'msg1', _updatedAt: new Date('2024-01-01T00:00:00Z') }]; + messagesMock.findForUpdates.returns({ toArray: sinon.stub().resolves(messages) }); + + const result = await handleCursorPagination('UPDATED', rid, count); + + expect(messagesMock.findForUpdates.calledOnce).to.be.true; + expect(result.updated).to.deep.equal(messages); + expect(result.cursor).to.have.keys(['next', 'previous']); + }); + + it('should return deleted messages and cursor when type is DELETED', async () => { + const rid = 'roomId'; + const count = 10; + const messages = [{ _id: 'msg1', _deletedAt: new Date() }]; + messagesMock.trashFind.returns({ toArray: sinon.stub().resolves(messages) }); + + const result = await handleCursorPagination('DELETED', rid, count); + + expect(messagesMock.trashFind.calledOnce).to.be.true; + expect(result.deleted).to.deep.equal(messages); + expect(result.cursor).to.have.keys(['next', 'previous']); + }); + + it('should handle empty response correctly', async () => { + const rid = 'roomId'; + const count = 10; + messagesMock.findForUpdates.returns({ toArray: sinon.stub().resolves([]) }); + + const result = await handleCursorPagination('UPDATED', rid, count); + + expect(result.updated).to.deep.equal([]); + expect(result.cursor).to.deep.equal({ next: null, previous: null }); + }); +}); diff --git a/packages/model-typings/src/models/IMessagesModel.ts b/packages/model-typings/src/models/IMessagesModel.ts index 116b00fc6fee..e50e71f179bc 100644 --- a/packages/model-typings/src/models/IMessagesModel.ts +++ b/packages/model-typings/src/models/IMessagesModel.ts @@ -206,7 +206,7 @@ export interface IMessagesModel extends IBaseModel { getLastTimestamp(options?: FindOptions): Promise; findOneBySlackBotIdAndSlackTs(slackBotId: string, slackTs: Date): Promise; findByRoomIdAndMessageIds(rid: string, messageIds: string[], options?: FindOptions): FindCursor; - findForUpdates(roomId: string, timestamp: Date, options?: FindOptions): FindCursor; + findForUpdates(roomId: IMessage['rid'], timestamp: { $lt: Date } | { $gt: Date }, options?: FindOptions): FindCursor; updateUsernameOfEditByUserId(userId: string, username: string): Promise; updateAllUsernamesByUserId(userId: string, username: string): Promise; diff --git a/packages/rest-typings/src/v1/chat.ts b/packages/rest-typings/src/v1/chat.ts index 84e51b3c379c..3d1217042e4e 100644 --- a/packages/rest-typings/src/v1/chat.ts +++ b/packages/rest-typings/src/v1/chat.ts @@ -605,7 +605,11 @@ export const isChatGetMentionedMessagesProps = ajv.compile type ChatSyncMessages = { roomId: IRoom['_id']; - lastUpdate: string; + lastUpdate?: string; + count?: number; + next?: string; + previous?: string; + type?: 'UPDATED' | 'DELETED'; }; const ChatSyncMessagesSchema = { @@ -616,9 +620,27 @@ const ChatSyncMessagesSchema = { }, lastUpdate: { type: 'string', + nullable: true, + }, + count: { + type: 'number', + nullable: true, + }, + next: { + type: 'string', + nullable: true, + }, + previous: { + type: 'string', + nullable: true, + }, + type: { + type: 'string', + enum: ['UPDATED', 'DELETED'], + nullable: true, }, }, - required: ['roomId', 'lastUpdate'], + required: ['roomId'], additionalProperties: false, }; @@ -966,6 +988,10 @@ export type ChatEndpoints = { result: { updated: IMessage[]; deleted: IMessage[]; + cursor: { + next: string | null; + previous: string | null; + }; }; }; };