Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Reintroduce collaboration feature #10602

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { CollaborationState } from '../collaboration.state';
import type { CacheService } from '@/services/cache/cache.service';
import { mock } from 'jest-mock-extended';

const origDate = global.Date;

const mockDateFactory = (currentDate: string) => {
return class CustomDate extends origDate {
constructor() {
super(currentDate);
}
} as DateConstructor;
};

describe('CollaborationState', () => {
let collaborationState: CollaborationState;
let mockCacheService: jest.Mocked<CacheService>;

beforeEach(() => {
mockCacheService = mock<CacheService>();
collaborationState = new CollaborationState(mockCacheService);
});

afterEach(() => {
global.Date = origDate;
});

const workflowId = 'workflow';

describe('addActiveWorkflowUser', () => {
it('should add workflow user with correct cache key and value', async () => {
// Arrange
global.Date = mockDateFactory('2023-01-01T00:00:00.000Z');

// Act
await collaborationState.addActiveWorkflowUser(workflowId, 'userId');

// Assert
expect(mockCacheService.setHash).toHaveBeenCalledWith('collaboration:workflow', {
userId: '2023-01-01T00:00:00.000Z',
});
});
});

describe('removeActiveWorkflowUser', () => {
it('should remove workflow user with correct cache key', async () => {
// Act
await collaborationState.removeActiveWorkflowUser(workflowId, 'userId');

// Assert
expect(mockCacheService.deleteFromHash).toHaveBeenCalledWith(
'collaboration:workflow',
'userId',
);
});
});

describe('getActiveWorkflowUsers', () => {
it('should get workflows with correct cache key', async () => {
// Act
const users = await collaborationState.getActiveWorkflowUsers(workflowId);

// Assert
expect(mockCacheService.getHash).toHaveBeenCalledWith('collaboration:workflow');
expect(users).toBeEmptyArray();
});

it('should get workflow users that are not expired', async () => {
// Arrange
const nowMinus16Minutes = new Date();
nowMinus16Minutes.setMinutes(nowMinus16Minutes.getMinutes() - 16);
const now = new Date().toISOString();

mockCacheService.getHash.mockResolvedValueOnce({
expiredUserId: nowMinus16Minutes.toISOString(),
notExpiredUserId: now,
});

// Act
const users = await collaborationState.getActiveWorkflowUsers(workflowId);

// Assert
expect(users).toEqual([
{
lastSeen: now,
userId: 'notExpiredUserId',
},
]);
tomi marked this conversation as resolved.
Show resolved Hide resolved
// removes expired users from the cache
expect(mockCacheService.deleteFromHash).toHaveBeenCalledWith(
'collaboration:workflow',
'expiredUserId',
);
});
});
});
35 changes: 35 additions & 0 deletions packages/cli/src/collaboration/collaboration.message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { z } from 'zod';

export type CollaborationMessage = WorkflowOpenedMessage | WorkflowClosedMessage;

export const workflowOpenedMessageSchema = z
.object({
type: z.literal('workflowOpened'),
workflowId: z.string().min(1),
})
.strict();

export const workflowClosedMessageSchema = z
.object({
type: z.literal('workflowClosed'),
workflowId: z.string().min(1),
})
.strict();

export const workflowMessageSchema = z.discriminatedUnion('type', [
workflowOpenedMessageSchema,
workflowClosedMessageSchema,
]);

export type WorkflowOpenedMessage = z.infer<typeof workflowOpenedMessageSchema>;

export type WorkflowClosedMessage = z.infer<typeof workflowClosedMessageSchema>;

export type WorkflowMessage = z.infer<typeof workflowMessageSchema>;

/**
* Parses the given message and ensure it's of type WorkflowMessage
*/
export const parseWorkflowMessage = async (msg: unknown) => {
return await workflowMessageSchema.parseAsync(msg);
};
120 changes: 120 additions & 0 deletions packages/cli/src/collaboration/collaboration.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import type { Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
import { Push } from '../push';
import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message';
import { parseWorkflowMessage } from './collaboration.message';
import type { IActiveWorkflowUsersChanged } from '../interfaces';
import type { OnPushMessage } from '@/push/types';
import { UserRepository } from '@/databases/repositories/user.repository';
import type { User } from '@/databases/entities/user';
import { CollaborationState } from '@/collaboration/collaboration.state';
import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import { UserService } from '@/services/user.service';
import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow';

/**
* Service for managing collaboration feature between users. E.g. keeping
* track of active users for a workflow.
*/
@Service()
export class CollaborationService {
constructor(
private readonly push: Push,
private readonly state: CollaborationState,
private readonly userRepository: UserRepository,
private readonly userService: UserService,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
) {}

init() {
this.push.on('message', async (event: OnPushMessage) => {
try {
await this.handleUserMessage(event.userId, event.msg);
} catch (error) {
ErrorReporterProxy.error(
new ApplicationError('Error handling CollaborationService push message', {
extra: {
msg: event.msg,
userId: event.userId,
},
cause: error,
}),
);
}
});
}

async handleUserMessage(userId: User['id'], msg: unknown) {
const workflowMessage = await parseWorkflowMessage(msg);

if (workflowMessage.type === 'workflowOpened') {
await this.handleWorkflowOpened(userId, workflowMessage);
} else if (workflowMessage.type === 'workflowClosed') {
await this.handleWorkflowClosed(userId, workflowMessage);
}
}

private async handleWorkflowOpened(userId: User['id'], msg: WorkflowOpenedMessage) {
const { workflowId } = msg;

if (!(await this.hasUserAccessToWorkflow(userId, workflowId))) {
return;
}

await this.state.addActiveWorkflowUser(workflowId, userId);

await this.sendWorkflowUsersChangedMessage(workflowId);
}

private async handleWorkflowClosed(userId: User['id'], msg: WorkflowClosedMessage) {
const { workflowId } = msg;

if (!(await this.hasUserAccessToWorkflow(userId, workflowId))) {
return;
}

await this.state.removeActiveWorkflowUser(workflowId, userId);

await this.sendWorkflowUsersChangedMessage(workflowId);
}

private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) {
// We have already validated that all active workflow users
// have proper access to the workflow, so we don't need to validate it again
const activeWorkflowUsers = await this.state.getActiveWorkflowUsers(workflowId);
const workflowUserIds = activeWorkflowUsers.map((user) => user.userId);

if (workflowUserIds.length === 0) {
return;
}
const users = await this.userRepository.getByIds(this.userRepository.manager, workflowUserIds);

const msgData: IActiveWorkflowUsersChanged = {
workflowId,
activeUsers: await Promise.all(
users.map(async (user) => ({
user: await this.userService.toPublic(user),
lastSeen: activeWorkflowUsers.find((activeUser) => activeUser.userId === user.id)!
.lastSeen,
})),
),
};

this.push.sendToUsers('activeWorkflowUsersChanged', msgData, workflowUserIds);
}

private async hasUserAccessToWorkflow(userId: User['id'], workflowId: Workflow['id']) {
const user = await this.userRepository.findOneBy({
id: userId,
});
if (!user) {
return false;
}

const workflow = await this.sharedWorkflowRepository.findWorkflowForUser(workflowId, user, [
'workflow:read',
]);
Comment on lines +114 to +116
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally would want to check the permissions without needing to query the entire workflow. Might do that as a follow up


return !!workflow;
}
}
110 changes: 110 additions & 0 deletions packages/cli/src/collaboration/collaboration.state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import type { ActiveWorkflowUser } from '@/collaboration/collaboration.types';
import { Time } from '@/constants';
import type { Iso8601DateTimeString } from '@/interfaces';
import { CacheService } from '@/services/cache/cache.service';
import type { User } from '@/databases/entities/user';
import { type Workflow } from 'n8n-workflow';
import { Service } from 'typedi';

type WorkflowCacheHash = Record<User['id'], Iso8601DateTimeString>;

/**
* State management for the collaboration service. Workflow active
* users are stored in a hash in the following format:
* {
* [workflowId] -> {
* [userId] -> lastSeenAsIso8601String
* }
* }
*/
@Service()
export class CollaborationState {
/**
* After how many minutes of inactivity a user should be removed
* as being an active user of a workflow.
*/
public readonly inactivityCleanUpTime = 15 * Time.minutes.toMilliseconds;

constructor(private readonly cache: CacheService) {}

/**
* Mark user active for given workflow
*/
async addActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
const cacheKey = this.formWorkflowCacheKey(workflowId);
const cacheEntry: WorkflowCacheHash = {
[userId]: new Date().toISOString(),
};

await this.cache.setHash(cacheKey, cacheEntry);
}

/**
* Remove user from workflow's active users
*/
async removeActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
const cacheKey = this.formWorkflowCacheKey(workflowId);

await this.cache.deleteFromHash(cacheKey, userId);
}

async getActiveWorkflowUsers(workflowId: Workflow['id']): Promise<ActiveWorkflowUser[]> {
const cacheKey = this.formWorkflowCacheKey(workflowId);

const cacheValue = await this.cache.getHash<Iso8601DateTimeString>(cacheKey);
if (!cacheValue) {
return [];
}

const workflowActiveUsers = this.cacheHashToWorkflowActiveUsers(cacheValue);
const [expired, stillActive] = this.splitToExpiredAndStillActive(workflowActiveUsers);

if (expired.length > 0) {
void this.removeExpiredUsersForWorkflow(workflowId, expired);
}

return stillActive;
}

private formWorkflowCacheKey(workflowId: Workflow['id']) {
return `collaboration:${workflowId}`;
}

private splitToExpiredAndStillActive(workflowUsers: ActiveWorkflowUser[]) {
const expired: ActiveWorkflowUser[] = [];
const stillActive: ActiveWorkflowUser[] = [];

for (const user of workflowUsers) {
if (this.hasUserExpired(user.lastSeen)) {
expired.push(user);
} else {
stillActive.push(user);
}
}

return [expired, stillActive];
}

private async removeExpiredUsersForWorkflow(
workflowId: Workflow['id'],
expiredUsers: ActiveWorkflowUser[],
) {
const cacheKey = this.formWorkflowCacheKey(workflowId);
await Promise.all(
expiredUsers.map(async (user) => await this.cache.deleteFromHash(cacheKey, user.userId)),
);
}

private cacheHashToWorkflowActiveUsers(workflowCacheEntry: WorkflowCacheHash) {
return Object.entries(workflowCacheEntry).map(([userId, lastSeen]) => ({
userId,
lastSeen,
}));
}

private hasUserExpired(lastSeenString: Iso8601DateTimeString) {
const expiryTime = new Date(lastSeenString).getTime() + this.inactivityCleanUpTime;

return Date.now() > expiryTime;
}
}
7 changes: 7 additions & 0 deletions packages/cli/src/collaboration/collaboration.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import type { Iso8601DateTimeString } from '@/interfaces';
import type { User } from '@/databases/entities/user';

export type ActiveWorkflowUser = {
userId: User['id'];
lastSeen: Iso8601DateTimeString;
};
Loading
Loading