Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Decouple event bus from internal hooks (no-changelog) #9724

Merged
merged 29 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
650 changes: 121 additions & 529 deletions packages/cli/src/InternalHooks.ts

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { CredentialsRepository } from '@db/repositories/credentials.repository';
import { SharedCredentialsRepository } from '@db/repositories/sharedCredentials.repository';
import { ProjectRepository } from '@/databases/repositories/project.repository';
import { InternalHooks } from '@/InternalHooks';
import { EventRelay } from '@/eventbus/event-relay.service';

export async function getCredentials(credentialId: string): Promise<ICredentialsDb | null> {
return await Container.get(CredentialsRepository).findOneBy({ id: credentialId });
Expand Down Expand Up @@ -59,6 +60,12 @@ export async function saveCredential(
credential_id: credential.id,
public_api: true,
});
Container.get(EventRelay).emit('credentials-created', {
user,
credentialName: credential.name,
credentialType: credential.type,
credentialId: credential.id,
});

return await Db.transaction(async (transactionManager) => {
const savedCredential = await transactionManager.save<CredentialsEntity>(credential);
Expand Down Expand Up @@ -95,6 +102,12 @@ export async function removeCredential(
credential_type: credentials.type,
credential_id: credentials.id,
});
Container.get(EventRelay).emit('credentials-deleted', {
user,
credentialName: credentials.name,
credentialType: credentials.type,
credentialId: credentials.id,
});
return await Container.get(CredentialsRepository).remove(credentials);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { SharedWorkflowRepository } from '@/databases/repositories/sharedWorkflo
import { TagRepository } from '@/databases/repositories/tag.repository';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { ProjectRepository } from '@/databases/repositories/project.repository';
import { EventRelay } from '@/eventbus/event-relay.service';

export = {
createWorkflow: [
Expand All @@ -56,6 +57,10 @@ export = {

await Container.get(ExternalHooks).run('workflow.afterCreate', [createdWorkflow]);
void Container.get(InternalHooks).onWorkflowCreated(req.user, createdWorkflow, project, true);
Container.get(EventRelay).emit('workflow-created', {
workflow: createdWorkflow,
user: req.user,
});

return res.json(createdWorkflow);
},
Expand Down Expand Up @@ -233,6 +238,11 @@ export = {

await Container.get(ExternalHooks).run('workflow.afterUpdate', [updateData]);
void Container.get(InternalHooks).onWorkflowSaved(req.user, updateData, true);
Container.get(EventRelay).emit('workflow-saved', {
user: req.user,
workflowId: updateData.id,
workflowName: updateData.name,
});

return res.json(updatedWorkflow);
},
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/UserManagement/email/UserManagementMailer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { toError } from '@/utils';

import type { InviteEmailData, PasswordResetData, SendEmailResult } from './Interfaces';
import { NodeMailer } from './NodeMailer';
import { EventRelay } from '@/eventbus/event-relay.service';

type Template = HandlebarsTemplateDelegate<unknown>;
type TemplateName = 'invite' | 'passwordReset' | 'workflowShared' | 'credentialsShared';
Expand Down Expand Up @@ -144,6 +145,10 @@ export class UserManagementMailer {
message_type: 'Workflow shared',
public_api: false,
});
Container.get(EventRelay).emit('email-failed', {
user: sharer,
messageType: 'Workflow shared',
});

const error = toError(e);

Expand Down Expand Up @@ -199,6 +204,10 @@ export class UserManagementMailer {
message_type: 'Credentials shared',
public_api: false,
});
Container.get(EventRelay).emit('email-failed', {
user: sharer,
messageType: 'Credentials shared',
});

const error = toError(e);

Expand Down
49 changes: 39 additions & 10 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import { WorkflowRepository } from './databases/repositories/workflow.repository
import { UrlService } from './services/url.service';
import { WorkflowExecutionService } from './workflows/workflowExecution.service';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { EventRelay } from './eventbus/event-relay.service';

const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');

Expand Down Expand Up @@ -392,17 +393,21 @@ export function hookFunctionsPreExecute(): IWorkflowExecuteHooks {
*/
function hookFunctionsSave(): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const internalHooks = Container.get(InternalHooks);
const eventsService = Container.get(EventsService);
const eventRelay = Container.get(EventRelay);
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
void internalHooks.onNodeBeforeExecute(this.executionId, this.workflowData, nodeName);
const { executionId, workflowData: workflow } = this;

eventRelay.emit('node-pre-execute', { executionId, workflow, nodeName });
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
void internalHooks.onNodePostExecute(this.executionId, this.workflowData, nodeName);
const { executionId, workflowData: workflow } = this;

eventRelay.emit('node-post-execute', { executionId, workflow, nodeName });
},
],
workflowExecuteBefore: [],
Expand Down Expand Up @@ -541,20 +546,27 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const internalHooks = Container.get(InternalHooks);
const eventsService = Container.get(EventsService);
const eventRelay = Container.get(EventRelay);
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
void internalHooks.onNodeBeforeExecute(this.executionId, this.workflowData, nodeName);
const { executionId, workflowData: workflow } = this;

eventRelay.emit('node-pre-execute', { executionId, workflow, nodeName });
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
void internalHooks.onNodePostExecute(this.executionId, this.workflowData, nodeName);
const { executionId, workflowData: workflow } = this;

eventRelay.emit('node-post-execute', { executionId, workflow, nodeName });
},
],
workflowExecuteBefore: [
async function (): Promise<void> {
void internalHooks.onWorkflowBeforeExecute(this.executionId, this.workflowData);
const { executionId, workflowData } = this;

eventRelay.emit('workflow-pre-execute', { executionId, data: workflowData });
},
],
workflowExecuteAfter: [
Expand Down Expand Up @@ -622,9 +634,17 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
eventsService.emit('workflowExecutionCompleted', this.workflowData, fullRunData);
}
},
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
// send tracking and event log events, but don't wait for them
void internalHooks.onWorkflowPostExecute(this.executionId, this.workflowData, fullRunData);
async function (this: WorkflowHooks, runData: IRun): Promise<void> {
const { executionId, workflowData: workflow } = this;

void internalHooks.onWorkflowPostExecute(executionId, workflow, runData);
eventRelay.emit('workflow-post-execute', {
workflowId: workflow.id,
workflowName: workflow.name,
executionId,
success: runData.status === 'success',
isManual: runData.mode === 'manual',
});
},
async function (this: WorkflowHooks, fullRunData: IRun) {
const externalHooks = Container.get(ExternalHooks);
Expand Down Expand Up @@ -765,6 +785,7 @@ async function executeWorkflow(

const nodeTypes = Container.get(NodeTypes);
const activeExecutions = Container.get(ActiveExecutions);
const eventRelay = Container.get(EventRelay);

const workflowData =
options.loadedWorkflowData ??
Expand Down Expand Up @@ -792,7 +813,7 @@ async function executeWorkflow(
executionId = options.parentExecutionId ?? (await activeExecutions.add(runData));
}

void internalHooks.onWorkflowBeforeExecute(executionId || '', runData);
Container.get(EventRelay).emit('workflow-pre-execute', { executionId, data: runData });

let data;
try {
Expand Down Expand Up @@ -905,6 +926,14 @@ async function executeWorkflow(
await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]);

void internalHooks.onWorkflowPostExecute(executionId, workflowData, data, additionalData.userId);
eventRelay.emit('workflow-post-execute', {
workflowId: workflowData.id,
workflowName: workflowData.name,
executionId,
success: data.status === 'success',
isManual: data.mode === 'manual',
userId: additionalData.userId,
});

// subworkflow either finished, or is in status waiting due to a wait node, both cases are considered successes here
if (data.finished === true || data.status === 'waiting') {
Expand Down
12 changes: 11 additions & 1 deletion packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { InternalHooks } from '@/InternalHooks';
import { Logger } from '@/Logger';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
import { EventRelay } from './eventbus/event-relay.service';

@Service()
export class WorkflowRunner {
Expand All @@ -52,6 +53,7 @@ export class WorkflowRunner {
private readonly workflowStaticDataService: WorkflowStaticDataService,
private readonly nodeTypes: NodeTypes,
private readonly permissionChecker: PermissionChecker,
private readonly eventRelay: EventRelay,
) {
if (this.executionsMode === 'queue') {
this.jobQueue = Container.get(Queue);
Expand Down Expand Up @@ -145,7 +147,7 @@ export class WorkflowRunner {
await this.enqueueExecution(executionId, data, loadStaticData, realtime);
} else {
await this.runMainProcess(executionId, data, loadStaticData, executionId);
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data);
this.eventRelay.emit('workflow-pre-execute', { executionId, data });
}

// only run these when not in queue mode or when the execution is manual,
Expand All @@ -164,6 +166,14 @@ export class WorkflowRunner {
executionData,
data.userId,
);
this.eventRelay.emit('workflow-post-execute', {
workflowId: data.workflowData.id,
workflowName: data.workflowData.name,
executionId,
success: executionData?.status === 'success',
isManual: data.executionMode === 'manual',
userId: data.userId,
});
if (this.externalHooks.exists('workflow.postExecute')) {
try {
await this.externalHooks.run('workflow.postExecute', [
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/auth/methods/ldap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
updateLdapUserOnLocalDb,
} from '@/Ldap/helpers';
import type { User } from '@db/entities/User';
import { EventRelay } from '@/eventbus/event-relay.service';

export const handleLdapLogin = async (
loginId: string,
Expand Down Expand Up @@ -54,6 +55,7 @@ export const handleLdapLogin = async (
user_type: 'ldap',
was_disabled_ldap_user: false,
});
Container.get(EventRelay).emit('user-signed-up', { user });
return user;
}
} else {
Expand Down
11 changes: 7 additions & 4 deletions packages/cli/src/controllers/auth.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { ForbiddenError } from '@/errors/response-errors/forbidden.error';
import { ApplicationError } from 'n8n-workflow';
import { UserRepository } from '@/databases/repositories/user.repository';
import { EventRelay } from '@/eventbus/event-relay.service';

@RestController()
export class AuthController {
Expand All @@ -35,6 +36,7 @@ export class AuthController {
private readonly userService: UserService,
private readonly license: License,
private readonly userRepository: UserRepository,
private readonly eventRelay: EventRelay,
private readonly postHog?: PostHogClient,
) {}

Expand Down Expand Up @@ -90,17 +92,17 @@ export class AuthController {
}

this.authService.issueCookie(res, user, req.browserId);
void this.internalHooks.onUserLoginSuccess({

this.eventRelay.emit('user-logged-in', {
user,
authenticationMethod: usedAuthenticationMethod,
});

return await this.userService.toPublic(user, { posthog: this.postHog, withScopes: true });
}
void this.internalHooks.onUserLoginFailed({
user: email,
this.eventRelay.emit('user-login-failed', {
authenticationMethod: usedAuthenticationMethod,
reason: 'wrong credentials',
userEmail: email,
});
throw new AuthError('Wrong username or password. Do you have caps lock on?');
}
Expand Down Expand Up @@ -177,6 +179,7 @@ export class AuthController {
}

void this.internalHooks.onUserInviteEmailClick({ inviter, invitee });
this.eventRelay.emit('user-invite-email-click', { inviter, invitee });

const { firstName, lastName } = inviter;
return { inviter: { firstName, lastName } };
Expand Down
37 changes: 37 additions & 0 deletions packages/cli/src/controllers/communityPackages.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { Push } from '@/push';
import { CommunityPackagesService } from '@/services/communityPackages.service';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { EventRelay } from '@/eventbus/event-relay.service';

const {
PACKAGE_NOT_INSTALLED,
Expand All @@ -38,6 +39,7 @@ export class CommunityPackagesController {
private readonly push: Push,
private readonly internalHooks: InternalHooks,
private readonly communityPackagesService: CommunityPackagesService,
private readonly eventRelay: EventRelay,
) {}

// TODO: move this into a new decorator `@IfConfig('executions.mode', 'queue')`
Expand Down Expand Up @@ -114,6 +116,14 @@ export class CommunityPackagesController {
package_version: parsed.version,
failure_reason: errorMessage,
});
this.eventRelay.emit('community-package-installed', {
user: req.user,
inputString: name,
packageName: parsed.packageName,
success: false,
packageVersion: parsed.version,
failureReason: errorMessage,
});

let message = [`Error loading package "${name}" `, errorMessage].join(':');
if (error instanceof Error && error.cause instanceof Error) {
Expand Down Expand Up @@ -144,6 +154,16 @@ export class CommunityPackagesController {
package_author: installedPackage.authorName,
package_author_email: installedPackage.authorEmail,
});
this.eventRelay.emit('community-package-installed', {
user: req.user,
inputString: name,
packageName: parsed.packageName,
success: true,
packageVersion: parsed.version,
packageNodeNames: installedPackage.installedNodes.map((node) => node.name),
packageAuthor: installedPackage.authorName,
packageAuthorEmail: installedPackage.authorEmail,
});

return installedPackage;
}
Expand Down Expand Up @@ -233,6 +253,14 @@ export class CommunityPackagesController {
package_author: installedPackage.authorName,
package_author_email: installedPackage.authorEmail,
});
this.eventRelay.emit('community-package-deleted', {
user: req.user,
packageName: name,
packageVersion: installedPackage.installedVersion,
packageNodeNames: installedPackage.installedNodes.map((node) => node.name),
packageAuthor: installedPackage.authorName,
packageAuthorEmail: installedPackage.authorEmail,
});
}

@Patch('/')
Expand Down Expand Up @@ -281,6 +309,15 @@ export class CommunityPackagesController {
package_author: newInstalledPackage.authorName,
package_author_email: newInstalledPackage.authorEmail,
});
this.eventRelay.emit('community-package-updated', {
user: req.user,
packageName: name,
packageVersionCurrent: previouslyInstalledPackage.installedVersion,
packageVersionNew: newInstalledPackage.installedVersion,
packageNodeNames: newInstalledPackage.installedNodes.map((n) => n.name),
packageAuthor: newInstalledPackage.authorName,
packageAuthorEmail: newInstalledPackage.authorEmail,
});

return newInstalledPackage;
} catch (error) {
Expand Down
Loading