Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Reintroduce collaboration feature #10602

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { CollaborationState } from '../collaboration.state';
import type { CacheService } from '@/services/cache/cache.service';
import { mock } from 'jest-mock-extended';

const origDate = global.Date;

const mockDateFactory = (currentDate: string) => {
return class CustomDate extends origDate {
constructor() {
super(currentDate);
}
} as DateConstructor;
};

describe('CollaborationState', () => {
let collaborationState: CollaborationState;
let mockCacheService: jest.Mocked<CacheService>;

beforeEach(() => {
mockCacheService = mock<CacheService>();
collaborationState = new CollaborationState(mockCacheService);
});

afterEach(() => {
global.Date = origDate;
});

const workflowId = 'workflow';

describe('addActiveWorkflowUser', () => {
it('should add workflow user with correct cache key and value', async () => {
// Arrange
global.Date = mockDateFactory('2023-01-01T00:00:00.000Z');

// Act
await collaborationState.addActiveWorkflowUser(workflowId, 'userId');

// Assert
expect(mockCacheService.setHash).toHaveBeenCalledWith('collaboration:workflow', {
userId: '2023-01-01T00:00:00.000Z',
});
});
});

describe('removeActiveWorkflowUser', () => {
it('should remove workflow user with correct cache key', async () => {
// Act
await collaborationState.removeActiveWorkflowUser(workflowId, 'userId');

// Assert
expect(mockCacheService.deleteFromHash).toHaveBeenCalledWith(
'collaboration:workflow',
'userId',
);
});
});

describe('getActiveWorkflowUsers', () => {
it('should get workflows with correct cache key', async () => {
// Act
const users = await collaborationState.getActiveWorkflowUsers(workflowId);

// Assert
expect(mockCacheService.getHash).toHaveBeenCalledWith('collaboration:workflow');
expect(users).toBeEmptyArray();
});

it('should get workflow users that are not expired', async () => {
// Arrange
const nowMinus16Minutes = new Date();
nowMinus16Minutes.setMinutes(nowMinus16Minutes.getMinutes() - 16);
const now = new Date().toISOString();

mockCacheService.getHash.mockReturnValueOnce(
Promise.resolve({
expiredUserId: nowMinus16Minutes.toISOString(),
notExpiredUserId: now,
}),
);

// Act
const users = await collaborationState.getActiveWorkflowUsers(workflowId);

// Assert
expect(users).toEqual([
{
lastSeen: now,
userId: 'notExpiredUserId',
},
]);
tomi marked this conversation as resolved.
Show resolved Hide resolved
});
});
});
23 changes: 23 additions & 0 deletions packages/cli/src/collaboration/collaboration.message.ts
tomi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export type CollaborationMessage = WorkflowOpenedMessage | WorkflowClosedMessage;

export type WorkflowOpenedMessage = {
type: 'workflowOpened';
workflowId: string;
};

export type WorkflowClosedMessage = {
type: 'workflowClosed';
workflowId: string;
};

const isWorkflowMessage = (msg: unknown): msg is CollaborationMessage => {
return typeof msg === 'object' && msg !== null && 'type' in msg;
};

export const isWorkflowOpenedMessage = (msg: unknown): msg is WorkflowOpenedMessage => {
return isWorkflowMessage(msg) && msg.type === 'workflowOpened';
};

export const isWorkflowClosedMessage = (msg: unknown): msg is WorkflowClosedMessage => {
return isWorkflowMessage(msg) && msg.type === 'workflowClosed';
};
115 changes: 115 additions & 0 deletions packages/cli/src/collaboration/collaboration.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import type { Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
import { Push } from '../push';
import { Logger } from '@/logger';
import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message';
import { isWorkflowClosedMessage, isWorkflowOpenedMessage } from './collaboration.message';
import type { IActiveWorkflowUsersChanged } from '../interfaces';
import type { OnPushMessage } from '@/push/types';
import { UserRepository } from '@/databases/repositories/user.repository';
import type { User } from '@/databases/entities/user';
import { CollaborationState } from '@/collaboration/collaboration.state';
import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import { UserService } from '@/services/user.service';

/**
* Service for managing collaboration feature between users. E.g. keeping
* track of active users for a workflow.
*/
@Service()
export class CollaborationService {
constructor(
private readonly logger: Logger,
private readonly push: Push,
private readonly state: CollaborationState,
private readonly userRepository: UserRepository,
private readonly userService: UserService,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
) {}

init() {
this.push.on('message', async (event: OnPushMessage) => {
try {
await this.handleUserMessage(event.userId, event.msg);
} catch (error) {
this.logger.error('Error handling user message', {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we report these to sentry? What's the correct way to do that? Throw an ApplicationError and let the top-level handler take care of it?

error: error as unknown,
msg: event.msg,
userId: event.userId,
});
}
});
}

async handleUserMessage(userId: User['id'], msg: unknown) {
if (isWorkflowOpenedMessage(msg)) {
await this.handleWorkflowOpened(userId, msg);
} else if (isWorkflowClosedMessage(msg)) {
await this.handleWorkflowClosed(userId, msg);
}
}

private async handleWorkflowOpened(userId: User['id'], msg: WorkflowOpenedMessage) {
const { workflowId } = msg;

if (!(await this.hasUserAccessToWorkflow(userId, workflowId))) {
return;
}

await this.state.addActiveWorkflowUser(workflowId, userId);

await this.sendWorkflowUsersChangedMessage(workflowId);
}

private async handleWorkflowClosed(userId: User['id'], msg: WorkflowClosedMessage) {
const { workflowId } = msg;

if (!(await this.hasUserAccessToWorkflow(userId, workflowId))) {
return;
}

await this.state.removeActiveWorkflowUser(workflowId, userId);

await this.sendWorkflowUsersChangedMessage(workflowId);
}

private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) {
// We have already validated that all active workflow users
// have proper access to the workflow, so we don't need to validate it again
const activeWorkflowUsers = await this.state.getActiveWorkflowUsers(workflowId);
const workflowUserIds = activeWorkflowUsers.map((user) => user.userId);

if (workflowUserIds.length === 0) {
return;
}
const users = await this.userRepository.getByIds(this.userRepository.manager, workflowUserIds);

const msgData: IActiveWorkflowUsersChanged = {
workflowId,
activeUsers: await Promise.all(
users.map(async (user) => ({
user: await this.userService.toPublic(user),
lastSeen: activeWorkflowUsers.find((activeUser) => activeUser.userId === user.id)!
.lastSeen,
})),
),
};

this.push.sendToUsers('activeWorkflowUsersChanged', msgData, workflowUserIds);
}

private async hasUserAccessToWorkflow(userId: User['id'], workflowId: Workflow['id']) {
const user = await this.userRepository.findOneBy({
id: userId,
});
if (!user) {
return false;
}

const workflow = await this.sharedWorkflowRepository.findWorkflowForUser(workflowId, user, [
'workflow:read',
]);
Comment on lines +114 to +116
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally would want to check the permissions without needing to query the entire workflow. Might do that as a follow up


return !!workflow;
}
}
110 changes: 110 additions & 0 deletions packages/cli/src/collaboration/collaboration.state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import type { ActiveWorkflowUser } from '@/collaboration/collaboration.types';
import { Time } from '@/constants';
import type { Iso8601DateTimeString } from '@/interfaces';
import { CacheService } from '@/services/cache/cache.service';
import type { User } from '@/databases/entities/user';
import { type Workflow } from 'n8n-workflow';
import { Service } from 'typedi';

type WorkflowCacheHash = Record<User['id'], Iso8601DateTimeString>;

/**
* State management for the collaboration service. Workflow active
* users are stored in a hash in the following format:
* {
* [workflowId] -> {
* [userId] -> lastSeenAsIso8601String
* }
* }
*/
@Service()
export class CollaborationState {
/**
* After how many minutes of inactivity a user should be removed
* as being an active user of a workflow.
*/
public readonly inactivityCleanUpTime = 15 * Time.minutes.toMilliseconds;

constructor(private readonly cache: CacheService) {}

/**
* Mark user active for given workflow
*/
async addActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
const cacheKey = this.formWorkflowCacheKey(workflowId);
const cacheEntry: WorkflowCacheHash = {
[userId]: new Date().toISOString(),
};

await this.cache.setHash(cacheKey, cacheEntry);
}

/**
* Remove user from workflow's active users
*/
async removeActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
const cacheKey = this.formWorkflowCacheKey(workflowId);

await this.cache.deleteFromHash(cacheKey, userId);
}

async getActiveWorkflowUsers(workflowId: Workflow['id']): Promise<ActiveWorkflowUser[]> {
const cacheKey = this.formWorkflowCacheKey(workflowId);

const cacheValue = await this.cache.getHash<Iso8601DateTimeString>(cacheKey);
if (!cacheValue) {
return [];
}

const workflowActiveUsers = this.cacheHashToWorkflowActiveUsers(cacheValue);
const [expired, stillActive] = this.splitToExpiredAndStillActive(workflowActiveUsers);

if (expired.length > 0) {
void this.removeExpiredUsersForWorkflow(workflowId, expired);
}

return stillActive;
}

private formWorkflowCacheKey(workflowId: Workflow['id']) {
return `collaboration:${workflowId}`;
}

private splitToExpiredAndStillActive(workflowUsers: ActiveWorkflowUser[]) {
const expired: ActiveWorkflowUser[] = [];
const stillActive: ActiveWorkflowUser[] = [];

for (const user of workflowUsers) {
if (this.hasUserExpired(user.lastSeen)) {
expired.push(user);
} else {
stillActive.push(user);
}
}

return [expired, stillActive];
}

private async removeExpiredUsersForWorkflow(
workflowId: Workflow['id'],
expiredUsers: ActiveWorkflowUser[],
) {
const cacheKey = this.formWorkflowCacheKey(workflowId);
await Promise.all(
expiredUsers.map(async (user) => await this.cache.deleteFromHash(cacheKey, user.userId)),
);
}

private cacheHashToWorkflowActiveUsers(workflowCacheEntry: WorkflowCacheHash) {
return Object.entries(workflowCacheEntry).map(([userId, lastSeen]) => ({
userId,
lastSeen,
}));
}

private hasUserExpired(lastSeenString: Iso8601DateTimeString) {
const expiryTime = new Date(lastSeenString).getTime() + this.inactivityCleanUpTime;

return Date.now() > expiryTime;
}
}
7 changes: 7 additions & 0 deletions packages/cli/src/collaboration/collaboration.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import type { Iso8601DateTimeString } from '@/interfaces';
import type { User } from '@/databases/entities/user';

export type ActiveWorkflowUser = {
userId: User['id'];
lastSeen: Iso8601DateTimeString;
};
21 changes: 20 additions & 1 deletion packages/cli/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,13 @@ export type IPushData =
| PushDataWorkerStatusMessage
| PushDataWorkflowActivated
| PushDataWorkflowDeactivated
| PushDataWorkflowFailedToActivate;
| PushDataWorkflowFailedToActivate
| PushDataActiveWorkflowUsersChanged;

type PushDataActiveWorkflowUsersChanged = {
data: IActiveWorkflowUsersChanged;
type: 'activeWorkflowUsersChanged';
};

type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
Expand Down Expand Up @@ -350,6 +356,19 @@ export type PushDataNodeDescriptionUpdated = {
type: 'nodeDescriptionUpdated';
};

/** DateTime in the Iso8601 format, e.g. 2024-10-31T00:00:00.123Z */
export type Iso8601DateTimeString = string;

export interface IActiveWorkflowUser {
user: PublicUser;
lastSeen: Iso8601DateTimeString;
}

export interface IActiveWorkflowUsersChanged {
workflowId: Workflow['id'];
activeUsers: IActiveWorkflowUser[];
}

export interface IActiveWorkflowAdded {
workflowId: Workflow['id'];
}
Expand Down
Loading
Loading