Skip to content

Commit

Permalink
Fix issue with subscriptions not being streamed correctly to clients
Browse files Browse the repository at this point in the history
  • Loading branch information
KevLehman committed Aug 26, 2022
1 parent 0790192 commit a131094
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 27 deletions.
4 changes: 2 additions & 2 deletions apps/meteor/app/livechat/server/lib/Helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import { callbacks } from '../../../../lib/callbacks';
import { Logger } from '../../../logger';
import { settings } from '../../../settings/server';
import { Apps, AppEvents } from '../../../apps/server';
import notifications from '../../../notifications/server/lib/Notifications';
import { sendNotification } from '../../../lib/server';
import { sendMessage } from '../../../lib/server/functions/sendMessage';
import { queueInquiry, saveQueueInquiry } from './QueueManager';
import { validateEmail as validatorFunc } from '../../../../lib/emailValidator';
import { api } from '../../../../server/sdk/api';

const logger = new Logger('LivechatHelper');

Expand Down Expand Up @@ -268,7 +268,7 @@ export const normalizeAgent = (agentId) => {
export const dispatchAgentDelegated = (rid, agentId) => {
const agent = normalizeAgent(agentId);

notifications.streamLivechatRoom.emit(rid, {
api.broadcast('omnichannel.room', rid, {
type: 'agentData',
data: agent,
});
Expand Down
6 changes: 3 additions & 3 deletions apps/meteor/app/livechat/server/lib/Livechat.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import { FileUpload } from '../../../file-upload/server';
import { normalizeTransferredByData, parseAgentCustomFields, updateDepartmentAgents, validateEmail } from './Helper';
import { Apps, AppEvents } from '../../../apps/server';
import { businessHourManager } from '../business-hour';
import notifications from '../../../notifications/server/lib/Notifications';
import { addUserRoles } from '../../../../server/lib/roles/addUserRoles';
import { removeUserFromRoles } from '../../../../server/lib/roles/removeUserFromRoles';
import { VideoConf } from '../../../../server/sdk';
import { api } from '../../../../server/sdk/api';

const logger = new Logger('Livechat');

Expand Down Expand Up @@ -1373,7 +1373,7 @@ export const Livechat = {
}

LivechatRooms.findOpenByAgent(userId).forEach((room) => {
notifications.streamLivechatRoom.emit(room._id, {
api.broadcast('omnichannel.room', room._id, {
type: 'agentStatus',
status,
});
Expand All @@ -1389,7 +1389,7 @@ export const Livechat = {
},

notifyRoomVisitorChange(roomId, visitor) {
notifications.streamLivechatRoom.emit(roomId, {
api.broadcast('omnichannel.room', roomId, {
type: 'visitorData',
visitor,
});
Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/ee/app/livechat-enterprise/server/lib/Helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import { Users, LivechatInquiry, LivechatRooms, Messages } from '../../../../../
import { settings } from '../../../../../app/settings/server';
import { RoutingManager } from '../../../../../app/livechat/server/lib/RoutingManager';
import { dispatchAgentDelegated } from '../../../../../app/livechat/server/lib/Helper';
import notifications from '../../../../../app/notifications/server/lib/Notifications';
import { logger, helperLogger } from './logger';
import { OmnichannelQueueInactivityMonitor } from './QueueInactivityMonitor';
import { api } from '../../../../../server/sdk/api';

export const getMaxNumberSimultaneousChat = async ({ agentId, departmentId }) => {
if (departmentId) {
Expand Down Expand Up @@ -81,7 +81,7 @@ export const dispatchInquiryPosition = async (inquiry, queueInfo) => {
const { position, department } = inquiry;
const data = await normalizeQueueInfo({ position, queueInfo, department });
const propagateInquiryPosition = Meteor.bindEnvironment((inquiry) => {
notifications.streamLivechatRoom.emit(inquiry.rid, {
api.broadcast('omnichannel.room', inquiry.rid, {
type: 'queueData',
data,
});
Expand Down
6 changes: 3 additions & 3 deletions apps/meteor/ee/server/lib/registerServiceModels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import { IntegrationsRaw } from '../../../server/models/raw/Integrations';
import { EmailInboxRaw } from '../../../server/models/raw/EmailInbox';
import { PbxEventsRaw } from '../../../server/models/raw/PbxEvents';

// TODO add trash param to model instances
export const registerServiceModels = (db: Db): void => {
// TODO add trash param to appropiate model instances
export const registerServiceModels = (db: Db, trash?: any): void => {
registerModel('IRolesModel', () => new RolesRaw(db));
registerModel('IRoomsModel', () => new RoomsRaw(db));
registerModel('ISettingsModel', () => new SettingsRaw(db));
registerModel('ISubscriptionsModel', () => new SubscriptionsRaw(db));
registerModel('ISubscriptionsModel', () => new SubscriptionsRaw(db, trash));
registerModel('ITeamModel', () => new TeamRaw(db));
registerModel('ITeamMemberModel', () => new TeamMemberRaw(db));
registerModel('IUsersModel', () => new UsersRaw(db));
Expand Down
6 changes: 3 additions & 3 deletions apps/meteor/ee/server/services/authorization/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { Authorization } from '../../../../server/services/authorization/service
import { getConnection } from '../mongo';
import { registerServiceModels } from '../../lib/registerServiceModels';

getConnection().then((db) => {
registerServiceModels(db);
getConnection().then(({ database, trash }) => {
registerServiceModels(database, trash);

api.registerService(new Authorization(db));
api.registerService(new Authorization(database));
});
14 changes: 8 additions & 6 deletions apps/meteor/ee/server/services/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export enum Collections {
Subscriptions = 'rocketchat_subscription',
UserSession = 'usersSessions',
User = 'users',
Trash = 'rocketchat_trash',
Trash = 'rocketchat__trash',
Messages = 'rocketchat_message',
Rooms = 'rocketchat_room',
Settings = 'rocketchat_settings',
Expand All @@ -27,27 +27,29 @@ function connectDb(options?: MongoClientOptions): Promise<MongoClient> {

let db: Db;

export const getConnection = ((): ((options?: MongoClientOptions) => Promise<Db>) => {
export const getConnection = ((): ((options?: MongoClientOptions) => Promise<{ database: Db; trash: Collection }>) => {
let client: Promise<MongoClient>;

return async (options): Promise<Db> => {
return async (options): Promise<{ database: Db; trash: Collection }> => {
if (db) {
return db;
return { database: db, trash: db.collection(Collections.Trash) };
}
if (!client) {
client = connectDb(options);
client.then((c) => {
db = c.db(name);
});
}

// if getConnection was called multiple times before it was connected, wait for the connection
return (await client).db(name);
const database = (await client).db(name);
return { database, trash: database.collection(Collections.Trash) };
};
})();

export async function getCollection<T>(name: Collections): Promise<Collection<T>> {
if (!db) {
db = await getConnection();
db = (await getConnection()).database;
}
return db.collection<T>(name);
}
2 changes: 1 addition & 1 deletion apps/meteor/ee/server/services/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@
"typescript": "~4.5.5"
},
"volta": {
"extends": "../../package.json"
"extends": "../../../package.json"
}
}
2 changes: 1 addition & 1 deletion apps/meteor/ee/server/services/stream-hub/StreamHub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class StreamHub extends ServiceClass implements IServiceClass {
protected name = 'hub';

async created(): Promise<void> {
const db = await getConnection({ maxPoolSize: 1 });
const db = (await getConnection({ maxPoolSize: 1 })).database;

const watcher = new DatabaseWatcher({ db });

Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/ee/server/services/stream-hub/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { api } from '../../../../server/sdk/api';
import { getConnection } from '../mongo';
import { registerServiceModels } from '../../lib/registerServiceModels';

getConnection().then(async (db) => {
registerServiceModels(db);
getConnection().then(async ({ database, trash }) => {
registerServiceModels(database, trash);

// need to import StreamHub service after models are registered
const { StreamHub } = await import('./StreamHub');
Expand Down
7 changes: 5 additions & 2 deletions apps/meteor/server/modules/listeners/listeners.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ export class ListenersModule {

// emit a removed event on msg stream to remove the user's stream-room-messages subscription when the user is removed from room
if (clientAction === 'removed') {
notifications.streamRoomMessage.__emit(subscription.u._id, clientAction, subscription);
notifications.streamRoomMessage.emit(subscription.u._id, clientAction, subscription);
}

notifications.streamUser.__emit(subscription.u._id, clientAction, subscription);
notifications.streamUser.emit(subscription.u._id, clientAction, subscription);

notifications.notifyUserInThisInstance(subscription.u._id, 'subscriptions-changed', clientAction, subscription);
});
Expand Down Expand Up @@ -334,5 +334,8 @@ export class ListenersModule {
service.onEvent('connector.statuschanged', (enabled): void => {
notifications.notifyLoggedInThisInstance('voip.statuschanged', enabled);
});
service.onEvent('omnichannel.room', (roomId, data): void => {
notifications.streamLivechatRoom.emitWithoutBroadcast(roomId, data);
});
}
}
4 changes: 4 additions & 0 deletions apps/meteor/server/sdk/lib/Events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ export type EventSignatures = {
diff?: undefined | Record<string, any>;
id: string;
}): void;
'omnichannel.room'(
roomId: string,
data: { type: 'agentStatus'; status: string } | { type: 'queueData' | 'agentData'; data: { [k: string]: unknown } },
): void;

// Send all events from here
'voip.events'(userId: string, data: VoipEventDataSignature): void;
Expand Down
4 changes: 2 additions & 2 deletions ee/apps/ddp-streamer/src/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { registerServiceModels } from '../../../../apps/meteor/ee/server/lib/reg

export const notifications = new NotificationsModule(Stream);

getConnection().then((db) => {
registerServiceModels(db);
getConnection().then(({ database, trash }) => {
registerServiceModels(database, trash);

notifications.configure();
});

0 comments on commit a131094

Please sign in to comment.