diff --git a/packages/cli/src/collaboration/__tests__/collaboration.state.test.ts b/packages/cli/src/collaboration/__tests__/collaboration.state.test.ts new file mode 100644 index 00000000000000..4435645d7a1147 --- /dev/null +++ b/packages/cli/src/collaboration/__tests__/collaboration.state.test.ts @@ -0,0 +1,96 @@ +import { CollaborationState } from '../collaboration.state'; +import type { CacheService } from '@/services/cache/cache.service'; +import { mock } from 'jest-mock-extended'; + +const origDate = global.Date; + +const mockDateFactory = (currentDate: string) => { + return class CustomDate extends origDate { + constructor() { + super(currentDate); + } + } as DateConstructor; +}; + +describe('CollaborationState', () => { + let collaborationState: CollaborationState; + let mockCacheService: jest.Mocked; + + beforeEach(() => { + mockCacheService = mock(); + collaborationState = new CollaborationState(mockCacheService); + }); + + afterEach(() => { + global.Date = origDate; + }); + + const workflowId = 'workflow'; + + describe('addActiveWorkflowUser', () => { + it('should add workflow user with correct cache key and value', async () => { + // Arrange + global.Date = mockDateFactory('2023-01-01T00:00:00.000Z'); + + // Act + await collaborationState.addActiveWorkflowUser(workflowId, 'userId'); + + // Assert + expect(mockCacheService.setHash).toHaveBeenCalledWith('collaboration:workflow', { + userId: '2023-01-01T00:00:00.000Z', + }); + }); + }); + + describe('removeActiveWorkflowUser', () => { + it('should remove workflow user with correct cache key', async () => { + // Act + await collaborationState.removeActiveWorkflowUser(workflowId, 'userId'); + + // Assert + expect(mockCacheService.deleteFromHash).toHaveBeenCalledWith( + 'collaboration:workflow', + 'userId', + ); + }); + }); + + describe('getActiveWorkflowUsers', () => { + it('should get workflows with correct cache key', async () => { + // Act + const users = await collaborationState.getActiveWorkflowUsers(workflowId); + + // Assert + expect(mockCacheService.getHash).toHaveBeenCalledWith('collaboration:workflow'); + expect(users).toBeEmptyArray(); + }); + + it('should get workflow users that are not expired', async () => { + // Arrange + const nowMinus16Minutes = new Date(); + nowMinus16Minutes.setMinutes(nowMinus16Minutes.getMinutes() - 16); + const now = new Date().toISOString(); + + mockCacheService.getHash.mockResolvedValueOnce({ + expiredUserId: nowMinus16Minutes.toISOString(), + notExpiredUserId: now, + }); + + // Act + const users = await collaborationState.getActiveWorkflowUsers(workflowId); + + // Assert + expect(users).toEqual([ + { + lastSeen: now, + userId: 'notExpiredUserId', + }, + ]); + // removes expired users from the cache + expect(mockCacheService.deleteFromHash).toHaveBeenCalledWith( + 'collaboration:workflow', + 'expiredUserId', + ); + }); + }); +}); diff --git a/packages/cli/src/collaboration/collaboration.message.ts b/packages/cli/src/collaboration/collaboration.message.ts new file mode 100644 index 00000000000000..e61a9fe9acb344 --- /dev/null +++ b/packages/cli/src/collaboration/collaboration.message.ts @@ -0,0 +1,35 @@ +import { z } from 'zod'; + +export type CollaborationMessage = WorkflowOpenedMessage | WorkflowClosedMessage; + +export const workflowOpenedMessageSchema = z + .object({ + type: z.literal('workflowOpened'), + workflowId: z.string().min(1), + }) + .strict(); + +export const workflowClosedMessageSchema = z + .object({ + type: z.literal('workflowClosed'), + workflowId: z.string().min(1), + }) + .strict(); + +export const workflowMessageSchema = z.discriminatedUnion('type', [ + workflowOpenedMessageSchema, + workflowClosedMessageSchema, +]); + +export type WorkflowOpenedMessage = z.infer; + +export type WorkflowClosedMessage = z.infer; + +export type WorkflowMessage = z.infer; + +/** + * Parses the given message and ensure it's of type WorkflowMessage + */ +export const parseWorkflowMessage = async (msg: unknown) => { + return await workflowMessageSchema.parseAsync(msg); +}; diff --git a/packages/cli/src/collaboration/collaboration.service.ts b/packages/cli/src/collaboration/collaboration.service.ts new file mode 100644 index 00000000000000..775d3791fc500e --- /dev/null +++ b/packages/cli/src/collaboration/collaboration.service.ts @@ -0,0 +1,120 @@ +import type { Workflow } from 'n8n-workflow'; +import { Service } from 'typedi'; +import { Push } from '../push'; +import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message'; +import { parseWorkflowMessage } from './collaboration.message'; +import type { IActiveWorkflowUsersChanged } from '../interfaces'; +import type { OnPushMessage } from '@/push/types'; +import { UserRepository } from '@/databases/repositories/user.repository'; +import type { User } from '@/databases/entities/user'; +import { CollaborationState } from '@/collaboration/collaboration.state'; +import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository'; +import { UserService } from '@/services/user.service'; +import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow'; + +/** + * Service for managing collaboration feature between users. E.g. keeping + * track of active users for a workflow. + */ +@Service() +export class CollaborationService { + constructor( + private readonly push: Push, + private readonly state: CollaborationState, + private readonly userRepository: UserRepository, + private readonly userService: UserService, + private readonly sharedWorkflowRepository: SharedWorkflowRepository, + ) {} + + init() { + this.push.on('message', async (event: OnPushMessage) => { + try { + await this.handleUserMessage(event.userId, event.msg); + } catch (error) { + ErrorReporterProxy.error( + new ApplicationError('Error handling CollaborationService push message', { + extra: { + msg: event.msg, + userId: event.userId, + }, + cause: error, + }), + ); + } + }); + } + + async handleUserMessage(userId: User['id'], msg: unknown) { + const workflowMessage = await parseWorkflowMessage(msg); + + if (workflowMessage.type === 'workflowOpened') { + await this.handleWorkflowOpened(userId, workflowMessage); + } else if (workflowMessage.type === 'workflowClosed') { + await this.handleWorkflowClosed(userId, workflowMessage); + } + } + + private async handleWorkflowOpened(userId: User['id'], msg: WorkflowOpenedMessage) { + const { workflowId } = msg; + + if (!(await this.hasUserAccessToWorkflow(userId, workflowId))) { + return; + } + + await this.state.addActiveWorkflowUser(workflowId, userId); + + await this.sendWorkflowUsersChangedMessage(workflowId); + } + + private async handleWorkflowClosed(userId: User['id'], msg: WorkflowClosedMessage) { + const { workflowId } = msg; + + if (!(await this.hasUserAccessToWorkflow(userId, workflowId))) { + return; + } + + await this.state.removeActiveWorkflowUser(workflowId, userId); + + await this.sendWorkflowUsersChangedMessage(workflowId); + } + + private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) { + // We have already validated that all active workflow users + // have proper access to the workflow, so we don't need to validate it again + const activeWorkflowUsers = await this.state.getActiveWorkflowUsers(workflowId); + const workflowUserIds = activeWorkflowUsers.map((user) => user.userId); + + if (workflowUserIds.length === 0) { + return; + } + const users = await this.userRepository.getByIds(this.userRepository.manager, workflowUserIds); + + const msgData: IActiveWorkflowUsersChanged = { + workflowId, + activeUsers: await Promise.all( + users.map(async (user) => ({ + user: await this.userService.toPublic(user), + lastSeen: activeWorkflowUsers.find((activeUser) => activeUser.userId === user.id)! + .lastSeen, + })), + ), + }; + + this.push.sendToUsers('activeWorkflowUsersChanged', msgData, workflowUserIds); + } + + private async hasUserAccessToWorkflow(userId: User['id'], workflowId: Workflow['id']) { + const user = await this.userRepository.findOneBy({ + id: userId, + }); + if (!user) { + return false; + } + + const workflow = await this.sharedWorkflowRepository.findWorkflowForUser(workflowId, user, [ + 'workflow:read', + ]); + + return !!workflow; + } +} diff --git a/packages/cli/src/collaboration/collaboration.state.ts b/packages/cli/src/collaboration/collaboration.state.ts new file mode 100644 index 00000000000000..d110bf20dda3b3 --- /dev/null +++ b/packages/cli/src/collaboration/collaboration.state.ts @@ -0,0 +1,110 @@ +import type { ActiveWorkflowUser } from '@/collaboration/collaboration.types'; +import { Time } from '@/constants'; +import type { Iso8601DateTimeString } from '@/interfaces'; +import { CacheService } from '@/services/cache/cache.service'; +import type { User } from '@/databases/entities/user'; +import { type Workflow } from 'n8n-workflow'; +import { Service } from 'typedi'; + +type WorkflowCacheHash = Record; + +/** + * State management for the collaboration service. Workflow active + * users are stored in a hash in the following format: + * { + * [workflowId] -> { + * [userId] -> lastSeenAsIso8601String + * } + * } + */ +@Service() +export class CollaborationState { + /** + * After how many minutes of inactivity a user should be removed + * as being an active user of a workflow. + */ + public readonly inactivityCleanUpTime = 15 * Time.minutes.toMilliseconds; + + constructor(private readonly cache: CacheService) {} + + /** + * Mark user active for given workflow + */ + async addActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) { + const cacheKey = this.formWorkflowCacheKey(workflowId); + const cacheEntry: WorkflowCacheHash = { + [userId]: new Date().toISOString(), + }; + + await this.cache.setHash(cacheKey, cacheEntry); + } + + /** + * Remove user from workflow's active users + */ + async removeActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) { + const cacheKey = this.formWorkflowCacheKey(workflowId); + + await this.cache.deleteFromHash(cacheKey, userId); + } + + async getActiveWorkflowUsers(workflowId: Workflow['id']): Promise { + const cacheKey = this.formWorkflowCacheKey(workflowId); + + const cacheValue = await this.cache.getHash(cacheKey); + if (!cacheValue) { + return []; + } + + const workflowActiveUsers = this.cacheHashToWorkflowActiveUsers(cacheValue); + const [expired, stillActive] = this.splitToExpiredAndStillActive(workflowActiveUsers); + + if (expired.length > 0) { + void this.removeExpiredUsersForWorkflow(workflowId, expired); + } + + return stillActive; + } + + private formWorkflowCacheKey(workflowId: Workflow['id']) { + return `collaboration:${workflowId}`; + } + + private splitToExpiredAndStillActive(workflowUsers: ActiveWorkflowUser[]) { + const expired: ActiveWorkflowUser[] = []; + const stillActive: ActiveWorkflowUser[] = []; + + for (const user of workflowUsers) { + if (this.hasUserExpired(user.lastSeen)) { + expired.push(user); + } else { + stillActive.push(user); + } + } + + return [expired, stillActive]; + } + + private async removeExpiredUsersForWorkflow( + workflowId: Workflow['id'], + expiredUsers: ActiveWorkflowUser[], + ) { + const cacheKey = this.formWorkflowCacheKey(workflowId); + await Promise.all( + expiredUsers.map(async (user) => await this.cache.deleteFromHash(cacheKey, user.userId)), + ); + } + + private cacheHashToWorkflowActiveUsers(workflowCacheEntry: WorkflowCacheHash) { + return Object.entries(workflowCacheEntry).map(([userId, lastSeen]) => ({ + userId, + lastSeen, + })); + } + + private hasUserExpired(lastSeenString: Iso8601DateTimeString) { + const expiryTime = new Date(lastSeenString).getTime() + this.inactivityCleanUpTime; + + return Date.now() > expiryTime; + } +} diff --git a/packages/cli/src/collaboration/collaboration.types.ts b/packages/cli/src/collaboration/collaboration.types.ts new file mode 100644 index 00000000000000..d2a0591395c8e1 --- /dev/null +++ b/packages/cli/src/collaboration/collaboration.types.ts @@ -0,0 +1,7 @@ +import type { Iso8601DateTimeString } from '@/interfaces'; +import type { User } from '@/databases/entities/user'; + +export type ActiveWorkflowUser = { + userId: User['id']; + lastSeen: Iso8601DateTimeString; +}; diff --git a/packages/cli/src/interfaces.ts b/packages/cli/src/interfaces.ts index 78a597e5b39774..8b44008261b871 100644 --- a/packages/cli/src/interfaces.ts +++ b/packages/cli/src/interfaces.ts @@ -290,7 +290,13 @@ export type IPushData = | PushDataWorkerStatusMessage | PushDataWorkflowActivated | PushDataWorkflowDeactivated - | PushDataWorkflowFailedToActivate; + | PushDataWorkflowFailedToActivate + | PushDataActiveWorkflowUsersChanged; + +type PushDataActiveWorkflowUsersChanged = { + data: IActiveWorkflowUsersChanged; + type: 'activeWorkflowUsersChanged'; +}; type PushDataWorkflowFailedToActivate = { data: IWorkflowFailedToActivate; @@ -362,6 +368,19 @@ export type PushDataNodeDescriptionUpdated = { type: 'nodeDescriptionUpdated'; }; +/** DateTime in the Iso8601 format, e.g. 2024-10-31T00:00:00.123Z */ +export type Iso8601DateTimeString = string; + +export interface IActiveWorkflowUser { + user: PublicUser; + lastSeen: Iso8601DateTimeString; +} + +export interface IActiveWorkflowUsersChanged { + workflowId: Workflow['id']; + activeUsers: IActiveWorkflowUser[]; +} + export interface IActiveWorkflowAdded { workflowId: Workflow['id']; } diff --git a/packages/cli/src/push/__tests__/websocket.push.test.ts b/packages/cli/src/push/__tests__/websocket.push.test.ts index b1202b3a13b25b..158264827c1147 100644 --- a/packages/cli/src/push/__tests__/websocket.push.test.ts +++ b/packages/cli/src/push/__tests__/websocket.push.test.ts @@ -7,6 +7,7 @@ import { Logger } from '@/logger'; import type { PushDataExecutionRecovered } from '@/interfaces'; import { mockInstance } from '@test/mocking'; +import type { User } from '@/databases/entities/user'; jest.useFakeTimers(); @@ -27,6 +28,7 @@ const createMockWebSocket = () => new MockWebSocket() as unknown as jest.Mocked< describe('WebSocketPush', () => { const pushRef1 = 'test-session1'; const pushRef2 = 'test-session2'; + const userId: User['id'] = 'test-user'; mockInstance(Logger); const webSocketPush = Container.get(WebSocketPush); @@ -35,27 +37,31 @@ describe('WebSocketPush', () => { beforeEach(() => { jest.resetAllMocks(); + mockWebSocket1.removeAllListeners(); + mockWebSocket2.removeAllListeners(); }); it('can add a connection', () => { - webSocketPush.add(pushRef1, mockWebSocket1); + webSocketPush.add(pushRef1, userId, mockWebSocket1); expect(mockWebSocket1.listenerCount('close')).toBe(1); expect(mockWebSocket1.listenerCount('pong')).toBe(1); + expect(mockWebSocket1.listenerCount('message')).toBe(1); }); it('closes a connection', () => { - webSocketPush.add(pushRef1, mockWebSocket1); + webSocketPush.add(pushRef1, userId, mockWebSocket1); mockWebSocket1.emit('close'); + expect(mockWebSocket1.listenerCount('message')).toBe(0); expect(mockWebSocket1.listenerCount('close')).toBe(0); expect(mockWebSocket1.listenerCount('pong')).toBe(0); }); it('sends data to one connection', () => { - webSocketPush.add(pushRef1, mockWebSocket1); - webSocketPush.add(pushRef2, mockWebSocket2); + webSocketPush.add(pushRef1, userId, mockWebSocket1); + webSocketPush.add(pushRef2, userId, mockWebSocket2); const data: PushDataExecutionRecovered = { type: 'executionRecovered', data: { @@ -80,8 +86,8 @@ describe('WebSocketPush', () => { }); it('sends data to all connections', () => { - webSocketPush.add(pushRef1, mockWebSocket1); - webSocketPush.add(pushRef2, mockWebSocket2); + webSocketPush.add(pushRef1, userId, mockWebSocket1); + webSocketPush.add(pushRef2, userId, mockWebSocket2); const data: PushDataExecutionRecovered = { type: 'executionRecovered', data: { @@ -105,12 +111,55 @@ describe('WebSocketPush', () => { }); it('pings all connections', () => { - webSocketPush.add(pushRef1, mockWebSocket1); - webSocketPush.add(pushRef2, mockWebSocket2); + webSocketPush.add(pushRef1, userId, mockWebSocket1); + webSocketPush.add(pushRef2, userId, mockWebSocket2); jest.runOnlyPendingTimers(); expect(mockWebSocket1.ping).toHaveBeenCalled(); expect(mockWebSocket2.ping).toHaveBeenCalled(); }); + + it('sends data to all users connections', () => { + webSocketPush.add(pushRef1, userId, mockWebSocket1); + webSocketPush.add(pushRef2, userId, mockWebSocket2); + const data: PushDataExecutionRecovered = { + type: 'executionRecovered', + data: { + executionId: 'test-execution-id', + }, + }; + + webSocketPush.sendToUsers('executionRecovered', data, [userId]); + + const expectedMsg = JSON.stringify({ + type: 'executionRecovered', + data: { + type: 'executionRecovered', + data: { + executionId: 'test-execution-id', + }, + }, + }); + expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); + expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); + }); + + it('emits message event when connection receives data', () => { + const mockOnMessageReceived = jest.fn(); + webSocketPush.on('message', mockOnMessageReceived); + webSocketPush.add(pushRef1, userId, mockWebSocket1); + webSocketPush.add(pushRef2, userId, mockWebSocket2); + + const data = { test: 'data' }; + const buffer = Buffer.from(JSON.stringify(data)); + + mockWebSocket1.emit('message', buffer); + + expect(mockOnMessageReceived).toHaveBeenCalledWith({ + msg: data, + pushRef: pushRef1, + userId, + }); + }); }); diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 88595403308bac..20b43283de477b 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -1,6 +1,13 @@ import { assert, jsonStringify } from 'n8n-workflow'; import type { IPushDataType } from '@/interfaces'; import type { Logger } from '@/logger'; +import type { User } from '@/databases/entities/user'; +import { TypedEmitter } from '@/typed-emitter'; +import type { OnPushMessage } from '@/push/types'; + +export interface AbstractPushEvents { + message: OnPushMessage; +} /** * Abstract class for two-way push communication. @@ -8,16 +15,20 @@ import type { Logger } from '@/logger'; * * @emits message when a message is received from a client */ -export abstract class AbstractPush { +export abstract class AbstractPush extends TypedEmitter { protected connections: Record = {}; + protected userIdByPushRef: Record = {}; + protected abstract close(connection: T): void; protected abstract sendToOneConnection(connection: T, data: string): void; - constructor(protected readonly logger: Logger) {} + constructor(protected readonly logger: Logger) { + super(); + } - protected add(pushRef: string, connection: T) { - const { connections } = this; + protected add(pushRef: string, userId: User['id'], connection: T) { + const { connections, userIdByPushRef } = this; this.logger.debug('Add editor-UI session', { pushRef }); const existingConnection = connections[pushRef]; @@ -28,6 +39,15 @@ export abstract class AbstractPush { } connections[pushRef] = connection; + userIdByPushRef[pushRef] = userId; + } + + protected onMessageReceived(pushRef: string, msg: unknown) { + this.logger.debug('Received message from editor-UI', { pushRef, msg }); + + const userId = this.userIdByPushRef[pushRef]; + + this.emit('message', { pushRef, userId, msg }); } protected remove(pushRef?: string) { @@ -36,6 +56,7 @@ export abstract class AbstractPush { this.logger.debug('Removed editor-UI session', { pushRef }); delete this.connections[pushRef]; + delete this.userIdByPushRef[pushRef]; } private sendTo(type: IPushDataType, data: unknown, pushRefs: string[]) { @@ -66,6 +87,15 @@ export abstract class AbstractPush { this.sendTo(type, data, [pushRef]); } + sendToUsers(type: IPushDataType, data: unknown, userIds: Array) { + const { connections } = this; + const userPushRefs = Object.keys(connections).filter((pushRef) => + userIds.includes(this.userIdByPushRef[pushRef]), + ); + + this.sendTo(type, data, userPushRefs); + } + closeAllConnections() { for (const pushRef in this.connections) { // Signal the connection that we want to close it. diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index defe07019610e8..b01e085cec289d 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -15,11 +15,13 @@ import { OrchestrationService } from '@/services/orchestration.service'; import { SSEPush } from './sse.push'; import { WebSocketPush } from './websocket.push'; -import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; +import type { OnPushMessage, PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; import { TypedEmitter } from '@/typed-emitter'; +import type { User } from '@/databases/entities/user'; type PushEvents = { editorUiConnected: string; + message: OnPushMessage; }; const useWebSockets = config.getEnv('push.backend') === 'websocket'; @@ -33,16 +35,21 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket'; */ @Service() export class Push extends TypedEmitter { + public isBidirectional = useWebSockets; + private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); constructor(private readonly orchestrationService: OrchestrationService) { super(); + + if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg)); } handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { const { ws, query: { pushRef }, + user, } = req; if (!pushRef) { @@ -55,9 +62,9 @@ export class Push extends TypedEmitter { } if (req.ws) { - (this.backend as WebSocketPush).add(pushRef, req.ws); + (this.backend as WebSocketPush).add(pushRef, user.id, req.ws); } else if (!useWebSockets) { - (this.backend as SSEPush).add(pushRef, { req, res }); + (this.backend as SSEPush).add(pushRef, user.id, { req, res }); } else { res.status(401).send('Unauthorized'); return; @@ -90,6 +97,10 @@ export class Push extends TypedEmitter { return this.backend; } + sendToUsers(type: IPushDataType, data: unknown, userIds: Array) { + this.backend.sendToUsers(type, data, userIds); + } + @OnShutdown() onShutdown() { this.backend.closeAllConnections(); diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts index 38779ed730f9e9..e78134eac367b0 100644 --- a/packages/cli/src/push/sse.push.ts +++ b/packages/cli/src/push/sse.push.ts @@ -5,6 +5,7 @@ import { Logger } from '@/logger'; import { AbstractPush } from './abstract.push'; import type { PushRequest, PushResponse } from './types'; +import type { User } from '@/databases/entities/user'; type Connection = { req: PushRequest; res: PushResponse }; @@ -22,8 +23,8 @@ export class SSEPush extends AbstractPush { }); } - add(pushRef: string, connection: Connection) { - super.add(pushRef, connection); + add(pushRef: string, userId: User['id'], connection: Connection) { + super.add(pushRef, userId, connection); this.channel.addClient(connection.req, connection.res); } diff --git a/packages/cli/src/push/types.ts b/packages/cli/src/push/types.ts index a12e582213be95..0a9d6f5b6cad9a 100644 --- a/packages/cli/src/push/types.ts +++ b/packages/cli/src/push/types.ts @@ -2,6 +2,7 @@ import type { Response } from 'express'; import type { WebSocket } from 'ws'; import type { AuthenticatedRequest } from '@/requests'; +import type { User } from '@/databases/entities/user'; // TODO: move all push related types here @@ -11,3 +12,9 @@ export type SSEPushRequest = PushRequest & { ws: undefined }; export type WebSocketPushRequest = PushRequest & { ws: WebSocket }; export type PushResponse = Response & { req: PushRequest }; + +export interface OnPushMessage { + pushRef: string; + userId: User['id']; + msg: unknown; +} diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts index 733eebdc605320..79ef00fffb1b1c 100644 --- a/packages/cli/src/push/websocket.push.ts +++ b/packages/cli/src/push/websocket.push.ts @@ -2,6 +2,8 @@ import type WebSocket from 'ws'; import { Service } from 'typedi'; import { Logger } from '@/logger'; import { AbstractPush } from './abstract.push'; +import type { User } from '@/databases/entities/user'; +import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow'; function heartbeat(this: WebSocket) { this.isAlive = true; @@ -16,17 +18,43 @@ export class WebSocketPush extends AbstractPush { setInterval(() => this.pingAll(), 60 * 1000); } - add(pushRef: string, connection: WebSocket) { + add(pushRef: string, userId: User['id'], connection: WebSocket) { connection.isAlive = true; connection.on('pong', heartbeat); - super.add(pushRef, connection); + super.add(pushRef, userId, connection); + + const onMessage = (data: WebSocket.RawData) => { + try { + const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data); + + this.onMessageReceived(pushRef, JSON.parse(buffer.toString('utf8'))); + } catch (error) { + ErrorReporterProxy.error( + new ApplicationError('Error parsing push message', { + extra: { + userId, + data, + }, + cause: error, + }), + ); + this.logger.error("Couldn't parse message from editor-UI", { + error: error as unknown, + pushRef, + data, + }); + } + }; // Makes sure to remove the session if the connection is closed connection.once('close', () => { connection.off('pong', heartbeat); + connection.off('message', onMessage); this.remove(pushRef); }); + + connection.on('message', onMessage); } protected close(connection: WebSocket): void { diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 9176402a8898cc..82e12b0386c4fa 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -27,7 +27,7 @@ import type { ICredentialsOverwrite } from '@/interfaces'; import { CredentialsOverwrites } from '@/credentials-overwrites'; import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; import * as ResponseHelper from '@/response-helper'; -import { setupPushServer, setupPushHandler } from '@/push'; +import { setupPushServer, setupPushHandler, Push } from '@/push'; import { isLdapEnabled } from '@/ldap/helpers.ee'; import { AbstractServer } from '@/abstract-server'; import { PostHogClient } from '@/posthog'; @@ -212,6 +212,18 @@ export class Server extends AbstractServer { const { restEndpoint, app } = this; setupPushHandler(restEndpoint, app); + const push = Container.get(Push); + if (push.isBidirectional) { + const { CollaborationService } = await import('@/collaboration/collaboration.service'); + + const collaborationService = Container.get(CollaborationService); + collaborationService.init(); + } else { + this.logger.warn( + 'Collaboration features are disabled because push is configured unidirectional. Use N8N_PUSH_BACKEND=websocket environment variable to enable them.', + ); + } + if (config.getEnv('executions.mode') === 'queue') { const { ScalingService } = await import('@/scaling/scaling.service'); await Container.get(ScalingService).setupQueue(); diff --git a/packages/cli/test/integration/collaboration/collaboration.service.test.ts b/packages/cli/test/integration/collaboration/collaboration.service.test.ts new file mode 100644 index 00000000000000..81b00dc8667555 --- /dev/null +++ b/packages/cli/test/integration/collaboration/collaboration.service.test.ts @@ -0,0 +1,188 @@ +import { CollaborationService } from '@/collaboration/collaboration.service'; +import { Push } from '@/push'; +import { CacheService } from '@/services/cache/cache.service'; +import { mock } from 'jest-mock-extended'; +import * as testDb from '../shared/test-db'; +import Container from 'typedi'; +import type { User } from '@/databases/entities/user'; +import { createMember, createOwner } from '@test-integration/db/users'; +import type { + WorkflowClosedMessage, + WorkflowOpenedMessage, +} from '@/collaboration/collaboration.message'; +import { createWorkflow, shareWorkflowWithUsers } from '@test-integration/db/workflows'; +import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; +import { mockInstance } from '@test/mocking'; +import { UserService } from '@/services/user.service'; + +describe('CollaborationService', () => { + mockInstance(Push, new Push(mock())); + let pushService: Push; + let collaborationService: CollaborationService; + let owner: User; + let memberWithoutAccess: User; + let memberWithAccess: User; + let workflow: WorkflowEntity; + let userService: UserService; + let cacheService: CacheService; + + beforeAll(async () => { + await testDb.init(); + + pushService = Container.get(Push); + collaborationService = Container.get(CollaborationService); + userService = Container.get(UserService); + cacheService = Container.get(CacheService); + + await cacheService.init(); + + [owner, memberWithAccess, memberWithoutAccess] = await Promise.all([ + createOwner(), + createMember(), + createMember(), + ]); + workflow = await createWorkflow({}, owner); + await shareWorkflowWithUsers(workflow, [memberWithAccess]); + }); + + afterEach(async () => { + jest.resetAllMocks(); + await cacheService.reset(); + }); + + const sendWorkflowOpenedMessage = async (workflowId: string, userId: string) => { + const openMessage: WorkflowOpenedMessage = { + type: 'workflowOpened', + workflowId, + }; + + return await collaborationService.handleUserMessage(userId, openMessage); + }; + + const sendWorkflowClosedMessage = async (workflowId: string, userId: string) => { + const openMessage: WorkflowClosedMessage = { + type: 'workflowClosed', + workflowId, + }; + + return await collaborationService.handleUserMessage(userId, openMessage); + }; + + describe('workflow opened message', () => { + it('should emit activeWorkflowUsersChanged after workflowOpened', async () => { + // Arrange + const sendToUsersSpy = jest.spyOn(pushService, 'sendToUsers'); + + // Act + await sendWorkflowOpenedMessage(workflow.id, owner.id); + await sendWorkflowOpenedMessage(workflow.id, memberWithAccess.id); + + // Assert + expect(sendToUsersSpy).toHaveBeenNthCalledWith( + 1, + 'activeWorkflowUsersChanged', + { + activeUsers: [ + { + lastSeen: expect.any(String), + user: { + ...(await userService.toPublic(owner)), + isPending: false, + }, + }, + ], + workflowId: workflow.id, + }, + [owner.id], + ); + expect(sendToUsersSpy).toHaveBeenNthCalledWith( + 2, + 'activeWorkflowUsersChanged', + { + activeUsers: expect.arrayContaining([ + expect.objectContaining({ + lastSeen: expect.any(String), + user: expect.objectContaining({ + id: owner.id, + }), + }), + expect.objectContaining({ + lastSeen: expect.any(String), + user: expect.objectContaining({ + id: memberWithAccess.id, + }), + }), + ]), + workflowId: workflow.id, + }, + [owner.id, memberWithAccess.id], + ); + }); + + it("should not emit activeWorkflowUsersChanged if user don't have access to the workflow", async () => { + const sendToUsersSpy = jest.spyOn(pushService, 'sendToUsers'); + + // Act + await sendWorkflowOpenedMessage(workflow.id, memberWithoutAccess.id); + + // Assert + expect(sendToUsersSpy).not.toHaveBeenCalled(); + }); + }); + + describe('workflow closed message', () => { + it('should not emit activeWorkflowUsersChanged after workflowClosed when there are no active users', async () => { + // Arrange + const sendToUsersSpy = jest.spyOn(pushService, 'sendToUsers'); + await sendWorkflowOpenedMessage(workflow.id, owner.id); + sendToUsersSpy.mockClear(); + + // Act + await sendWorkflowClosedMessage(workflow.id, owner.id); + + // Assert + expect(sendToUsersSpy).not.toHaveBeenCalled(); + }); + + it('should emit activeWorkflowUsersChanged after workflowClosed when there are active users', async () => { + // Arrange + const sendToUsersSpy = jest.spyOn(pushService, 'sendToUsers'); + await sendWorkflowOpenedMessage(workflow.id, owner.id); + await sendWorkflowOpenedMessage(workflow.id, memberWithAccess.id); + sendToUsersSpy.mockClear(); + + // Act + await sendWorkflowClosedMessage(workflow.id, owner.id); + + // Assert + expect(sendToUsersSpy).toHaveBeenCalledWith( + 'activeWorkflowUsersChanged', + { + activeUsers: expect.arrayContaining([ + expect.objectContaining({ + lastSeen: expect.any(String), + user: expect.objectContaining({ + id: memberWithAccess.id, + }), + }), + ]), + workflowId: workflow.id, + }, + [memberWithAccess.id], + ); + }); + + it("should not emit activeWorkflowUsersChanged if user don't have access to the workflow", async () => { + // Arrange + const sendToUsersSpy = jest.spyOn(pushService, 'sendToUsers'); + await sendWorkflowOpenedMessage(workflow.id, owner.id); + sendToUsersSpy.mockClear(); + + // Act + await sendWorkflowClosedMessage(workflow.id, memberWithoutAccess.id); + + // Assert + expect(sendToUsersSpy).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index f7ad20f9eaf9c3..0f6f7a927e4f5e 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -422,6 +422,16 @@ export interface IExecutionDeleteFilter { ids?: string[]; } +export type PushDataUsersForWorkflow = { + workflowId: string; + activeUsers: Array<{ user: IUser; lastSeen: string }>; +}; + +type PushDataWorkflowUsersChanged = { + data: PushDataUsersForWorkflow; + type: 'activeWorkflowUsersChanged'; +}; + export type IPushData = | PushDataExecutionFinished | PushDataExecutionStarted @@ -436,6 +446,7 @@ export type IPushData = | PushDataWorkerStatusMessage | PushDataActiveWorkflowAdded | PushDataActiveWorkflowRemoved + | PushDataWorkflowUsersChanged | PushDataWorkflowFailedToActivate; export type PushDataActiveWorkflowAdded = { diff --git a/packages/editor-ui/src/components/MainHeader/CollaborationPane.vue b/packages/editor-ui/src/components/MainHeader/CollaborationPane.vue new file mode 100644 index 00000000000000..daabc0a1780bc2 --- /dev/null +++ b/packages/editor-ui/src/components/MainHeader/CollaborationPane.vue @@ -0,0 +1,82 @@ + + + + + diff --git a/packages/editor-ui/src/components/MainHeader/WorkflowDetails.vue b/packages/editor-ui/src/components/MainHeader/WorkflowDetails.vue index 28cca7736f58bd..de687771d8cf45 100644 --- a/packages/editor-ui/src/components/MainHeader/WorkflowDetails.vue +++ b/packages/editor-ui/src/components/MainHeader/WorkflowDetails.vue @@ -22,6 +22,7 @@ import WorkflowTagsDropdown from '@/components/WorkflowTagsDropdown.vue'; import InlineTextEdit from '@/components/InlineTextEdit.vue'; import BreakpointsObserver from '@/components/BreakpointsObserver.vue'; import WorkflowHistoryButton from '@/components/MainHeader/WorkflowHistoryButton.vue'; +import CollaborationPane from '@/components/MainHeader/CollaborationPane.vue'; import { useRootStore } from '@/stores/root.store'; import { useSettingsStore } from '@/stores/settings.store'; @@ -675,6 +676,7 @@ function showCreateWorkflowSuccessToast(id?: string) {
+ { + afterEach(() => { + vi.clearAllMocks(); + }); + + it('should show only current workflow users', async () => { + const { getByTestId, queryByTestId } = renderComponent(); + await waitAllPromises(); + + expect(getByTestId('collaboration-pane')).toBeInTheDocument(); + expect(getByTestId('user-stack-avatars')).toBeInTheDocument(); + expect(getByTestId(`user-stack-avatar-${OWNER_USER.id}`)).toBeInTheDocument(); + expect(getByTestId(`user-stack-avatar-${MEMBER_USER.id}`)).toBeInTheDocument(); + expect(queryByTestId(`user-stack-avatar-${MEMBER_USER_2.id}`)).toBeNull(); + }); + + it('should always render owner first in the list', async () => { + const { getByTestId } = renderComponent(); + await waitAllPromises(); + const firstAvatar = getByTestId('user-stack-avatars').querySelector('.n8n-avatar'); + // Owner is second in the store but should be rendered first + expect(firstAvatar).toHaveAttribute('data-test-id', `user-stack-avatar-${OWNER_USER.id}`); + }); +}); diff --git a/packages/editor-ui/src/composables/useBeforeUnload.ts b/packages/editor-ui/src/composables/useBeforeUnload.ts index 5469c43ee84efa..20095a217afa45 100644 --- a/packages/editor-ui/src/composables/useBeforeUnload.ts +++ b/packages/editor-ui/src/composables/useBeforeUnload.ts @@ -1,9 +1,11 @@ import { useCanvasStore } from '@/stores/canvas.store'; import { useUIStore } from '@/stores/ui.store'; import { useI18n } from '@/composables/useI18n'; -import { computed } from 'vue'; -import { VIEWS } from '@/constants'; +import { computed, ref } from 'vue'; +import { TIME, VIEWS } from '@/constants'; import type { useRoute } from 'vue-router'; +import { useCollaborationStore } from '@/stores/collaboration.store'; +import { useWorkflowsStore } from '@/stores/workflows.store'; /** * Composable to handle the beforeunload event in canvas views. @@ -15,19 +17,31 @@ import type { useRoute } from 'vue-router'; export function useBeforeUnload({ route }: { route: ReturnType }) { const uiStore = useUIStore(); const canvasStore = useCanvasStore(); + const collaborationStore = useCollaborationStore(); + const workflowsStore = useWorkflowsStore(); const i18n = useI18n(); + const unloadTimeout = ref(null); const isDemoRoute = computed(() => route.name === VIEWS.DEMO); function onBeforeUnload(e: BeforeUnloadEvent) { if (isDemoRoute.value || window.preventNodeViewBeforeUnload) { return; } else if (uiStore.stateIsDirty) { + // A bit hacky solution to detecting users leaving the page after prompt: + // 1. Notify that workflow is closed straight away + collaborationStore.notifyWorkflowClosed(workflowsStore.workflowId); + // 2. If user decided to stay on the page we notify that the workflow is opened again + unloadTimeout.value = setTimeout(() => { + collaborationStore.notifyWorkflowOpened(workflowsStore.workflowId); + }, 5 * TIME.SECOND); + e.returnValue = true; //Gecko + IE return true; //Gecko + Webkit, Safari, Chrome etc. } else { canvasStore.startLoading(i18n.baseText('nodeView.redirecting')); + collaborationStore.notifyWorkflowClosed(workflowsStore.workflowId); return; } } @@ -37,6 +51,12 @@ export function useBeforeUnload({ route }: { route: ReturnType } function removeBeforeUnloadEventBindings() { + collaborationStore.notifyWorkflowClosed(workflowsStore.workflowId); + + if (unloadTimeout.value) { + clearTimeout(unloadTimeout.value); + } + window.removeEventListener('beforeunload', onBeforeUnload); } diff --git a/packages/editor-ui/src/constants.ts b/packages/editor-ui/src/constants.ts index f44046139ebabc..9cdf0b87b91f96 100644 --- a/packages/editor-ui/src/constants.ts +++ b/packages/editor-ui/src/constants.ts @@ -639,6 +639,7 @@ export const enum STORES { CLOUD_PLAN = 'cloudPlan', RBAC = 'rbac', PUSH = 'push', + COLLABORATION = 'collaboration', ASSISTANT = 'assistant', BECOME_TEMPLATE_CREATOR = 'becomeTemplateCreator', PROJECTS = 'projects', diff --git a/packages/editor-ui/src/stores/collaboration.store.ts b/packages/editor-ui/src/stores/collaboration.store.ts new file mode 100644 index 00000000000000..e9f4afe168496c --- /dev/null +++ b/packages/editor-ui/src/stores/collaboration.store.ts @@ -0,0 +1,86 @@ +import { defineStore } from 'pinia'; +import { computed, ref } from 'vue'; +import { useWorkflowsStore } from '@/stores/workflows.store'; +import { usePushConnectionStore } from '@/stores/pushConnection.store'; +import { STORES } from '@/constants'; +import type { IUser } from '@/Interface'; +import { useUsersStore } from '@/stores/users.store'; + +type ActiveUsersForWorkflows = { + [workflowId: string]: Array<{ user: IUser; lastSeen: string }>; +}; + +/** + * Store for tracking active users for workflows. I.e. to show + * who is collaboratively viewing/editing the workflow at the same time. + */ +export const useCollaborationStore = defineStore(STORES.COLLABORATION, () => { + const pushStore = usePushConnectionStore(); + const workflowStore = useWorkflowsStore(); + const usersStore = useUsersStore(); + + const usersForWorkflows = ref({}); + const pushStoreEventListenerRemovalFn = ref<(() => void) | null>(null); + + const getUsersForCurrentWorkflow = computed(() => { + return usersForWorkflows.value[workflowStore.workflowId] ?? []; + }); + + function initialize() { + if (pushStoreEventListenerRemovalFn.value) { + return; + } + + pushStoreEventListenerRemovalFn.value = pushStore.addEventListener((event) => { + if (event.type === 'activeWorkflowUsersChanged') { + const workflowId = event.data.workflowId; + usersForWorkflows.value[workflowId] = event.data.activeUsers; + } + }); + } + + function terminate() { + if (typeof pushStoreEventListenerRemovalFn.value === 'function') { + pushStoreEventListenerRemovalFn.value(); + pushStoreEventListenerRemovalFn.value = null; + } + } + + function workflowUsersUpdated(data: ActiveUsersForWorkflows) { + usersForWorkflows.value = data; + } + + function functionRemoveCurrentUserFromActiveUsers(workflowId: string) { + const workflowUsers = usersForWorkflows.value[workflowId]; + if (!workflowUsers) { + return; + } + + usersForWorkflows.value[workflowId] = workflowUsers.filter( + (activeUser) => activeUser.user.id !== usersStore.currentUserId, + ); + } + + function notifyWorkflowOpened(workflowId: string) { + pushStore.send({ + type: 'workflowOpened', + workflowId, + }); + } + + function notifyWorkflowClosed(workflowId: string) { + pushStore.send({ type: 'workflowClosed', workflowId }); + + functionRemoveCurrentUserFromActiveUsers(workflowId); + } + + return { + usersForWorkflows, + initialize, + terminate, + notifyWorkflowOpened, + notifyWorkflowClosed, + workflowUsersUpdated, + getUsersForCurrentWorkflow, + }; +}); diff --git a/packages/editor-ui/src/views/NodeView.v2.vue b/packages/editor-ui/src/views/NodeView.v2.vue index 5ce133bca92cd5..4f5f16caa58fe4 100644 --- a/packages/editor-ui/src/views/NodeView.v2.vue +++ b/packages/editor-ui/src/views/NodeView.v2.vue @@ -101,6 +101,7 @@ import { createEventBus } from 'n8n-design-system'; import type { PinDataSource } from '@/composables/usePinnedData'; import { useClipboard } from '@/composables/useClipboard'; import { useBeforeUnload } from '@/composables/useBeforeUnload'; +import { useCollaborationStore } from '@/stores/collaboration.store'; import { getResourcePermissions } from '@/permissions'; import NodeViewUnfinishedWorkflowMessage from '@/components/NodeViewUnfinishedWorkflowMessage.vue'; @@ -134,6 +135,7 @@ const credentialsStore = useCredentialsStore(); const environmentsStore = useEnvironmentsStore(); const externalSecretsStore = useExternalSecretsStore(); const rootStore = useRootStore(); +const collaborationStore = useCollaborationStore(); const executionsStore = useExecutionsStore(); const canvasStore = useCanvasStore(); const npsSurveyStore = useNpsSurveyStore(); @@ -338,6 +340,8 @@ async function initializeWorkspaceForExistingWorkflow(id: string) { } await projectsStore.setProjectNavActiveIdByWorkflowHomeProject(workflow.value.homeProject); + + collaborationStore.notifyWorkflowOpened(id); } catch (error) { toast.showError(error, i18n.baseText('openWorkflow.workflowNotFoundError')); @@ -1456,6 +1460,7 @@ watch( onBeforeMount(() => { if (!isDemoRoute.value) { pushConnectionStore.pushConnect(); + collaborationStore.initialize(); } }); @@ -1509,6 +1514,7 @@ onBeforeUnmount(() => { onDeactivated(() => { removeBeforeUnloadEventBindings(); + collaborationStore.terminate(); });