Skip to content

Commit

Permalink
refactor: Revert "refactor(core): Use typedi to manage EventBus singl…
Browse files Browse the repository at this point in the history
…etons (#5795)" (no-changelog) (#5825)

Revert "refactor(core): Use typedi to manage EventBus singletons (no-changelog) (#5795)"

This reverts commit 522c790.
  • Loading branch information
netroy authored Mar 30, 2023
1 parent 6c35ffa commit 58fda25
Show file tree
Hide file tree
Showing 15 changed files with 154 additions and 145 deletions.
58 changes: 27 additions & 31 deletions packages/cli/src/InternalHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import type {
import { Telemetry } from '@/telemetry';
import type { AuthProviderType } from '@db/entities/AuthIdentity';
import { RoleService } from './role/role.service';
import { eventBus } from './eventbus';
import type { User } from '@db/entities/User';
import { N8N_VERSION } from '@/constants';
import * as Db from '@/Db';
import { NodeTypes } from './NodeTypes';
import { MessageEventBus } from '@/eventbus';

function userToPayload(user: User): {
userId: string;
Expand All @@ -51,11 +51,7 @@ function userToPayload(user: User): {
export class InternalHooks implements IInternalHooksClass {
private instanceId: string;

constructor(
private telemetry: Telemetry,
private nodeTypes: NodeTypes,
private eventBus: MessageEventBus,
) {}
constructor(private telemetry: Telemetry, private nodeTypes: NodeTypes) {}

async init(instanceId: string) {
this.instanceId = instanceId;
Expand Down Expand Up @@ -115,7 +111,7 @@ export class InternalHooks implements IInternalHooksClass {
async onWorkflowCreated(user: User, workflow: IWorkflowBase, publicApi: boolean): Promise<void> {
const { nodeGraph } = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.workflow.created',
payload: {
...userToPayload(user),
Expand All @@ -134,7 +130,7 @@ export class InternalHooks implements IInternalHooksClass {

async onWorkflowDeleted(user: User, workflowId: string, publicApi: boolean): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.workflow.deleted',
payload: {
...userToPayload(user),
Expand Down Expand Up @@ -166,7 +162,7 @@ export class InternalHooks implements IInternalHooksClass {
}

void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.workflow.updated',
payload: {
...userToPayload(user),
Expand Down Expand Up @@ -198,7 +194,7 @@ export class InternalHooks implements IInternalHooksClass {
nodeName: string,
): Promise<void> {
const nodeInWorkflow = workflow.nodes.find((node) => node.name === nodeName);
void this.eventBus.sendNodeEvent({
void eventBus.sendNodeEvent({
eventName: 'n8n.node.started',
payload: {
executionId,
Expand All @@ -216,7 +212,7 @@ export class InternalHooks implements IInternalHooksClass {
nodeName: string,
): Promise<void> {
const nodeInWorkflow = workflow.nodes.find((node) => node.name === nodeName);
void this.eventBus.sendNodeEvent({
void eventBus.sendNodeEvent({
eventName: 'n8n.node.finished',
payload: {
executionId,
Expand All @@ -234,7 +230,7 @@ export class InternalHooks implements IInternalHooksClass {
): Promise<void> {
void Promise.all([
Db.collections.Execution.update(executionId, { status: 'running' }),
this.eventBus.sendWorkflowEvent({
eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.started',
payload: {
executionId,
Expand All @@ -253,7 +249,7 @@ export class InternalHooks implements IInternalHooksClass {
workflowData?: IWorkflowBase,
): Promise<void> {
void Promise.all([
this.eventBus.sendWorkflowEvent({
eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.crashed',
payload: {
executionId,
Expand Down Expand Up @@ -414,7 +410,7 @@ export class InternalHooks implements IInternalHooksClass {

promises.push(
properties.success
? this.eventBus.sendWorkflowEvent({
? eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.success',
payload: {
executionId,
Expand All @@ -425,7 +421,7 @@ export class InternalHooks implements IInternalHooksClass {
workflowName: workflow.name,
},
})
: this.eventBus.sendWorkflowEvent({
: eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.failed',
payload: {
executionId,
Expand Down Expand Up @@ -473,7 +469,7 @@ export class InternalHooks implements IInternalHooksClass {
publicApi: boolean;
}): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.deleted',
payload: {
...userToPayload(userDeletionData.user),
Expand All @@ -494,7 +490,7 @@ export class InternalHooks implements IInternalHooksClass {
email_sent: boolean;
}): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.invited',
payload: {
...userToPayload(userInviteData.user),
Expand All @@ -516,7 +512,7 @@ export class InternalHooks implements IInternalHooksClass {
public_api: boolean;
}): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.reinvited',
payload: {
...userToPayload(userReinviteData.user),
Expand Down Expand Up @@ -575,7 +571,7 @@ export class InternalHooks implements IInternalHooksClass {

async onUserUpdate(userUpdateData: { user: User; fields_changed: string[] }): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.updated',
payload: {
...userToPayload(userUpdateData.user),
Expand All @@ -594,7 +590,7 @@ export class InternalHooks implements IInternalHooksClass {
invitee: User;
}): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.invitation.accepted',
payload: {
invitee: {
Expand All @@ -613,7 +609,7 @@ export class InternalHooks implements IInternalHooksClass {

async onUserPasswordResetEmailClick(userPasswordResetData: { user: User }): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.reset',
payload: {
...userToPayload(userPasswordResetData.user),
Expand Down Expand Up @@ -647,7 +643,7 @@ export class InternalHooks implements IInternalHooksClass {

async onApiKeyDeleted(apiKeyDeletedData: { user: User; public_api: boolean }): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.api.deleted',
payload: {
...userToPayload(apiKeyDeletedData.user),
Expand All @@ -662,7 +658,7 @@ export class InternalHooks implements IInternalHooksClass {

async onApiKeyCreated(apiKeyCreatedData: { user: User; public_api: boolean }): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.api.created',
payload: {
...userToPayload(apiKeyCreatedData.user),
Expand All @@ -677,7 +673,7 @@ export class InternalHooks implements IInternalHooksClass {

async onUserPasswordResetRequestClick(userPasswordResetData: { user: User }): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.reset.requested',
payload: {
...userToPayload(userPasswordResetData.user),
Expand All @@ -701,7 +697,7 @@ export class InternalHooks implements IInternalHooksClass {
},
): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.signedup',
payload: {
...userToPayload(user),
Expand All @@ -720,7 +716,7 @@ export class InternalHooks implements IInternalHooksClass {
public_api: boolean;
}): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.email.failed',
payload: {
messageType: failedEmailData.message_type,
Expand All @@ -745,7 +741,7 @@ export class InternalHooks implements IInternalHooksClass {
public_api: boolean;
}): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.credentials.created',
payload: {
...userToPayload(userCreatedCredentialsData.user),
Expand Down Expand Up @@ -773,7 +769,7 @@ export class InternalHooks implements IInternalHooksClass {
sharees_removed: number | null;
}): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.user.credentials.shared',
payload: {
...userToPayload(userSharedCredentialsData.user),
Expand Down Expand Up @@ -813,7 +809,7 @@ export class InternalHooks implements IInternalHooksClass {
failure_reason?: string;
}): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.package.installed',
payload: {
...userToPayload(installationData.user),
Expand Down Expand Up @@ -851,7 +847,7 @@ export class InternalHooks implements IInternalHooksClass {
package_author_email?: string;
}): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.package.updated',
payload: {
...userToPayload(updateData.user),
Expand Down Expand Up @@ -884,7 +880,7 @@ export class InternalHooks implements IInternalHooksClass {
package_author_email?: string;
}): Promise<void> {
void Promise.all([
this.eventBus.sendAuditEvent({
eventBus.sendAuditEvent({
eventName: 'n8n.audit.package.deleted',
payload: {
...userToPayload(deleteData.user),
Expand Down
19 changes: 12 additions & 7 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ import { configureMetrics } from './metrics';
import { setupBasicAuth } from './middlewares/basicAuth';
import { setupExternalJWTAuth } from './middlewares/externalJWTAuth';
import { PostHogClient } from './posthog';
import { eventBus } from './eventbus';
import { Container } from 'typedi';
import { InternalHooks } from './InternalHooks';
import {
Expand All @@ -156,7 +157,6 @@ import { getSamlLoginLabel, isSamlLoginEnabled, isSamlLicensed } from './sso/sam
import { SamlController } from './sso/saml/routes/saml.controller.ee';
import { SamlService } from './sso/saml/saml.service.ee';
import { LdapManager } from './Ldap/LdapManager.ee';
import { MessageEventBus } from '@/eventbus';

const exec = promisify(callbackExec);

Expand Down Expand Up @@ -365,7 +365,7 @@ class Server extends AbstractServer {
return this.frontendSettings;
}

private async registerControllers(ignoredEndpoints: Readonly<string[]>) {
private registerControllers(ignoredEndpoints: Readonly<string[]>) {
const { app, externalHooks, activeWorkflowRunner, nodeTypes } = this;
const repositories = Db.collections;
setupAuthMiddlewares(app, ignoredEndpoints, this.restEndpoint, repositories.User);
Expand All @@ -376,11 +376,8 @@ class Server extends AbstractServer {
const postHog = this.postHog;
const samlService = Container.get(SamlService);

const eventBus = Container.get(MessageEventBus);
await eventBus.initialize();

const controllers: object[] = [
new EventBusController(eventBus),
new EventBusController(),
new AuthController({ config, internalHooks, repositories, logger, postHog }),
new OwnerController({ config, internalHooks, repositories, logger }),
new MeController({ externalHooks, internalHooks, repositories, logger }),
Expand Down Expand Up @@ -500,7 +497,7 @@ class Server extends AbstractServer {

await handleLdapInit();

await this.registerControllers(ignoredEndpoints);
this.registerControllers(ignoredEndpoints);

this.app.use(`/${this.restEndpoint}/credentials`, credentialsController);

Expand Down Expand Up @@ -1226,6 +1223,14 @@ class Server extends AbstractServer {
),
);

// ----------------------------------------
// EventBus Setup
// ----------------------------------------

if (!eventBus.isInitialized) {
await eventBus.initialize();
}

// ----------------------------------------
// Webhooks
// ----------------------------------------
Expand Down
7 changes: 2 additions & 5 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { initErrorHandling } from '@/ErrorReporting';
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { Push } from '@/push';
import { MessageEventBus } from '@/eventbus';
import { eventBus } from './eventbus';
import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents';
import { Container } from 'typedi';
import { InternalHooks } from './InternalHooks';
Expand All @@ -67,12 +67,9 @@ export class WorkflowRunner {

jobQueue: JobQueue;

eventBus: MessageEventBus;

constructor() {
this.push = Container.get(Push);
this.activeExecutions = Container.get(ActiveExecutions);
this.eventBus = Container.get(MessageEventBus);
}

/**
Expand Down Expand Up @@ -119,7 +116,7 @@ export class WorkflowRunner {
// does contain those messages.
try {
// Search for messages for this executionId in event logs
const eventLogMessages = await this.eventBus.getEventsByExecutionId(executionId);
const eventLogMessages = await eventBus.getEventsByExecutionId(executionId);
// Attempt to recover more better runData from these messages (but don't update the execution db entry yet)
if (eventLogMessages.length > 0) {
const eventLogExecutionData = await recoverExecutionDataFromEventLogMessages(
Expand Down
4 changes: 1 addition & 3 deletions packages/cli/src/api/e2e.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/naming-convention */
import { Container } from 'typedi';
import { Router } from 'express';
import bodyParser from 'body-parser';
import { v4 as uuid } from 'uuid';
import config from '@/config';
import * as Db from '@/Db';
import type { Role } from '@db/entities/Role';
import { hashPassword } from '@/UserManagement/UserManagementHelper';
import { MessageEventBus } from '@/eventbus';
import { eventBus } from '@/eventbus/MessageEventBus/MessageEventBus';

if (process.env.E2E_TESTS !== 'true') {
console.error('E2E endpoints only allowed during E2E tests');
Expand Down Expand Up @@ -80,7 +79,6 @@ const setupUserManagement = async () => {

const resetLogStreaming = async () => {
config.set('enterprise.features.logStreaming', false);
const eventBus = Container.get(MessageEventBus);
for (const id in eventBus.destinations) {
await eventBus.removeDestination(id);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import * as Server from '@/Server';
import { TestWebhooks } from '@/TestWebhooks';
import { getAllInstalledPackages } from '@/CommunityNodes/packageModel';
import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants';
import { eventBus } from '@/eventbus';
import { BaseCommand } from './BaseCommand';
import { InternalHooks } from '@/InternalHooks';
import { License } from '@/License';
import { MessageEventBus } from '@/eventbus';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
Expand Down Expand Up @@ -133,7 +133,7 @@ export class Start extends BaseCommand {
}

//finally shut down Event Bus
await Container.get(MessageEventBus).close();
await eventBus.close();
} catch (error) {
await this.exitWithCrash('There was an error shutting down n8n.', error);
}
Expand Down
Loading

0 comments on commit 58fda25

Please sign in to comment.