Skip to content

Commit

Permalink
feat: Reintroduce collaboration feature
Browse files Browse the repository at this point in the history
Collaboration feature was removed because it was missing authorization checks.
This adds checks that user can't send workflow opened or closed message, unless
they have access to that workflow. Also uses `cache` for storing the collaboration
state, so the feature works correctly with multiple mains.

The feature is currently only enabled for the new canvas, as that should come live
soon anyways.
  • Loading branch information
tomi committed Aug 29, 2024
1 parent c988931 commit 4427285
Show file tree
Hide file tree
Showing 22 changed files with 1,019 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { CollaborationState } from '../collaboration.state';
import type { CacheService } from '@/services/cache/cache.service';
import { mock } from 'jest-mock-extended';

const origDate = global.Date;

const mockDateFactory = (currentDate: string) => {
return class CustomDate extends origDate {
constructor() {
super(currentDate);
}
} as DateConstructor;
};

describe('CollaborationState', () => {
let collaborationState: CollaborationState;
let mockCacheService: jest.Mocked<CacheService>;

beforeEach(() => {
mockCacheService = mock<CacheService>();
collaborationState = new CollaborationState(mockCacheService);
});

afterEach(() => {
global.Date = origDate;
});

const workflowId = 'workflow';

describe('addActiveWorkflowUser', () => {
it('should add workflow user with correct cache key and value', async () => {
// Arrange
global.Date = mockDateFactory('2023-01-01T00:00:00.000Z');

// Act
await collaborationState.addActiveWorkflowUser(workflowId, 'userId');

// Assert
expect(mockCacheService.setHash).toHaveBeenCalledWith('collaboration:workflow', {
userId: '2023-01-01T00:00:00.000Z',
});
});
});

describe('removeActiveWorkflowUser', () => {
it('should remove workflow user with correct cache key', async () => {
// Act
await collaborationState.removeActiveWorkflowUser(workflowId, 'userId');

// Assert
expect(mockCacheService.deleteFromHash).toHaveBeenCalledWith(
'collaboration:workflow',
'userId',
);
});
});

describe('getActiveWorkflowUsers', () => {
it('should get workflows with correct cache key', async () => {
// Act
const users = await collaborationState.getActiveWorkflowUsers(workflowId);

// Assert
expect(mockCacheService.getHash).toHaveBeenCalledWith('collaboration:workflow');
expect(users).toBeEmptyArray();
});

it('should get workflow users that are not expired', async () => {
// Arrange
const nowMinus16Minutes = new Date();
nowMinus16Minutes.setMinutes(nowMinus16Minutes.getMinutes() - 16);
const now = new Date().toISOString();

mockCacheService.getHash.mockReturnValueOnce(
Promise.resolve({
expiredUserId: nowMinus16Minutes.toISOString(),
notExpiredUserId: now,
}),
);

// Act
const users = await collaborationState.getActiveWorkflowUsers(workflowId);

// Assert
expect(users).toEqual([
{
lastSeen: now,
userId: 'notExpiredUserId',
},
]);
});
});
});
23 changes: 23 additions & 0 deletions packages/cli/src/collaboration/collaboration.message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export type CollaborationMessage = WorkflowOpenedMessage | WorkflowClosedMessage;

export type WorkflowOpenedMessage = {
type: 'workflowOpened';
workflowId: string;
};

export type WorkflowClosedMessage = {
type: 'workflowClosed';
workflowId: string;
};

const isWorkflowMessage = (msg: unknown): msg is CollaborationMessage => {
return typeof msg === 'object' && msg !== null && 'type' in msg;
};

export const isWorkflowOpenedMessage = (msg: unknown): msg is WorkflowOpenedMessage => {
return isWorkflowMessage(msg) && msg.type === 'workflowOpened';
};

export const isWorkflowClosedMessage = (msg: unknown): msg is WorkflowClosedMessage => {
return isWorkflowMessage(msg) && msg.type === 'workflowClosed';
};
115 changes: 115 additions & 0 deletions packages/cli/src/collaboration/collaboration.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import type { Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
import { Push } from '../push';
import { Logger } from '@/logger';
import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message';
import { isWorkflowClosedMessage, isWorkflowOpenedMessage } from './collaboration.message';
import type { IActiveWorkflowUsersChanged } from '../interfaces';
import type { OnPushMessage } from '@/push/types';
import { UserRepository } from '@/databases/repositories/user.repository';
import type { User } from '@/databases/entities/user';
import { CollaborationState } from '@/collaboration/collaboration.state';
import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import { UserService } from '@/services/user.service';

/**
* Service for managing collaboration feature between users. E.g. keeping
* track of active users for a workflow.
*/
@Service()
export class CollaborationService {
constructor(
private readonly logger: Logger,
private readonly push: Push,
private readonly state: CollaborationState,
private readonly userRepository: UserRepository,
private readonly userService: UserService,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
) {}

init() {
this.push.on('message', async (event: OnPushMessage) => {
try {
await this.handleUserMessage(event.userId, event.msg);
} catch (error) {
this.logger.error('Error handling user message', {
error: error as unknown,
msg: event.msg,
userId: event.userId,
});
}
});
}

async handleUserMessage(userId: User['id'], msg: unknown) {
if (isWorkflowOpenedMessage(msg)) {
await this.handleWorkflowOpened(userId, msg);
} else if (isWorkflowClosedMessage(msg)) {
await this.handleWorkflowClosed(userId, msg);
}
}

private async handleWorkflowOpened(userId: User['id'], msg: WorkflowOpenedMessage) {
const { workflowId } = msg;

if (!(await this.hasUserAccessToWorkflow(userId, workflowId))) {
return;
}

await this.state.addActiveWorkflowUser(workflowId, userId);

await this.sendWorkflowUsersChangedMessage(workflowId);
}

private async handleWorkflowClosed(userId: User['id'], msg: WorkflowClosedMessage) {
const { workflowId } = msg;

if (!(await this.hasUserAccessToWorkflow(userId, workflowId))) {
return;
}

await this.state.removeActiveWorkflowUser(workflowId, userId);

await this.sendWorkflowUsersChangedMessage(workflowId);
}

private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) {
// We have already validated that all active workflow users
// have proper access to the workflow, so we don't need to validate it again
const activeWorkflowUsers = await this.state.getActiveWorkflowUsers(workflowId);
const workflowUserIds = activeWorkflowUsers.map((user) => user.userId);

if (workflowUserIds.length === 0) {
return;
}
const users = await this.userRepository.getByIds(this.userRepository.manager, workflowUserIds);

const msgData: IActiveWorkflowUsersChanged = {
workflowId,
activeUsers: await Promise.all(
users.map(async (user) => ({
user: await this.userService.toPublic(user),
lastSeen: activeWorkflowUsers.find((activeUser) => activeUser.userId === user.id)!
.lastSeen,
})),
),
};

this.push.sendToUsers('activeWorkflowUsersChanged', msgData, workflowUserIds);
}

private async hasUserAccessToWorkflow(userId: User['id'], workflowId: Workflow['id']) {
const user = await this.userRepository.findOneBy({
id: userId,
});
if (!user) {
return false;
}

const workflow = await this.sharedWorkflowRepository.findWorkflowForUser(workflowId, user, [
'workflow:read',
]);

return !!workflow;
}
}
110 changes: 110 additions & 0 deletions packages/cli/src/collaboration/collaboration.state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import type { ActiveWorkflowUser } from '@/collaboration/collaboration.types';
import { Time } from '@/constants';
import type { Iso8601DateTimeString } from '@/interfaces';
import { CacheService } from '@/services/cache/cache.service';
import type { User } from '@/databases/entities/user';
import { type Workflow } from 'n8n-workflow';
import { Service } from 'typedi';

type WorkflowCacheHash = Record<User['id'], Iso8601DateTimeString>;

/**
* State management for the collaboration service. Workflow active
* users are stored in a hash in the following format:
* {
* [workflowId] -> {
* [userId] -> lastSeenAsIso8601String
* }
* }
*/
@Service()
export class CollaborationState {
/**
* After how many minutes of inactivity a user should be removed
* as being an active user of a workflow.
*/
public readonly inactivityCleanUpTime = 15 * Time.minutes.toMilliseconds;

constructor(private readonly cache: CacheService) {}

/**
* Mark user active for given workflow
*/
async addActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
const cacheKey = this.formWorkflowCacheKey(workflowId);
const cacheEntry: WorkflowCacheHash = {
[userId]: new Date().toISOString(),
};

await this.cache.setHash(cacheKey, cacheEntry);
}

/**
* Remove user from workflow's active users
*/
async removeActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
const cacheKey = this.formWorkflowCacheKey(workflowId);

await this.cache.deleteFromHash(cacheKey, userId);
}

async getActiveWorkflowUsers(workflowId: Workflow['id']): Promise<ActiveWorkflowUser[]> {
const cacheKey = this.formWorkflowCacheKey(workflowId);

const cacheValue = await this.cache.getHash<Iso8601DateTimeString>(cacheKey);
if (!cacheValue) {
return [];
}

const workflowActiveUsers = this.cacheHashToWorkflowActiveUsers(cacheValue);
const [expired, stillActive] = this.splitToExpiredAndStillActive(workflowActiveUsers);

if (expired.length > 0) {
void this.removeExpiredUsersForWorkflow(workflowId, expired);
}

return stillActive;
}

private formWorkflowCacheKey(workflowId: Workflow['id']) {
return `collaboration:${workflowId}`;
}

private splitToExpiredAndStillActive(workflowUsers: ActiveWorkflowUser[]) {
const expired: ActiveWorkflowUser[] = [];
const stillActive: ActiveWorkflowUser[] = [];

for (const user of workflowUsers) {
if (this.hasUserExpired(user.lastSeen)) {
expired.push(user);
} else {
stillActive.push(user);
}
}

return [expired, stillActive];
}

private async removeExpiredUsersForWorkflow(
workflowId: Workflow['id'],
expiredUsers: ActiveWorkflowUser[],
) {
const cacheKey = this.formWorkflowCacheKey(workflowId);
await Promise.all(
expiredUsers.map(async (user) => await this.cache.deleteFromHash(cacheKey, user.userId)),
);
}

private cacheHashToWorkflowActiveUsers(workflowCacheEntry: WorkflowCacheHash) {
return Object.entries(workflowCacheEntry).map(([userId, lastSeen]) => ({
userId,
lastSeen,
}));
}

private hasUserExpired(lastSeenString: Iso8601DateTimeString) {
const expiryTime = new Date(lastSeenString).getTime() + this.inactivityCleanUpTime;

return Date.now() > expiryTime;
}
}
7 changes: 7 additions & 0 deletions packages/cli/src/collaboration/collaboration.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import type { Iso8601DateTimeString } from '@/interfaces';
import type { User } from '@/databases/entities/user';

export type ActiveWorkflowUser = {
userId: User['id'];
lastSeen: Iso8601DateTimeString;
};
21 changes: 20 additions & 1 deletion packages/cli/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,13 @@ export type IPushData =
| PushDataWorkerStatusMessage
| PushDataWorkflowActivated
| PushDataWorkflowDeactivated
| PushDataWorkflowFailedToActivate;
| PushDataWorkflowFailedToActivate
| PushDataActiveWorkflowUsersChanged;

type PushDataActiveWorkflowUsersChanged = {
data: IActiveWorkflowUsersChanged;
type: 'activeWorkflowUsersChanged';
};

type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
Expand Down Expand Up @@ -350,6 +356,19 @@ export type PushDataNodeDescriptionUpdated = {
type: 'nodeDescriptionUpdated';
};

/** DateTime in the Iso8601 format, e.g. 2024-10-31T00:00:00.123Z */
export type Iso8601DateTimeString = string;

export interface IActiveWorkflowUser {
user: PublicUser;
lastSeen: Iso8601DateTimeString;
}

export interface IActiveWorkflowUsersChanged {
workflowId: Workflow['id'];
activeUsers: IActiveWorkflowUser[];
}

export interface IActiveWorkflowAdded {
workflowId: Workflow['id'];
}
Expand Down
Loading

0 comments on commit 4427285

Please sign in to comment.