Skip to content

Commit

Permalink
refactor(core): Decouple lifecycle events from internal hooks (no-cha…
Browse files Browse the repository at this point in the history
…ngelog) (#10305)
  • Loading branch information
ivov authored Aug 7, 2024
1 parent b232831 commit 9b977e8
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 119 deletions.
45 changes: 1 addition & 44 deletions packages/cli/src/InternalHooks.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Service } from 'typedi';
import type { ITelemetryTrackProperties } from 'n8n-workflow';
import type { User } from '@db/entities/User';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { Telemetry } from '@/telemetry';
import { MessageEventBus } from './eventbus/MessageEventBus/MessageEventBus';

Expand All @@ -14,28 +13,16 @@ import { MessageEventBus } from './eventbus/MessageEventBus/MessageEventBus';
export class InternalHooks {
constructor(
private readonly telemetry: Telemetry,
workflowStatisticsService: WorkflowStatisticsService,
// Can't use @ts-expect-error because only dev time tsconfig considers this as an error, but not build time
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore - needed until we decouple telemetry
private readonly _eventBus: MessageEventBus, // needed until we decouple telemetry
) {
workflowStatisticsService.on('telemetry.onFirstProductionWorkflowSuccess', (metrics) =>
this.onFirstProductionWorkflowSuccess(metrics),
);
workflowStatisticsService.on('telemetry.onFirstWorkflowDataLoad', (metrics) =>
this.onFirstWorkflowDataLoad(metrics),
);
}
) {}

async init() {
await this.telemetry.init();
}

onFrontendSettingsAPI(pushRef?: string): void {
this.telemetry.track('Session started', { session_id: pushRef });
}

onWorkflowSharingUpdate(workflowId: string, userId: string, userList: string[]) {
const properties: ITelemetryTrackProperties = {
workflow_id: workflowId,
Expand All @@ -46,14 +33,6 @@ export class InternalHooks {
this.telemetry.track('User updated workflow sharing', properties);
}

async onN8nStop(): Promise<void> {
const timeoutPromise = new Promise<void>((resolve) => {
setTimeout(resolve, 3000);
});

return await Promise.race([timeoutPromise, this.telemetry.trackN8nStop()]);
}

onUserInviteEmailClick(userInviteClickData: { inviter: User; invitee: User }) {
this.telemetry.track('User clicked invite link from email', {
user_id: userInviteClickData.invitee.id,
Expand Down Expand Up @@ -85,10 +64,6 @@ export class InternalHooks {
});
}

onInstanceOwnerSetup(instanceOwnerSetupData: { user_id: string }) {
this.telemetry.track('Owner finished instance setup', instanceOwnerSetupData);
}

onEmailFailed(failedEmailData: {
user: User;
message_type:
Expand All @@ -103,22 +78,4 @@ export class InternalHooks {
user_id: failedEmailData.user.id,
});
}

/*
* Execution Statistics
*/
onFirstProductionWorkflowSuccess(data: { user_id: string; workflow_id: string }) {
this.telemetry.track('Workflow first prod success', data);
}

onFirstWorkflowDataLoad(data: {
user_id: string;
workflow_id: string;
node_type: string;
node_id: string;
credential_type?: string;
credential_id?: string;
}) {
this.telemetry.track('Workflow first data fetched', data);
}
}
3 changes: 1 addition & 2 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { Server } from '@/Server';
import { EDITOR_UI_DIST_DIR, LICENSE_FEATURES } from '@/constants';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { InternalHooks } from '@/InternalHooks';
import { License } from '@/License';
import { OrchestrationService } from '@/services/orchestration.service';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
Expand Down Expand Up @@ -110,7 +109,7 @@ export class Start extends BaseCommand {
await Container.get(OrchestrationService).shutdown();
}

await Container.get(InternalHooks).onN8nStop();
Container.get(EventService).emit('instance-stopped');

await Container.get(ActiveExecutions).shutdown();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import type { User } from '@db/entities/User';
import type { SettingsRepository } from '@db/repositories/settings.repository';
import type { UserRepository } from '@db/repositories/user.repository';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import type { InternalHooks } from '@/InternalHooks';
import { License } from '@/License';
import type { OwnerRequest } from '@/requests';
import type { UserService } from '@/services/user.service';
Expand All @@ -21,15 +20,14 @@ import { badPasswords } from '@test/testData';

describe('OwnerController', () => {
const configGetSpy = jest.spyOn(config, 'getEnv');
const internalHooks = mock<InternalHooks>();
const authService = mock<AuthService>();
const userService = mock<UserService>();
const userRepository = mock<UserRepository>();
const settingsRepository = mock<SettingsRepository>();
mockInstance(License).isWithinUsersLimit.mockReturnValue(true);
const controller = new OwnerController(
mock(),
internalHooks,
mock(),
settingsRepository,
authService,
userService,
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/controllers/owner.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import { PostHogClient } from '@/posthog';
import { UserService } from '@/services/user.service';
import { Logger } from '@/Logger';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { InternalHooks } from '@/InternalHooks';
import { EventService } from '@/events/event.service';

@RestController('/owner')
export class OwnerController {
constructor(
private readonly logger: Logger,
private readonly internalHooks: InternalHooks,
private readonly eventService: EventService,
private readonly settingsRepository: SettingsRepository,
private readonly authService: AuthService,
private readonly userService: UserService,
Expand Down Expand Up @@ -85,7 +85,7 @@ export class OwnerController {

this.authService.issueCookie(res, owner, req.browserId);

this.internalHooks.onInstanceOwnerSetup({ user_id: owner.id });
this.eventService.emit('instance-owner-setup', { userId: owner.id });

return await this.userService.toPublic(owner, { posthog: this.postHog, withScopes: true });
}
Expand Down
27 changes: 26 additions & 1 deletion packages/cli/src/events/relay-event-map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,35 @@ export type UserLike = {
};

export type RelayEventMap = {
// #region Server
// #region Lifecycle

'server-started': {};

'session-started': {
pushRef?: string;
};

'instance-stopped': {};

'instance-owner-setup': {
userId: string;
};

'first-production-workflow-succeeded': {
projectId: string;
workflowId: string;
userId: string;
};

'first-workflow-data-loaded': {
userId: string;
workflowId: string;
nodeType: string;
nodeId: string;
credentialType?: string;
credentialId?: string;
};

// #endregion

// #region Workflow
Expand Down
50 changes: 49 additions & 1 deletion packages/cli/src/events/telemetry-event-relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ export class TelemetryEventRelay extends EventRelay {
'workflow-deleted': (event) => this.workflowDeleted(event),
'workflow-saved': async (event) => await this.workflowSaved(event),
'server-started': async () => await this.serverStarted(),
'session-started': (event) => this.sessionStarted(event),
'instance-stopped': () => this.instanceStopped(),
'instance-owner-setup': async (event) => await this.instanceOwnerSetup(event),
'first-production-workflow-succeeded': (event) =>
this.firstProductionWorkflowSucceeded(event),
'first-workflow-data-loaded': (event) => this.firstWorkflowDataLoaded(event),
'workflow-post-execute': async (event) => await this.workflowPostExecute(event),
'user-changed-role': (event) => this.userChangedRole(event),
'user-retrieved-user': (event) => this.userRetrievedUser(event),
Expand Down Expand Up @@ -687,7 +693,7 @@ export class TelemetryEventRelay extends EventRelay {

// #endregion

// #region Server
// #region Lifecycle

private async serverStarted() {
const cpus = os.cpus();
Expand Down Expand Up @@ -753,6 +759,48 @@ export class TelemetryEventRelay extends EventRelay {
});
}

private sessionStarted({ pushRef }: RelayEventMap['session-started']) {
this.telemetry.track('Session started', { session_id: pushRef });
}

private instanceStopped() {
this.telemetry.track('User instance stopped');
}

private async instanceOwnerSetup({ userId }: RelayEventMap['instance-owner-setup']) {
this.telemetry.track('Owner finished instance setup', { user_id: userId });
}

private firstProductionWorkflowSucceeded({
projectId,
workflowId,
userId,
}: RelayEventMap['first-production-workflow-succeeded']) {
this.telemetry.track('Workflow first prod success', {
project_id: projectId,
workflow_id: workflowId,
user_id: userId,
});
}

private firstWorkflowDataLoaded({
userId,
workflowId,
nodeType,
nodeId,
credentialType,
credentialId,
}: RelayEventMap['first-workflow-data-loaded']) {
this.telemetry.track('Workflow first data fetched', {
user_id: userId,
workflow_id: workflowId,
node_type: nodeType,
node_id: nodeId,
credential_type: credentialType,
credential_id: credentialId,
});
}

// #endregion

// #region User
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { UserService } from '@/services/user.service';
import { OwnershipService } from '@/services/ownership.service';
import { mockInstance } from '@test/mocking';
import type { Project } from '@/databases/entities/Project';
import type { EventService } from '@/events/event.service';

describe('WorkflowStatisticsService', () => {
const fakeUser = mock<User>({ id: 'abcde-fghij' });
Expand All @@ -44,21 +45,15 @@ describe('WorkflowStatisticsService', () => {
mocked(ownershipService.getProjectOwnerCached).mockResolvedValue(fakeUser);
const updateSettingsMock = jest.spyOn(userService, 'updateSettings').mockImplementation();

const eventService = mock<EventService>();
const workflowStatisticsService = new WorkflowStatisticsService(
mock(),
new WorkflowStatisticsRepository(dataSource, globalConfig),
ownershipService,
userService,
eventService,
);

const onFirstProductionWorkflowSuccess = jest.fn();
const onFirstWorkflowDataLoad = jest.fn();
workflowStatisticsService.on(
'telemetry.onFirstProductionWorkflowSuccess',
onFirstProductionWorkflowSuccess,
);
workflowStatisticsService.on('telemetry.onFirstWorkflowDataLoad', onFirstWorkflowDataLoad);

beforeEach(() => {
jest.clearAllMocks();
});
Expand Down Expand Up @@ -97,11 +92,10 @@ describe('WorkflowStatisticsService', () => {

await workflowStatisticsService.workflowExecutionCompleted(workflow, runData);
expect(updateSettingsMock).toHaveBeenCalledTimes(1);
expect(onFirstProductionWorkflowSuccess).toBeCalledTimes(1);
expect(onFirstProductionWorkflowSuccess).toHaveBeenNthCalledWith(1, {
project_id: fakeProject.id,
user_id: fakeUser.id,
workflow_id: workflow.id,
expect(eventService.emit).toHaveBeenCalledWith('first-production-workflow-succeeded', {
projectId: fakeProject.id,
workflowId: workflow.id,
userId: fakeUser.id,
});
});

Expand All @@ -124,7 +118,7 @@ describe('WorkflowStatisticsService', () => {
startedAt: new Date(),
};
await workflowStatisticsService.workflowExecutionCompleted(workflow, runData);
expect(onFirstProductionWorkflowSuccess).toBeCalledTimes(0);
expect(eventService.emit).not.toHaveBeenCalled();
});

test('should not send metrics for updated entries', async () => {
Expand All @@ -147,7 +141,7 @@ describe('WorkflowStatisticsService', () => {
};
mockDBCall(2);
await workflowStatisticsService.workflowExecutionCompleted(workflow, runData);
expect(onFirstProductionWorkflowSuccess).toBeCalledTimes(0);
expect(eventService.emit).not.toHaveBeenCalled();
});
});

Expand All @@ -164,13 +158,12 @@ describe('WorkflowStatisticsService', () => {
parameters: {},
};
await workflowStatisticsService.nodeFetchedData(workflowId, node);
expect(onFirstWorkflowDataLoad).toBeCalledTimes(1);
expect(onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, {
user_id: fakeUser.id,
project_id: fakeProject.id,
workflow_id: workflowId,
node_type: node.type,
node_id: node.id,
expect(eventService.emit).toHaveBeenCalledWith('first-workflow-data-loaded', {
userId: fakeUser.id,
project: fakeProject.id,
workflowId,
nodeType: node.type,
nodeId: node.id,
});
});

Expand All @@ -192,15 +185,14 @@ describe('WorkflowStatisticsService', () => {
},
};
await workflowStatisticsService.nodeFetchedData(workflowId, node);
expect(onFirstWorkflowDataLoad).toBeCalledTimes(1);
expect(onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, {
user_id: fakeUser.id,
project_id: fakeProject.id,
workflow_id: workflowId,
node_type: node.type,
node_id: node.id,
credential_type: 'testCredentials',
credential_id: node.credentials.testCredentials.id,
expect(eventService.emit).toHaveBeenCalledWith('first-workflow-data-loaded', {
userId: fakeUser.id,
project: fakeProject.id,
workflowId,
nodeType: node.type,
nodeId: node.id,
credentialType: 'testCredentials',
credentialId: node.credentials.testCredentials.id,
});
});

Expand All @@ -217,7 +209,7 @@ describe('WorkflowStatisticsService', () => {
parameters: {},
};
await workflowStatisticsService.nodeFetchedData(workflowId, node);
expect(onFirstWorkflowDataLoad).toBeCalledTimes(0);
expect(eventService.emit).not.toHaveBeenCalled();
});
});
});
Loading

0 comments on commit 9b977e8

Please sign in to comment.