diff --git a/packages/cli/src/InternalHooks.ts b/packages/cli/src/InternalHooks.ts index 57e7b890fa0bf..ea36dc04b8627 100644 --- a/packages/cli/src/InternalHooks.ts +++ b/packages/cli/src/InternalHooks.ts @@ -25,11 +25,11 @@ import type { import { Telemetry } from '@/telemetry'; import type { AuthProviderType } from '@db/entities/AuthIdentity'; import { RoleService } from './role/role.service'; +import { eventBus } from './eventbus'; import type { User } from '@db/entities/User'; import { N8N_VERSION } from '@/constants'; import * as Db from '@/Db'; import { NodeTypes } from './NodeTypes'; -import { MessageEventBus } from '@/eventbus'; function userToPayload(user: User): { userId: string; @@ -51,11 +51,7 @@ function userToPayload(user: User): { export class InternalHooks implements IInternalHooksClass { private instanceId: string; - constructor( - private telemetry: Telemetry, - private nodeTypes: NodeTypes, - private eventBus: MessageEventBus, - ) {} + constructor(private telemetry: Telemetry, private nodeTypes: NodeTypes) {} async init(instanceId: string) { this.instanceId = instanceId; @@ -115,7 +111,7 @@ export class InternalHooks implements IInternalHooksClass { async onWorkflowCreated(user: User, workflow: IWorkflowBase, publicApi: boolean): Promise { const { nodeGraph } = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes); void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.workflow.created', payload: { ...userToPayload(user), @@ -134,7 +130,7 @@ export class InternalHooks implements IInternalHooksClass { async onWorkflowDeleted(user: User, workflowId: string, publicApi: boolean): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.workflow.deleted', payload: { ...userToPayload(user), @@ -166,7 +162,7 @@ export class InternalHooks implements IInternalHooksClass { } void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.workflow.updated', payload: { ...userToPayload(user), @@ -198,7 +194,7 @@ export class InternalHooks implements IInternalHooksClass { nodeName: string, ): Promise { const nodeInWorkflow = workflow.nodes.find((node) => node.name === nodeName); - void this.eventBus.sendNodeEvent({ + void eventBus.sendNodeEvent({ eventName: 'n8n.node.started', payload: { executionId, @@ -216,7 +212,7 @@ export class InternalHooks implements IInternalHooksClass { nodeName: string, ): Promise { const nodeInWorkflow = workflow.nodes.find((node) => node.name === nodeName); - void this.eventBus.sendNodeEvent({ + void eventBus.sendNodeEvent({ eventName: 'n8n.node.finished', payload: { executionId, @@ -234,7 +230,7 @@ export class InternalHooks implements IInternalHooksClass { ): Promise { void Promise.all([ Db.collections.Execution.update(executionId, { status: 'running' }), - this.eventBus.sendWorkflowEvent({ + eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.started', payload: { executionId, @@ -253,7 +249,7 @@ export class InternalHooks implements IInternalHooksClass { workflowData?: IWorkflowBase, ): Promise { void Promise.all([ - this.eventBus.sendWorkflowEvent({ + eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.crashed', payload: { executionId, @@ -414,7 +410,7 @@ export class InternalHooks implements IInternalHooksClass { promises.push( properties.success - ? this.eventBus.sendWorkflowEvent({ + ? eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.success', payload: { executionId, @@ -425,7 +421,7 @@ export class InternalHooks implements IInternalHooksClass { workflowName: workflow.name, }, }) - : this.eventBus.sendWorkflowEvent({ + : eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.failed', payload: { executionId, @@ -473,7 +469,7 @@ export class InternalHooks implements IInternalHooksClass { publicApi: boolean; }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.deleted', payload: { ...userToPayload(userDeletionData.user), @@ -494,7 +490,7 @@ export class InternalHooks implements IInternalHooksClass { email_sent: boolean; }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.invited', payload: { ...userToPayload(userInviteData.user), @@ -516,7 +512,7 @@ export class InternalHooks implements IInternalHooksClass { public_api: boolean; }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.reinvited', payload: { ...userToPayload(userReinviteData.user), @@ -575,7 +571,7 @@ export class InternalHooks implements IInternalHooksClass { async onUserUpdate(userUpdateData: { user: User; fields_changed: string[] }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.updated', payload: { ...userToPayload(userUpdateData.user), @@ -594,7 +590,7 @@ export class InternalHooks implements IInternalHooksClass { invitee: User; }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.invitation.accepted', payload: { invitee: { @@ -613,7 +609,7 @@ export class InternalHooks implements IInternalHooksClass { async onUserPasswordResetEmailClick(userPasswordResetData: { user: User }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.reset', payload: { ...userToPayload(userPasswordResetData.user), @@ -647,7 +643,7 @@ export class InternalHooks implements IInternalHooksClass { async onApiKeyDeleted(apiKeyDeletedData: { user: User; public_api: boolean }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.api.deleted', payload: { ...userToPayload(apiKeyDeletedData.user), @@ -662,7 +658,7 @@ export class InternalHooks implements IInternalHooksClass { async onApiKeyCreated(apiKeyCreatedData: { user: User; public_api: boolean }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.api.created', payload: { ...userToPayload(apiKeyCreatedData.user), @@ -677,7 +673,7 @@ export class InternalHooks implements IInternalHooksClass { async onUserPasswordResetRequestClick(userPasswordResetData: { user: User }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.reset.requested', payload: { ...userToPayload(userPasswordResetData.user), @@ -701,7 +697,7 @@ export class InternalHooks implements IInternalHooksClass { }, ): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.signedup', payload: { ...userToPayload(user), @@ -720,7 +716,7 @@ export class InternalHooks implements IInternalHooksClass { public_api: boolean; }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.email.failed', payload: { messageType: failedEmailData.message_type, @@ -745,7 +741,7 @@ export class InternalHooks implements IInternalHooksClass { public_api: boolean; }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.credentials.created', payload: { ...userToPayload(userCreatedCredentialsData.user), @@ -773,7 +769,7 @@ export class InternalHooks implements IInternalHooksClass { sharees_removed: number | null; }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.user.credentials.shared', payload: { ...userToPayload(userSharedCredentialsData.user), @@ -813,7 +809,7 @@ export class InternalHooks implements IInternalHooksClass { failure_reason?: string; }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.package.installed', payload: { ...userToPayload(installationData.user), @@ -851,7 +847,7 @@ export class InternalHooks implements IInternalHooksClass { package_author_email?: string; }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.package.updated', payload: { ...userToPayload(updateData.user), @@ -884,7 +880,7 @@ export class InternalHooks implements IInternalHooksClass { package_author_email?: string; }): Promise { void Promise.all([ - this.eventBus.sendAuditEvent({ + eventBus.sendAuditEvent({ eventName: 'n8n.audit.package.deleted', payload: { ...userToPayload(deleteData.user), diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 256b129f4b925..5bffa178c7e08 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -146,6 +146,7 @@ import { configureMetrics } from './metrics'; import { setupBasicAuth } from './middlewares/basicAuth'; import { setupExternalJWTAuth } from './middlewares/externalJWTAuth'; import { PostHogClient } from './posthog'; +import { eventBus } from './eventbus'; import { Container } from 'typedi'; import { InternalHooks } from './InternalHooks'; import { @@ -156,7 +157,6 @@ import { getSamlLoginLabel, isSamlLoginEnabled, isSamlLicensed } from './sso/sam import { SamlController } from './sso/saml/routes/saml.controller.ee'; import { SamlService } from './sso/saml/saml.service.ee'; import { LdapManager } from './Ldap/LdapManager.ee'; -import { MessageEventBus } from '@/eventbus'; const exec = promisify(callbackExec); @@ -365,7 +365,7 @@ class Server extends AbstractServer { return this.frontendSettings; } - private async registerControllers(ignoredEndpoints: Readonly) { + private registerControllers(ignoredEndpoints: Readonly) { const { app, externalHooks, activeWorkflowRunner, nodeTypes } = this; const repositories = Db.collections; setupAuthMiddlewares(app, ignoredEndpoints, this.restEndpoint, repositories.User); @@ -376,11 +376,8 @@ class Server extends AbstractServer { const postHog = this.postHog; const samlService = Container.get(SamlService); - const eventBus = Container.get(MessageEventBus); - await eventBus.initialize(); - const controllers: object[] = [ - new EventBusController(eventBus), + new EventBusController(), new AuthController({ config, internalHooks, repositories, logger, postHog }), new OwnerController({ config, internalHooks, repositories, logger }), new MeController({ externalHooks, internalHooks, repositories, logger }), @@ -500,7 +497,7 @@ class Server extends AbstractServer { await handleLdapInit(); - await this.registerControllers(ignoredEndpoints); + this.registerControllers(ignoredEndpoints); this.app.use(`/${this.restEndpoint}/credentials`, credentialsController); @@ -1226,6 +1223,14 @@ class Server extends AbstractServer { ), ); + // ---------------------------------------- + // EventBus Setup + // ---------------------------------------- + + if (!eventBus.isInitialized) { + await eventBus.initialize(); + } + // ---------------------------------------- // Webhooks // ---------------------------------------- diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index ab7877fff3b8b..16683b57c9acc 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -55,7 +55,7 @@ import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; import { initErrorHandling } from '@/ErrorReporting'; import { PermissionChecker } from '@/UserManagement/PermissionChecker'; import { Push } from '@/push'; -import { MessageEventBus } from '@/eventbus'; +import { eventBus } from './eventbus'; import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents'; import { Container } from 'typedi'; import { InternalHooks } from './InternalHooks'; @@ -67,12 +67,9 @@ export class WorkflowRunner { jobQueue: JobQueue; - eventBus: MessageEventBus; - constructor() { this.push = Container.get(Push); this.activeExecutions = Container.get(ActiveExecutions); - this.eventBus = Container.get(MessageEventBus); } /** @@ -119,7 +116,7 @@ export class WorkflowRunner { // does contain those messages. try { // Search for messages for this executionId in event logs - const eventLogMessages = await this.eventBus.getEventsByExecutionId(executionId); + const eventLogMessages = await eventBus.getEventsByExecutionId(executionId); // Attempt to recover more better runData from these messages (but don't update the execution db entry yet) if (eventLogMessages.length > 0) { const eventLogExecutionData = await recoverExecutionDataFromEventLogMessages( diff --git a/packages/cli/src/api/e2e.api.ts b/packages/cli/src/api/e2e.api.ts index 02c3fcf2297be..befd913114449 100644 --- a/packages/cli/src/api/e2e.api.ts +++ b/packages/cli/src/api/e2e.api.ts @@ -4,7 +4,6 @@ /* eslint-disable @typescript-eslint/no-unsafe-argument */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/naming-convention */ -import { Container } from 'typedi'; import { Router } from 'express'; import bodyParser from 'body-parser'; import { v4 as uuid } from 'uuid'; @@ -12,7 +11,7 @@ import config from '@/config'; import * as Db from '@/Db'; import type { Role } from '@db/entities/Role'; import { hashPassword } from '@/UserManagement/UserManagementHelper'; -import { MessageEventBus } from '@/eventbus'; +import { eventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; if (process.env.E2E_TESTS !== 'true') { console.error('E2E endpoints only allowed during E2E tests'); @@ -80,7 +79,6 @@ const setupUserManagement = async () => { const resetLogStreaming = async () => { config.set('enterprise.features.logStreaming', false); - const eventBus = Container.get(MessageEventBus); for (const id in eventBus.destinations) { await eventBus.removeDestination(id); } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index d8969a416c38a..db53900de2393 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -25,10 +25,10 @@ import * as Server from '@/Server'; import { TestWebhooks } from '@/TestWebhooks'; import { getAllInstalledPackages } from '@/CommunityNodes/packageModel'; import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants'; +import { eventBus } from '@/eventbus'; import { BaseCommand } from './BaseCommand'; import { InternalHooks } from '@/InternalHooks'; import { License } from '@/License'; -import { MessageEventBus } from '@/eventbus'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires const open = require('open'); @@ -133,7 +133,7 @@ export class Start extends BaseCommand { } //finally shut down Event Bus - await Container.get(MessageEventBus).close(); + await eventBus.close(); } catch (error) { await this.exitWithCrash('There was an error shutting down n8n.', error); } diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 46b88cd37da43..5d63f57b26de3 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -1,4 +1,3 @@ -import { Service } from 'typedi'; import { LoggerProxy } from 'n8n-workflow'; import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; import type { DeleteResult } from 'typeorm'; @@ -38,9 +37,12 @@ export interface MessageWithCallback { confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void; } -@Service() export class MessageEventBus extends EventEmitter { - private isInitialized = false; + private static instance: MessageEventBus; + + isInitialized: boolean; + + logWriter: MessageEventBusLogWriter; destinations: { [key: string]: MessageEventBusDestination; @@ -48,8 +50,16 @@ export class MessageEventBus extends EventEmitter { private pushIntervalTimer: NodeJS.Timer; - constructor(private logWriter: MessageEventBusLogWriter) { + constructor() { super(); + this.isInitialized = false; + } + + static getInstance(): MessageEventBus { + if (!MessageEventBus.instance) { + MessageEventBus.instance = new MessageEventBus(); + } + return MessageEventBus.instance; } /** @@ -83,7 +93,7 @@ export class MessageEventBus extends EventEmitter { } LoggerProxy.debug('Initializing event writer'); - await this.logWriter.startThread(); + this.logWriter = await MessageEventBusLogWriter.getInstance(); // unsent event check: // - find unsent messages in current event log(s) @@ -92,9 +102,9 @@ export class MessageEventBus extends EventEmitter { LoggerProxy.debug('Checking for unsent event messages'); const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions(); LoggerProxy.debug( - `Start logging into ${this.logWriter.getLogFileName() ?? 'unknown filename'} `, + `Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `, ); - this.logWriter.startLogging(); + this.logWriter?.startLogging(); await this.send(unsentAndUnfinished.unsentMessages); if (Object.keys(unsentAndUnfinished.unfinishedExecutions).length > 0) { @@ -161,7 +171,7 @@ export class MessageEventBus extends EventEmitter { async close() { LoggerProxy.debug('Shutting down event writer...'); - await this.logWriter.close(); + await this.logWriter?.close(); for (const destinationName of Object.keys(this.destinations)) { LoggerProxy.debug( `Shutting down event destination ${this.destinations[destinationName].getId()}...`, @@ -176,7 +186,7 @@ export class MessageEventBus extends EventEmitter { msgs = [msgs]; } for (const msg of msgs) { - this.logWriter.putMessage(msg); + this.logWriter?.putMessage(msg); // if there are no set up destinations, immediately mark the event as sent if (!this.shouldSendMsg(msg)) { this.confirmSent(msg, { id: '0', name: 'eventBus' }); @@ -201,7 +211,7 @@ export class MessageEventBus extends EventEmitter { } confirmSent(msg: EventMessageTypes, source?: EventMessageConfirmSource) { - this.logWriter.confirmMessageSent(msg.id, source); + this.logWriter?.confirmMessageSent(msg.id, source); } private hasAnyDestinationSubscribedToEvent(msg: EventMessageTypes): boolean { @@ -246,7 +256,7 @@ export class MessageEventBus extends EventEmitter { async getEventsFailed(amount = 5): Promise { const result: FailedEventSummary[] = []; try { - const queryResult = await this.logWriter.getMessagesAll(); + const queryResult = await this.logWriter?.getMessagesAll(); const uniques = uniqby(queryResult, 'id'); const filteredExecutionIds = uniques .filter((e) => @@ -286,25 +296,25 @@ export class MessageEventBus extends EventEmitter { } async getEventsAll(): Promise { - const queryResult = await this.logWriter.getMessagesAll(); + const queryResult = await this.logWriter?.getMessagesAll(); const filtered = uniqby(queryResult, 'id'); return filtered; } async getEventsSent(): Promise { - const queryResult = await this.logWriter.getMessagesSent(); + const queryResult = await this.logWriter?.getMessagesSent(); const filtered = uniqby(queryResult, 'id'); return filtered; } async getEventsUnsent(): Promise { - const queryResult = await this.logWriter.getMessagesUnsent(); + const queryResult = await this.logWriter?.getMessagesUnsent(); const filtered = uniqby(queryResult, 'id'); return filtered; } async getUnfinishedExecutions(): Promise> { - const queryResult = await this.logWriter.getUnfinishedExecutions(); + const queryResult = await this.logWriter?.getUnfinishedExecutions(); return queryResult; } @@ -312,7 +322,7 @@ export class MessageEventBus extends EventEmitter { unsentMessages: EventMessageTypes[]; unfinishedExecutions: Record; }> { - const queryResult = await this.logWriter.getUnsentAndUnfinishedExecutions(); + const queryResult = await this.logWriter?.getUnsentAndUnfinishedExecutions(); return queryResult; } @@ -326,7 +336,7 @@ export class MessageEventBus extends EventEmitter { executionId: string, logHistory?: number, ): Promise { - const result = await this.logWriter.getMessagesByExecutionId(executionId, logHistory); + const result = await this.logWriter?.getMessagesByExecutionId(executionId, logHistory); return result; } /** @@ -345,3 +355,5 @@ export class MessageEventBus extends EventEmitter { await this.send(new EventMessageNode(options)); } } + +export const eventBus = MessageEventBus.getInstance(); diff --git a/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts b/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts index 766417eee3de2..af95df2e88cad 100644 --- a/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts +++ b/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts @@ -1,4 +1,3 @@ -import { Container } from 'typedi'; import { parse, stringify } from 'flatted'; import type { IRun, IRunExecutionData, ITaskData } from 'n8n-workflow'; import { NodeOperationError, WorkflowOperationError } from 'n8n-workflow'; @@ -6,11 +5,12 @@ import * as Db from '@/Db'; import type { EventMessageTypes, EventNamesTypes } from '../EventMessageClasses'; import type { DateTime } from 'luxon'; import { Push } from '@/push'; -import type { IPushDataExecutionRecovered } from '@/Interfaces'; -import { workflowExecutionCompleted } from '@/events/WorkflowStatistics'; +import type { IPushDataExecutionRecovered } from '../../Interfaces'; +import { workflowExecutionCompleted } from '../../events/WorkflowStatistics'; +import { eventBus } from './MessageEventBus'; +import { Container } from 'typedi'; import { InternalHooks } from '@/InternalHooks'; import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; -import { MessageEventBus } from './MessageEventBus'; export async function recoverExecutionDataFromEventLogMessages( executionId: string, @@ -201,7 +201,7 @@ export async function recoverExecutionDataFromEventLogMessages( await workflowExecutionCompleted(executionEntry.workflowData, iRunData); // wait for UI to be back up and send the execution data - Container.get(MessageEventBus).once('editorUiConnected', function handleUiBackUp() { + eventBus.once('editorUiConnected', function handleUiBackUp() { // add a small timeout to make sure the UI is back up setTimeout(() => { Container.get(Push).send('executionRecovered', { diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts index 207509272a9f2..87293f6149432 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts @@ -1,17 +1,14 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { Service } from 'typedi'; -import { once as eventOnce } from 'events'; +import { isEventMessageOptions } from '../EventMessageClasses/AbstractEventMessage'; +import { UserSettings } from 'n8n-core'; import path, { parse } from 'path'; import { Worker } from 'worker_threads'; import { createReadStream, existsSync, rmSync } from 'fs'; import readline from 'readline'; -import { UserSettings } from 'n8n-core'; import { jsonParse, LoggerProxy } from 'n8n-workflow'; import remove from 'lodash.remove'; import config from '@/config'; -import { inTest } from '@/constants'; -import { isEventMessageOptions } from '../EventMessageClasses/AbstractEventMessage'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; import type { EventMessageReturnMode } from '../MessageEventBus/MessageEventBus'; import type { EventMessageTypes } from '../EventMessageClasses'; @@ -20,6 +17,15 @@ import { EventMessageConfirm, isEventMessageConfirm, } from '../EventMessageClasses/EventMessageConfirm'; +import { once as eventOnce } from 'events'; +import { inTest } from '../../constants'; + +interface MessageEventBusLogWriterConstructorOptions { + logBaseName?: string; + logBasePath?: string; + keepNumberOfFiles?: number; + maxFileSizeInKB?: number; +} export interface MessageEventBusLogWriterOptions { logFullBasePath: string; @@ -36,25 +42,43 @@ interface ReadMessagesFromLogFileResult { /** * MessageEventBusWriter for Files */ -@Service() export class MessageEventBusLogWriter { - private options: Required; + private static instance: MessageEventBusLogWriter; - private _worker: Worker | undefined; + static options: Required; - constructor() { - const { keepLogCount, logBaseName, maxFileSizeInKB } = config.get('eventBus.logWriter'); - this.options = { - logFullBasePath: path.join(UserSettings.getUserN8nFolderPath(), logBaseName), - keepNumberOfFiles: keepLogCount, - maxFileSizeInKB, - }; - } + private _worker: Worker | undefined; public get worker(): Worker | undefined { return this._worker; } + /** + * Instantiates the Writer and the corresponding worker thread. + * To actually start logging, call startLogging() function on the instance. + * + * **Note** that starting to log will archive existing logs, so handle unsent events first before calling startLogging() + */ + static async getInstance( + options?: MessageEventBusLogWriterConstructorOptions, + ): Promise { + if (!MessageEventBusLogWriter.instance) { + MessageEventBusLogWriter.instance = new MessageEventBusLogWriter(); + MessageEventBusLogWriter.options = { + logFullBasePath: path.join( + options?.logBasePath ?? UserSettings.getUserN8nFolderPath(), + options?.logBaseName ?? config.getEnv('eventBus.logWriter.logBaseName'), + ), + keepNumberOfFiles: + options?.keepNumberOfFiles ?? config.getEnv('eventBus.logWriter.keepLogCount'), + maxFileSizeInKB: + options?.maxFileSizeInKB ?? config.getEnv('eventBus.logWriter.maxFileSizeInKB'), + }; + await MessageEventBusLogWriter.instance.startThread(); + } + return MessageEventBusLogWriter.instance; + } + /** * First archives existing log files one history level upwards, * then starts logging events into a fresh event log @@ -74,19 +98,13 @@ export class MessageEventBusLogWriter { } } - /** - * Instantiates the Writer and the corresponding worker thread. - * To actually start logging, call startLogging() function on the instance. - * - * **Note** that starting to log will archive existing logs, so handle unsent events first before calling startLogging() - */ - async startThread() { + private async startThread() { if (this.worker) { await this.close(); } - await this.spawnThread(); + await MessageEventBusLogWriter.instance.spawnThread(); if (this.worker) { - this.worker.postMessage({ command: 'initialize', data: this.options }); + this.worker.postMessage({ command: 'initialize', data: MessageEventBusLogWriter.options }); } } @@ -102,7 +120,7 @@ export class MessageEventBusLogWriter { if (this.worker) { this.worker.on('messageerror', async (error) => { LoggerProxy.error('Event Bus Log Writer thread error, attempting to restart...', error); - await this.startThread(); + await MessageEventBusLogWriter.instance.startThread(); }); return true; } @@ -217,14 +235,14 @@ export class MessageEventBusLogWriter { getLogFileName(counter?: number): string { if (counter) { - return `${this.options.logFullBasePath}-${counter}.log`; + return `${MessageEventBusLogWriter.options.logFullBasePath}-${counter}.log`; } else { - return `${this.options.logFullBasePath}.log`; + return `${MessageEventBusLogWriter.options.logFullBasePath}.log`; } } cleanAllLogs() { - for (let i = 0; i <= this.options.keepNumberOfFiles; i++) { + for (let i = 0; i <= MessageEventBusLogWriter.options.keepNumberOfFiles; i++) { if (existsSync(this.getLogFileName(i))) { rmSync(this.getLogFileName(i)); } diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts index 655022b900853..2672a3b98784f 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts @@ -90,12 +90,11 @@ if (!isMainThread) { clearInterval(fileStatTimer); break; case 'initialize': - const { logFullBasePath, keepNumberOfFiles, maxFileSizeInKB } = - data as MessageEventBusLogWriterOptions; + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment const settings: MessageEventBusLogWriterOptions = { - logFullBasePath: logFullBasePath ?? '', - keepNumberOfFiles: keepNumberOfFiles ?? 3, - maxFileSizeInKB: maxFileSizeInKB ?? 1024, + logFullBasePath: (data as MessageEventBusLogWriterOptions).logFullBasePath ?? '', + keepNumberOfFiles: (data as MessageEventBusLogWriterOptions).keepNumberOfFiles ?? 10, + maxFileSizeInKB: (data as MessageEventBusLogWriterOptions).maxFileSizeInKB ?? 102400, }; setLogFileBasePath(settings.logFullBasePath); setKeepFiles(settings.keepNumberOfFiles); diff --git a/packages/cli/src/eventbus/eventBus.controller.ts b/packages/cli/src/eventbus/eventBus.controller.ts index b493828e9533b..bc18472d4c910 100644 --- a/packages/cli/src/eventbus/eventBus.controller.ts +++ b/packages/cli/src/eventbus/eventBus.controller.ts @@ -6,8 +6,8 @@ import { isEventMessageOptions } from './EventMessageClasses/AbstractEventMessag import { EventMessageGeneric } from './EventMessageClasses/EventMessageGeneric'; import type { EventMessageWorkflowOptions } from './EventMessageClasses/EventMessageWorkflow'; import { EventMessageWorkflow } from './EventMessageClasses/EventMessageWorkflow'; -import { MessageEventBus } from './MessageEventBus/MessageEventBus'; import type { EventMessageReturnMode } from './MessageEventBus/MessageEventBus'; +import { eventBus } from './MessageEventBus/MessageEventBus'; import { isMessageEventBusDestinationSentryOptions, MessageEventBusDestinationSentry, @@ -76,8 +76,6 @@ const isMessageEventBusDestinationOptions = ( @RestController('/eventbus') export class EventBusController { - constructor(private eventBus: MessageEventBus) {} - // ---------------------------------------- // Events // ---------------------------------------- @@ -88,24 +86,24 @@ export class EventBusController { if (isWithQueryString(req.query)) { switch (req.query.query as EventMessageReturnMode) { case 'sent': - return this.eventBus.getEventsSent(); + return eventBus.getEventsSent(); case 'unsent': - return this.eventBus.getEventsUnsent(); + return eventBus.getEventsUnsent(); case 'unfinished': - return this.eventBus.getUnfinishedExecutions(); + return eventBus.getUnfinishedExecutions(); case 'all': default: - return this.eventBus.getEventsAll(); + return eventBus.getEventsAll(); } } else { - return this.eventBus.getEventsAll(); + return eventBus.getEventsAll(); } } @Get('/failed') async getFailedEvents(req: express.Request): Promise { const amount = parseInt(req.query?.amount as string) ?? 5; - return this.eventBus.getEventsFailed(amount); + return eventBus.getEventsFailed(amount); } @Get('/execution/:id') @@ -115,7 +113,7 @@ export class EventBusController { if (req.query?.logHistory) { logHistory = parseInt(req.query.logHistory as string, 10); } - return this.eventBus.getEventsByExecutionId(req.params.id, logHistory); + return eventBus.getEventsByExecutionId(req.params.id, logHistory); } return; } @@ -126,7 +124,7 @@ export class EventBusController { if (req.params?.id) { const logHistory = parseInt(req.query.logHistory as string, 10) || undefined; const applyToDb = req.query.applyToDb !== undefined ? !!req.query.applyToDb : true; - const messages = await this.eventBus.getEventsByExecutionId(id, logHistory); + const messages = await eventBus.getEventsByExecutionId(id, logHistory); if (messages.length > 0) { return recoverExecutionDataFromEventLogMessages(id, messages, applyToDb); } @@ -152,7 +150,7 @@ export class EventBusController { default: msg = new EventMessageGeneric(req.body); } - await this.eventBus.send(msg); + await eventBus.send(msg); } else { throw new BadRequestError( 'Body is not a serialized EventMessage or eventName does not match format {namespace}.{domain}.{event}', @@ -168,9 +166,9 @@ export class EventBusController { @Get('/destination') async getDestination(req: express.Request): Promise { if (isWithIdString(req.query)) { - return this.eventBus.findDestination(req.query.id); + return eventBus.findDestination(req.query.id); } else { - return this.eventBus.findDestination(); + return eventBus.findDestination(); } } @@ -185,22 +183,22 @@ export class EventBusController { switch (req.body.__type) { case MessageEventBusDestinationTypeNames.sentry: if (isMessageEventBusDestinationSentryOptions(req.body)) { - result = await this.eventBus.addDestination( - new MessageEventBusDestinationSentry(this.eventBus, req.body), + result = await eventBus.addDestination( + new MessageEventBusDestinationSentry(eventBus, req.body), ); } break; case MessageEventBusDestinationTypeNames.webhook: if (isMessageEventBusDestinationWebhookOptions(req.body)) { - result = await this.eventBus.addDestination( - new MessageEventBusDestinationWebhook(this.eventBus, req.body), + result = await eventBus.addDestination( + new MessageEventBusDestinationWebhook(eventBus, req.body), ); } break; case MessageEventBusDestinationTypeNames.syslog: if (isMessageEventBusDestinationSyslogOptions(req.body)) { - result = await this.eventBus.addDestination( - new MessageEventBusDestinationSyslog(this.eventBus, req.body), + result = await eventBus.addDestination( + new MessageEventBusDestinationSyslog(eventBus, req.body), ); } break; @@ -225,7 +223,7 @@ export class EventBusController { @Get('/testmessage') async sendTestMessage(req: express.Request): Promise { if (isWithIdString(req.query)) { - return this.eventBus.testDestination(req.query.id); + return eventBus.testDestination(req.query.id); } return false; } @@ -236,7 +234,7 @@ export class EventBusController { throw new ResponseHelper.UnauthorizedError('Invalid request'); } if (isWithIdString(req.query)) { - return this.eventBus.removeDestination(req.query.id); + return eventBus.removeDestination(req.query.id); } else { throw new BadRequestError('Query is missing id'); } diff --git a/packages/cli/src/eventbus/index.ts b/packages/cli/src/eventbus/index.ts index 563705976d68d..1b3b48d7af4ba 100644 --- a/packages/cli/src/eventbus/index.ts +++ b/packages/cli/src/eventbus/index.ts @@ -1 +1 @@ -export { MessageEventBus } from './MessageEventBus/MessageEventBus'; +export { eventBus } from './MessageEventBus/MessageEventBus'; diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index c789cd7a52f53..a9022bdc5cbd5 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -1,24 +1,17 @@ -import { Container } from 'typedi'; import { jsonStringify, LoggerProxy as Logger } from 'n8n-workflow'; import type { IPushDataType } from '@/Interfaces'; -import { MessageEventBus } from '@/eventbus'; +import { eventBus } from '../eventbus'; export abstract class AbstractPush { protected connections: Record = {}; - protected eventBus: MessageEventBus; - - constructor() { - this.eventBus = Container.get(MessageEventBus); - } - protected abstract close(connection: T): void; protected abstract sendToOne(connection: T, data: string): void; protected add(sessionId: string, connection: T): void { const { connections } = this; Logger.debug('Add editor-UI session', { sessionId }); - this.eventBus.emit('editorUiConnected', sessionId); + eventBus.emit('editorUiConnected', sessionId); const existingConnection = connections[sessionId]; if (existingConnection) { diff --git a/packages/cli/test/integration/eventbus.test.ts b/packages/cli/test/integration/eventbus.test.ts index 8ad83f0fc37ee..d456f86799884 100644 --- a/packages/cli/test/integration/eventbus.test.ts +++ b/packages/cli/test/integration/eventbus.test.ts @@ -1,4 +1,3 @@ -import { Container } from 'typedi'; import express from 'express'; import config from '@/config'; import axios from 'axios'; @@ -17,7 +16,7 @@ import { MessageEventBusDestinationSyslogOptions, MessageEventBusDestinationWebhookOptions, } from 'n8n-workflow'; -import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; +import { eventBus } from '@/eventbus'; import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric'; import { MessageEventBusDestinationSyslog } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; import { MessageEventBusDestinationWebhook } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; @@ -65,8 +64,6 @@ const testSentryDestination: MessageEventBusDestinationSentryOptions = { subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'], }; -const eventBus = Container.get(MessageEventBus); - async function confirmIdInAll(id: string) { const sent = await eventBus.getEventsAll(); expect(sent.length).toBeGreaterThan(0); diff --git a/packages/cli/test/integration/shared/utils.ts b/packages/cli/test/integration/shared/utils.ts index c3f77fd9b42d6..e8bf676ba7125 100644 --- a/packages/cli/test/integration/shared/utils.ts +++ b/packages/cli/test/integration/shared/utils.ts @@ -81,7 +81,6 @@ import { setSamlLoginEnabled } from '@/sso/saml/samlHelpers'; import { SamlService } from '@/sso/saml/saml.service.ee'; import { SamlController } from '@/sso/saml/routes/saml.controller.ee'; import { EventBusController } from '@/eventbus/eventBus.controller'; -import { MessageEventBus } from '@/eventbus'; export const mockInstance = ( ctor: new (...args: any[]) => T, @@ -177,8 +176,7 @@ export async function initTestServer({ for (const group of functionEndpoints) { switch (group) { case 'eventBus': - const eventBus = Container.get(MessageEventBus); - registerController(testServer.app, config, new EventBusController(eventBus)); + registerController(testServer.app, config, new EventBusController()); break; case 'auth': registerController( diff --git a/packages/cli/test/unit/ActiveWorkflowRunner.test.ts b/packages/cli/test/unit/ActiveWorkflowRunner.test.ts index 052cbf880acf9..e3290446e0f4f 100644 --- a/packages/cli/test/unit/ActiveWorkflowRunner.test.ts +++ b/packages/cli/test/unit/ActiveWorkflowRunner.test.ts @@ -29,7 +29,6 @@ import { mockInstance } from '../integration/shared/utils'; import { Push } from '@/push'; import { ActiveExecutions } from '@/ActiveExecutions'; import { NodeTypes } from '@/NodeTypes'; -import { MessageEventBus } from '@/eventbus'; /** * TODO: @@ -156,7 +155,6 @@ describe('ActiveWorkflowRunner', () => { }; Container.set(LoadNodesAndCredentials, nodesAndCredentials); mockInstance(Push); - mockInstance(MessageEventBus); }); beforeEach(() => {