Skip to content

Commit

Permalink
feat: Reintroduce collaboration feature (#10602)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomi authored Sep 3, 2024
1 parent 35e6a87 commit 2ea2bfe
Show file tree
Hide file tree
Showing 22 changed files with 1,046 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { CollaborationState } from '../collaboration.state';
import type { CacheService } from '@/services/cache/cache.service';
import { mock } from 'jest-mock-extended';

const origDate = global.Date;

const mockDateFactory = (currentDate: string) => {
return class CustomDate extends origDate {
constructor() {
super(currentDate);
}
} as DateConstructor;
};

describe('CollaborationState', () => {
let collaborationState: CollaborationState;
let mockCacheService: jest.Mocked<CacheService>;

beforeEach(() => {
mockCacheService = mock<CacheService>();
collaborationState = new CollaborationState(mockCacheService);
});

afterEach(() => {
global.Date = origDate;
});

const workflowId = 'workflow';

describe('addActiveWorkflowUser', () => {
it('should add workflow user with correct cache key and value', async () => {
// Arrange
global.Date = mockDateFactory('2023-01-01T00:00:00.000Z');

// Act
await collaborationState.addActiveWorkflowUser(workflowId, 'userId');

// Assert
expect(mockCacheService.setHash).toHaveBeenCalledWith('collaboration:workflow', {
userId: '2023-01-01T00:00:00.000Z',
});
});
});

describe('removeActiveWorkflowUser', () => {
it('should remove workflow user with correct cache key', async () => {
// Act
await collaborationState.removeActiveWorkflowUser(workflowId, 'userId');

// Assert
expect(mockCacheService.deleteFromHash).toHaveBeenCalledWith(
'collaboration:workflow',
'userId',
);
});
});

describe('getActiveWorkflowUsers', () => {
it('should get workflows with correct cache key', async () => {
// Act
const users = await collaborationState.getActiveWorkflowUsers(workflowId);

// Assert
expect(mockCacheService.getHash).toHaveBeenCalledWith('collaboration:workflow');
expect(users).toBeEmptyArray();
});

it('should get workflow users that are not expired', async () => {
// Arrange
const nowMinus16Minutes = new Date();
nowMinus16Minutes.setMinutes(nowMinus16Minutes.getMinutes() - 16);
const now = new Date().toISOString();

mockCacheService.getHash.mockResolvedValueOnce({
expiredUserId: nowMinus16Minutes.toISOString(),
notExpiredUserId: now,
});

// Act
const users = await collaborationState.getActiveWorkflowUsers(workflowId);

// Assert
expect(users).toEqual([
{
lastSeen: now,
userId: 'notExpiredUserId',
},
]);
// removes expired users from the cache
expect(mockCacheService.deleteFromHash).toHaveBeenCalledWith(
'collaboration:workflow',
'expiredUserId',
);
});
});
});
35 changes: 35 additions & 0 deletions packages/cli/src/collaboration/collaboration.message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { z } from 'zod';

export type CollaborationMessage = WorkflowOpenedMessage | WorkflowClosedMessage;

export const workflowOpenedMessageSchema = z
.object({
type: z.literal('workflowOpened'),
workflowId: z.string().min(1),
})
.strict();

export const workflowClosedMessageSchema = z
.object({
type: z.literal('workflowClosed'),
workflowId: z.string().min(1),
})
.strict();

export const workflowMessageSchema = z.discriminatedUnion('type', [
workflowOpenedMessageSchema,
workflowClosedMessageSchema,
]);

export type WorkflowOpenedMessage = z.infer<typeof workflowOpenedMessageSchema>;

export type WorkflowClosedMessage = z.infer<typeof workflowClosedMessageSchema>;

export type WorkflowMessage = z.infer<typeof workflowMessageSchema>;

/**
* Parses the given message and ensure it's of type WorkflowMessage
*/
export const parseWorkflowMessage = async (msg: unknown) => {
return await workflowMessageSchema.parseAsync(msg);
};
120 changes: 120 additions & 0 deletions packages/cli/src/collaboration/collaboration.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import type { Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
import { Push } from '../push';
import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message';
import { parseWorkflowMessage } from './collaboration.message';
import type { IActiveWorkflowUsersChanged } from '../interfaces';
import type { OnPushMessage } from '@/push/types';
import { UserRepository } from '@/databases/repositories/user.repository';
import type { User } from '@/databases/entities/user';
import { CollaborationState } from '@/collaboration/collaboration.state';
import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import { UserService } from '@/services/user.service';
import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow';

/**
* Service for managing collaboration feature between users. E.g. keeping
* track of active users for a workflow.
*/
@Service()
export class CollaborationService {
constructor(
private readonly push: Push,
private readonly state: CollaborationState,
private readonly userRepository: UserRepository,
private readonly userService: UserService,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
) {}

init() {
this.push.on('message', async (event: OnPushMessage) => {
try {
await this.handleUserMessage(event.userId, event.msg);
} catch (error) {
ErrorReporterProxy.error(
new ApplicationError('Error handling CollaborationService push message', {
extra: {
msg: event.msg,
userId: event.userId,
},
cause: error,
}),
);
}
});
}

async handleUserMessage(userId: User['id'], msg: unknown) {
const workflowMessage = await parseWorkflowMessage(msg);

if (workflowMessage.type === 'workflowOpened') {
await this.handleWorkflowOpened(userId, workflowMessage);
} else if (workflowMessage.type === 'workflowClosed') {
await this.handleWorkflowClosed(userId, workflowMessage);
}
}

private async handleWorkflowOpened(userId: User['id'], msg: WorkflowOpenedMessage) {
const { workflowId } = msg;

if (!(await this.hasUserAccessToWorkflow(userId, workflowId))) {
return;
}

await this.state.addActiveWorkflowUser(workflowId, userId);

await this.sendWorkflowUsersChangedMessage(workflowId);
}

private async handleWorkflowClosed(userId: User['id'], msg: WorkflowClosedMessage) {
const { workflowId } = msg;

if (!(await this.hasUserAccessToWorkflow(userId, workflowId))) {
return;
}

await this.state.removeActiveWorkflowUser(workflowId, userId);

await this.sendWorkflowUsersChangedMessage(workflowId);
}

private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) {
// We have already validated that all active workflow users
// have proper access to the workflow, so we don't need to validate it again
const activeWorkflowUsers = await this.state.getActiveWorkflowUsers(workflowId);
const workflowUserIds = activeWorkflowUsers.map((user) => user.userId);

if (workflowUserIds.length === 0) {
return;
}
const users = await this.userRepository.getByIds(this.userRepository.manager, workflowUserIds);

const msgData: IActiveWorkflowUsersChanged = {
workflowId,
activeUsers: await Promise.all(
users.map(async (user) => ({
user: await this.userService.toPublic(user),
lastSeen: activeWorkflowUsers.find((activeUser) => activeUser.userId === user.id)!
.lastSeen,
})),
),
};

this.push.sendToUsers('activeWorkflowUsersChanged', msgData, workflowUserIds);
}

private async hasUserAccessToWorkflow(userId: User['id'], workflowId: Workflow['id']) {
const user = await this.userRepository.findOneBy({
id: userId,
});
if (!user) {
return false;
}

const workflow = await this.sharedWorkflowRepository.findWorkflowForUser(workflowId, user, [
'workflow:read',
]);

return !!workflow;
}
}
110 changes: 110 additions & 0 deletions packages/cli/src/collaboration/collaboration.state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import type { ActiveWorkflowUser } from '@/collaboration/collaboration.types';
import { Time } from '@/constants';
import type { Iso8601DateTimeString } from '@/interfaces';
import { CacheService } from '@/services/cache/cache.service';
import type { User } from '@/databases/entities/user';
import { type Workflow } from 'n8n-workflow';
import { Service } from 'typedi';

type WorkflowCacheHash = Record<User['id'], Iso8601DateTimeString>;

/**
* State management for the collaboration service. Workflow active
* users are stored in a hash in the following format:
* {
* [workflowId] -> {
* [userId] -> lastSeenAsIso8601String
* }
* }
*/
@Service()
export class CollaborationState {
/**
* After how many minutes of inactivity a user should be removed
* as being an active user of a workflow.
*/
public readonly inactivityCleanUpTime = 15 * Time.minutes.toMilliseconds;

constructor(private readonly cache: CacheService) {}

/**
* Mark user active for given workflow
*/
async addActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
const cacheKey = this.formWorkflowCacheKey(workflowId);
const cacheEntry: WorkflowCacheHash = {
[userId]: new Date().toISOString(),
};

await this.cache.setHash(cacheKey, cacheEntry);
}

/**
* Remove user from workflow's active users
*/
async removeActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
const cacheKey = this.formWorkflowCacheKey(workflowId);

await this.cache.deleteFromHash(cacheKey, userId);
}

async getActiveWorkflowUsers(workflowId: Workflow['id']): Promise<ActiveWorkflowUser[]> {
const cacheKey = this.formWorkflowCacheKey(workflowId);

const cacheValue = await this.cache.getHash<Iso8601DateTimeString>(cacheKey);
if (!cacheValue) {
return [];
}

const workflowActiveUsers = this.cacheHashToWorkflowActiveUsers(cacheValue);
const [expired, stillActive] = this.splitToExpiredAndStillActive(workflowActiveUsers);

if (expired.length > 0) {
void this.removeExpiredUsersForWorkflow(workflowId, expired);
}

return stillActive;
}

private formWorkflowCacheKey(workflowId: Workflow['id']) {
return `collaboration:${workflowId}`;
}

private splitToExpiredAndStillActive(workflowUsers: ActiveWorkflowUser[]) {
const expired: ActiveWorkflowUser[] = [];
const stillActive: ActiveWorkflowUser[] = [];

for (const user of workflowUsers) {
if (this.hasUserExpired(user.lastSeen)) {
expired.push(user);
} else {
stillActive.push(user);
}
}

return [expired, stillActive];
}

private async removeExpiredUsersForWorkflow(
workflowId: Workflow['id'],
expiredUsers: ActiveWorkflowUser[],
) {
const cacheKey = this.formWorkflowCacheKey(workflowId);
await Promise.all(
expiredUsers.map(async (user) => await this.cache.deleteFromHash(cacheKey, user.userId)),
);
}

private cacheHashToWorkflowActiveUsers(workflowCacheEntry: WorkflowCacheHash) {
return Object.entries(workflowCacheEntry).map(([userId, lastSeen]) => ({
userId,
lastSeen,
}));
}

private hasUserExpired(lastSeenString: Iso8601DateTimeString) {
const expiryTime = new Date(lastSeenString).getTime() + this.inactivityCleanUpTime;

return Date.now() > expiryTime;
}
}
7 changes: 7 additions & 0 deletions packages/cli/src/collaboration/collaboration.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import type { Iso8601DateTimeString } from '@/interfaces';
import type { User } from '@/databases/entities/user';

export type ActiveWorkflowUser = {
userId: User['id'];
lastSeen: Iso8601DateTimeString;
};
Loading

0 comments on commit 2ea2bfe

Please sign in to comment.