From a131094bded6b7441e4b485a456b74597b5c4e10 Mon Sep 17 00:00:00 2001 From: Kevin Aleman Date: Fri, 26 Aug 2022 10:36:57 -0600 Subject: [PATCH] Fix issue with subscriptions not being streamed correctly to clients --- apps/meteor/app/livechat/server/lib/Helper.js | 4 ++-- apps/meteor/app/livechat/server/lib/Livechat.js | 6 +++--- .../app/livechat-enterprise/server/lib/Helper.js | 4 ++-- apps/meteor/ee/server/lib/registerServiceModels.ts | 6 +++--- .../ee/server/services/authorization/service.ts | 6 +++--- apps/meteor/ee/server/services/mongo.ts | 14 ++++++++------ apps/meteor/ee/server/services/package.json | 2 +- .../ee/server/services/stream-hub/StreamHub.ts | 2 +- .../ee/server/services/stream-hub/service.ts | 4 ++-- .../server/modules/listeners/listeners.module.ts | 7 +++++-- apps/meteor/server/sdk/lib/Events.ts | 4 ++++ ee/apps/ddp-streamer/src/streams.ts | 4 ++-- 12 files changed, 36 insertions(+), 27 deletions(-) diff --git a/apps/meteor/app/livechat/server/lib/Helper.js b/apps/meteor/app/livechat/server/lib/Helper.js index 892568524ef2..f26890c35655 100644 --- a/apps/meteor/app/livechat/server/lib/Helper.js +++ b/apps/meteor/app/livechat/server/lib/Helper.js @@ -21,11 +21,11 @@ import { callbacks } from '../../../../lib/callbacks'; import { Logger } from '../../../logger'; import { settings } from '../../../settings/server'; import { Apps, AppEvents } from '../../../apps/server'; -import notifications from '../../../notifications/server/lib/Notifications'; import { sendNotification } from '../../../lib/server'; import { sendMessage } from '../../../lib/server/functions/sendMessage'; import { queueInquiry, saveQueueInquiry } from './QueueManager'; import { validateEmail as validatorFunc } from '../../../../lib/emailValidator'; +import { api } from '../../../../server/sdk/api'; const logger = new Logger('LivechatHelper'); @@ -268,7 +268,7 @@ export const normalizeAgent = (agentId) => { export const dispatchAgentDelegated = (rid, agentId) => { const agent = normalizeAgent(agentId); - notifications.streamLivechatRoom.emit(rid, { + api.broadcast('omnichannel.room', rid, { type: 'agentData', data: agent, }); diff --git a/apps/meteor/app/livechat/server/lib/Livechat.js b/apps/meteor/app/livechat/server/lib/Livechat.js index 17e7cd36ce0b..4ae363597153 100644 --- a/apps/meteor/app/livechat/server/lib/Livechat.js +++ b/apps/meteor/app/livechat/server/lib/Livechat.js @@ -37,10 +37,10 @@ import { FileUpload } from '../../../file-upload/server'; import { normalizeTransferredByData, parseAgentCustomFields, updateDepartmentAgents, validateEmail } from './Helper'; import { Apps, AppEvents } from '../../../apps/server'; import { businessHourManager } from '../business-hour'; -import notifications from '../../../notifications/server/lib/Notifications'; import { addUserRoles } from '../../../../server/lib/roles/addUserRoles'; import { removeUserFromRoles } from '../../../../server/lib/roles/removeUserFromRoles'; import { VideoConf } from '../../../../server/sdk'; +import { api } from '../../../../server/sdk/api'; const logger = new Logger('Livechat'); @@ -1373,7 +1373,7 @@ export const Livechat = { } LivechatRooms.findOpenByAgent(userId).forEach((room) => { - notifications.streamLivechatRoom.emit(room._id, { + api.broadcast('omnichannel.room', room._id, { type: 'agentStatus', status, }); @@ -1389,7 +1389,7 @@ export const Livechat = { }, notifyRoomVisitorChange(roomId, visitor) { - notifications.streamLivechatRoom.emit(roomId, { + api.broadcast('omnichannel.room', roomId, { type: 'visitorData', visitor, }); diff --git a/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.js b/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.js index dc955411c4b1..88a4592a5944 100644 --- a/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.js +++ b/apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.js @@ -12,9 +12,9 @@ import { Users, LivechatInquiry, LivechatRooms, Messages } from '../../../../../ import { settings } from '../../../../../app/settings/server'; import { RoutingManager } from '../../../../../app/livechat/server/lib/RoutingManager'; import { dispatchAgentDelegated } from '../../../../../app/livechat/server/lib/Helper'; -import notifications from '../../../../../app/notifications/server/lib/Notifications'; import { logger, helperLogger } from './logger'; import { OmnichannelQueueInactivityMonitor } from './QueueInactivityMonitor'; +import { api } from '../../../../../server/sdk/api'; export const getMaxNumberSimultaneousChat = async ({ agentId, departmentId }) => { if (departmentId) { @@ -81,7 +81,7 @@ export const dispatchInquiryPosition = async (inquiry, queueInfo) => { const { position, department } = inquiry; const data = await normalizeQueueInfo({ position, queueInfo, department }); const propagateInquiryPosition = Meteor.bindEnvironment((inquiry) => { - notifications.streamLivechatRoom.emit(inquiry.rid, { + api.broadcast('omnichannel.room', inquiry.rid, { type: 'queueData', data, }); diff --git a/apps/meteor/ee/server/lib/registerServiceModels.ts b/apps/meteor/ee/server/lib/registerServiceModels.ts index 3ac218897b30..7d79f6e14f06 100644 --- a/apps/meteor/ee/server/lib/registerServiceModels.ts +++ b/apps/meteor/ee/server/lib/registerServiceModels.ts @@ -20,12 +20,12 @@ import { IntegrationsRaw } from '../../../server/models/raw/Integrations'; import { EmailInboxRaw } from '../../../server/models/raw/EmailInbox'; import { PbxEventsRaw } from '../../../server/models/raw/PbxEvents'; -// TODO add trash param to model instances -export const registerServiceModels = (db: Db): void => { +// TODO add trash param to appropiate model instances +export const registerServiceModels = (db: Db, trash?: any): void => { registerModel('IRolesModel', () => new RolesRaw(db)); registerModel('IRoomsModel', () => new RoomsRaw(db)); registerModel('ISettingsModel', () => new SettingsRaw(db)); - registerModel('ISubscriptionsModel', () => new SubscriptionsRaw(db)); + registerModel('ISubscriptionsModel', () => new SubscriptionsRaw(db, trash)); registerModel('ITeamModel', () => new TeamRaw(db)); registerModel('ITeamMemberModel', () => new TeamMemberRaw(db)); registerModel('IUsersModel', () => new UsersRaw(db)); diff --git a/apps/meteor/ee/server/services/authorization/service.ts b/apps/meteor/ee/server/services/authorization/service.ts index 08cc60d61a81..230334e53ec0 100644 --- a/apps/meteor/ee/server/services/authorization/service.ts +++ b/apps/meteor/ee/server/services/authorization/service.ts @@ -5,8 +5,8 @@ import { Authorization } from '../../../../server/services/authorization/service import { getConnection } from '../mongo'; import { registerServiceModels } from '../../lib/registerServiceModels'; -getConnection().then((db) => { - registerServiceModels(db); +getConnection().then(({ database, trash }) => { + registerServiceModels(database, trash); - api.registerService(new Authorization(db)); + api.registerService(new Authorization(database)); }); diff --git a/apps/meteor/ee/server/services/mongo.ts b/apps/meteor/ee/server/services/mongo.ts index c5e1446694d7..e15c1de06bbd 100644 --- a/apps/meteor/ee/server/services/mongo.ts +++ b/apps/meteor/ee/server/services/mongo.ts @@ -9,7 +9,7 @@ export enum Collections { Subscriptions = 'rocketchat_subscription', UserSession = 'usersSessions', User = 'users', - Trash = 'rocketchat_trash', + Trash = 'rocketchat__trash', Messages = 'rocketchat_message', Rooms = 'rocketchat_room', Settings = 'rocketchat_settings', @@ -27,12 +27,12 @@ function connectDb(options?: MongoClientOptions): Promise { let db: Db; -export const getConnection = ((): ((options?: MongoClientOptions) => Promise) => { +export const getConnection = ((): ((options?: MongoClientOptions) => Promise<{ database: Db; trash: Collection }>) => { let client: Promise; - return async (options): Promise => { + return async (options): Promise<{ database: Db; trash: Collection }> => { if (db) { - return db; + return { database: db, trash: db.collection(Collections.Trash) }; } if (!client) { client = connectDb(options); @@ -40,14 +40,16 @@ export const getConnection = ((): ((options?: MongoClientOptions) => Promise db = c.db(name); }); } + // if getConnection was called multiple times before it was connected, wait for the connection - return (await client).db(name); + const database = (await client).db(name); + return { database, trash: database.collection(Collections.Trash) }; }; })(); export async function getCollection(name: Collections): Promise> { if (!db) { - db = await getConnection(); + db = (await getConnection()).database; } return db.collection(name); } diff --git a/apps/meteor/ee/server/services/package.json b/apps/meteor/ee/server/services/package.json index 9d6c15cd42a1..f22ecc5cc627 100644 --- a/apps/meteor/ee/server/services/package.json +++ b/apps/meteor/ee/server/services/package.json @@ -68,6 +68,6 @@ "typescript": "~4.5.5" }, "volta": { - "extends": "../../package.json" + "extends": "../../../package.json" } } diff --git a/apps/meteor/ee/server/services/stream-hub/StreamHub.ts b/apps/meteor/ee/server/services/stream-hub/StreamHub.ts index 99d2f076adf7..72738d7e92af 100755 --- a/apps/meteor/ee/server/services/stream-hub/StreamHub.ts +++ b/apps/meteor/ee/server/services/stream-hub/StreamHub.ts @@ -9,7 +9,7 @@ export class StreamHub extends ServiceClass implements IServiceClass { protected name = 'hub'; async created(): Promise { - const db = await getConnection({ maxPoolSize: 1 }); + const db = (await getConnection({ maxPoolSize: 1 })).database; const watcher = new DatabaseWatcher({ db }); diff --git a/apps/meteor/ee/server/services/stream-hub/service.ts b/apps/meteor/ee/server/services/stream-hub/service.ts index 7a395c8c4945..cef7b157f2d3 100755 --- a/apps/meteor/ee/server/services/stream-hub/service.ts +++ b/apps/meteor/ee/server/services/stream-hub/service.ts @@ -4,8 +4,8 @@ import { api } from '../../../../server/sdk/api'; import { getConnection } from '../mongo'; import { registerServiceModels } from '../../lib/registerServiceModels'; -getConnection().then(async (db) => { - registerServiceModels(db); +getConnection().then(async ({ database, trash }) => { + registerServiceModels(database, trash); // need to import StreamHub service after models are registered const { StreamHub } = await import('./StreamHub'); diff --git a/apps/meteor/server/modules/listeners/listeners.module.ts b/apps/meteor/server/modules/listeners/listeners.module.ts index 2ca66ed5db11..922db5272ece 100644 --- a/apps/meteor/server/modules/listeners/listeners.module.ts +++ b/apps/meteor/server/modules/listeners/listeners.module.ts @@ -135,10 +135,10 @@ export class ListenersModule { // emit a removed event on msg stream to remove the user's stream-room-messages subscription when the user is removed from room if (clientAction === 'removed') { - notifications.streamRoomMessage.__emit(subscription.u._id, clientAction, subscription); + notifications.streamRoomMessage.emit(subscription.u._id, clientAction, subscription); } - notifications.streamUser.__emit(subscription.u._id, clientAction, subscription); + notifications.streamUser.emit(subscription.u._id, clientAction, subscription); notifications.notifyUserInThisInstance(subscription.u._id, 'subscriptions-changed', clientAction, subscription); }); @@ -334,5 +334,8 @@ export class ListenersModule { service.onEvent('connector.statuschanged', (enabled): void => { notifications.notifyLoggedInThisInstance('voip.statuschanged', enabled); }); + service.onEvent('omnichannel.room', (roomId, data): void => { + notifications.streamLivechatRoom.emitWithoutBroadcast(roomId, data); + }); } } diff --git a/apps/meteor/server/sdk/lib/Events.ts b/apps/meteor/server/sdk/lib/Events.ts index a853ff1b976f..4c683f62aa52 100644 --- a/apps/meteor/server/sdk/lib/Events.ts +++ b/apps/meteor/server/sdk/lib/Events.ts @@ -124,6 +124,10 @@ export type EventSignatures = { diff?: undefined | Record; id: string; }): void; + 'omnichannel.room'( + roomId: string, + data: { type: 'agentStatus'; status: string } | { type: 'queueData' | 'agentData'; data: { [k: string]: unknown } }, + ): void; // Send all events from here 'voip.events'(userId: string, data: VoipEventDataSignature): void; diff --git a/ee/apps/ddp-streamer/src/streams.ts b/ee/apps/ddp-streamer/src/streams.ts index c1113f08d7b1..ea6f11df71c2 100644 --- a/ee/apps/ddp-streamer/src/streams.ts +++ b/ee/apps/ddp-streamer/src/streams.ts @@ -5,8 +5,8 @@ import { registerServiceModels } from '../../../../apps/meteor/ee/server/lib/reg export const notifications = new NotificationsModule(Stream); -getConnection().then((db) => { - registerServiceModels(db); +getConnection().then(({ database, trash }) => { + registerServiceModels(database, trash); notifications.configure(); });