diff --git a/.github/workflows/e2e-reusable.yml b/.github/workflows/e2e-reusable.yml index 771dfab35a9be..49c37fd4c896b 100644 --- a/.github/workflows/e2e-reusable.yml +++ b/.github/workflows/e2e-reusable.yml @@ -166,8 +166,7 @@ jobs: # We have to provide custom ci-build-id key to make sure that this workflow could be run multiple times # in the same parent workflow ci-build-id: ${{ needs.prepare.outputs.uuid }} - spec: '/__w/n8n/n8n/cypress/${{ inputs.spec }}' - config-file: /__w/n8n/n8n/cypress/cypress.config.js + spec: '${{ inputs.spec }}' env: NODE_OPTIONS: --dns-result-order=ipv4first CYPRESS_RECORD_KEY: ${{ secrets.CYPRESS_RECORD_KEY }} diff --git a/packages/@n8n/permissions/src/types.ts b/packages/@n8n/permissions/src/types.ts index 2c6079203aaa9..2720272e6fd75 100644 --- a/packages/@n8n/permissions/src/types.ts +++ b/packages/@n8n/permissions/src/types.ts @@ -6,7 +6,6 @@ export type Resource = | 'credential' | 'externalSecretsProvider' | 'externalSecret' - | 'eventBusEvent' | 'eventBusDestination' | 'ldap' | 'license' @@ -45,7 +44,6 @@ export type EventBusDestinationScope = ResourceScope< 'eventBusDestination', DefaultOperations | 'test' >; -export type EventBusEventScope = ResourceScope<'eventBusEvent', DefaultOperations | 'query'>; export type LdapScope = ResourceScope<'ldap', 'manage' | 'sync'>; export type LicenseScope = ResourceScope<'license', 'manage'>; export type LogStreamingScope = ResourceScope<'logStreaming', 'manage'>; @@ -70,7 +68,6 @@ export type Scope = | CredentialScope | ExternalSecretProviderScope | ExternalSecretScope - | EventBusEventScope | EventBusDestinationScope | LdapScope | LicenseScope diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index fa30e8392d2e2..c6e30d76b8b81 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -47,7 +47,6 @@ import { CredentialsOverwrites } from '@/CredentialsOverwrites'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; import * as ResponseHelper from '@/ResponseHelper'; import { EventBusController } from '@/eventbus/eventBus.controller'; -import { EventBusControllerEE } from '@/eventbus/eventBus.controller.ee'; import { LicenseController } from '@/license/license.controller'; import { setupPushServer, setupPushHandler } from '@/push'; import { isLdapEnabled } from './Ldap/helpers'; @@ -119,7 +118,6 @@ export class Server extends AbstractServer { const controllers: Array> = [ EventBusController, - EventBusControllerEE, AuthController, LicenseController, OAuth1CredentialController, diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 0f1c9b15b5aec..d8f60697a3675 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -21,19 +21,18 @@ import type { EventMessageAuditOptions } from '../EventMessageClasses/EventMessa import { EventMessageAudit } from '../EventMessageClasses/EventMessageAudit'; import type { EventMessageWorkflowOptions } from '../EventMessageClasses/EventMessageWorkflow'; import { EventMessageWorkflow } from '../EventMessageClasses/EventMessageWorkflow'; -import { isLogStreamingEnabled } from './MessageEventBusHelper'; import type { EventMessageNodeOptions } from '../EventMessageClasses/EventMessageNode'; import { EventMessageNode } from '../EventMessageClasses/EventMessageNode'; import { EventMessageGeneric, eventMessageGenericDestinationTestEvent, } from '../EventMessageClasses/EventMessageGeneric'; -import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import { ExecutionRecoveryService } from '../../executions/execution-recovery.service'; import { EventMessageAiNode, type EventMessageAiNodeOptions, } from '../EventMessageClasses/EventMessageAiNode'; +import { License } from '@/License'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -66,6 +65,7 @@ export class MessageEventBus extends EventEmitter { private readonly workflowRepository: WorkflowRepository, private readonly orchestrationService: OrchestrationService, private readonly recoveryService: ExecutionRecoveryService, + private readonly license: License, ) { super(); } @@ -315,7 +315,7 @@ export class MessageEventBus extends EventEmitter { } private async emitMessage(msg: EventMessageTypes) { - this.emit(METRICS_EVENT_NAME, msg); + this.emit('metrics.messageEventBus.Event', msg); // generic emit for external modules to capture events // this is for internal use ONLY and not for use with custom destinations! @@ -336,7 +336,7 @@ export class MessageEventBus extends EventEmitter { shouldSendMsg(msg: EventMessageTypes): boolean { return ( - isLogStreamingEnabled() && + this.license.isLogStreamingEnabled() && Object.keys(this.destinations).length > 0 && this.hasAnyDestinationSubscribedToEvent(msg) ); diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBusHelper.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBusHelper.ts deleted file mode 100644 index 29eab2872aa5e..0000000000000 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBusHelper.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { License } from '@/License'; -import { Container } from 'typedi'; - -export function isLogStreamingEnabled(): boolean { - const license = Container.get(License); - return license.isLogStreamingEnabled(); -} diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts deleted file mode 100644 index 33a6e54bccee4..0000000000000 --- a/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { EventMessageTypeNames } from 'n8n-workflow'; -import config from '@/config'; -import type { EventMessageTypes } from '../EventMessageClasses'; - -export const METRICS_EVENT_NAME = 'metrics.messageEventBus.Event'; - -export function getMetricNameForEvent(event: EventMessageTypes): string { - const prefix = config.getEnv('endpoints.metrics.prefix'); - return prefix + event.eventName.replace('n8n.', '').replace(/\./g, '_') + '_total'; -} - -export function getLabelValueForNode(nodeType: string): string { - return nodeType.replace('n8n-nodes-', '').replace(/\./g, '_'); -} - -export function getLabelValueForCredential(credentialType: string): string { - return credentialType.replace(/\./g, '_'); -} - -export function getLabelsForEvent(event: EventMessageTypes): Record { - switch (event.__type) { - case EventMessageTypeNames.audit: - if (event.eventName.startsWith('n8n.audit.user.credentials')) { - return config.getEnv('endpoints.metrics.includeCredentialTypeLabel') - ? { - credential_type: getLabelValueForCredential( - event.payload.credentialType ?? 'unknown', - ), - } - : {}; - } - - if (event.eventName.startsWith('n8n.audit.workflow')) { - return config.getEnv('endpoints.metrics.includeWorkflowIdLabel') - ? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' } - : {}; - } - break; - - case EventMessageTypeNames.node: - return config.getEnv('endpoints.metrics.includeNodeTypeLabel') - ? { node_type: getLabelValueForNode(event.payload.nodeType ?? 'unknown') } - : {}; - - case EventMessageTypeNames.workflow: - return config.getEnv('endpoints.metrics.includeWorkflowIdLabel') - ? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' } - : {}; - } - - return {}; -} diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts index bf68bb859c7a1..6a7fef6d6e47c 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts @@ -8,6 +8,7 @@ import type { EventMessageTypes } from '../EventMessageClasses'; import type { EventMessageConfirmSource } from '../EventMessageClasses/EventMessageConfirm'; import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository'; +import { License } from '@/License'; export abstract class MessageEventBusDestination implements MessageEventBusDestinationOptions { // Since you can't have static abstract functions - this just serves as a reminder that you need to implement these. Please. @@ -18,6 +19,8 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti protected readonly logger: Logger; + protected readonly license: License; + __type: MessageEventBusDestinationTypeNames; label: string; @@ -31,7 +34,10 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti anonymizeAuditMessages: boolean; constructor(eventBusInstance: MessageEventBus, options: MessageEventBusDestinationOptions) { + // @TODO: Use DI this.logger = Container.get(Logger); + this.license = Container.get(License); + this.eventBusInstance = eventBusInstance; this.id = !options.id || options.id.length !== 36 ? uuid() : options.id; this.__type = options.__type ?? MessageEventBusDestinationTypeNames.abstract; diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts index 7c03927aac0e3..25633c3ac628a 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts @@ -7,7 +7,6 @@ import type { MessageEventBusDestinationOptions, MessageEventBusDestinationSentryOptions, } from 'n8n-workflow'; -import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper'; import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric'; import { N8N_VERSION } from '@/constants'; import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; @@ -57,7 +56,7 @@ export class MessageEventBusDestinationSentry let sendResult = false; if (!this.sentryClient) return sendResult; if (msg.eventName !== eventMessageGenericDestinationTestEvent) { - if (!isLogStreamingEnabled()) return sendResult; + if (!this.license.isLogStreamingEnabled()) return sendResult; if (!this.hasSubscribedToEvent(msg)) return sendResult; } try { diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts index ac68d3856a675..f57705319c95c 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts @@ -7,7 +7,6 @@ import type { } from 'n8n-workflow'; import { MessageEventBusDestinationTypeNames } from 'n8n-workflow'; import { MessageEventBusDestination } from './MessageEventBusDestination.ee'; -import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper'; import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric'; import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; import Container from 'typedi'; @@ -73,7 +72,7 @@ export class MessageEventBusDestinationSyslog const { msg, confirmCallback } = emitterPayload; let sendResult = false; if (msg.eventName !== eventMessageGenericDestinationTestEvent) { - if (!isLogStreamingEnabled()) return sendResult; + if (!this.license.isLogStreamingEnabled()) return sendResult; if (!this.hasSubscribedToEvent(msg)) return sendResult; } try { diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts index 6b23c2de78ecb..95e76a854ac1a 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts @@ -14,7 +14,6 @@ import type { } from 'n8n-workflow'; import { CredentialsHelper } from '@/CredentialsHelper'; import { Agent as HTTPSAgent } from 'https'; -import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper'; import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric'; import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; import * as SecretsHelpers from '@/ExternalSecrets/externalSecretsHelper.ee'; @@ -255,7 +254,7 @@ export class MessageEventBusDestinationWebhook const { msg, confirmCallback } = emitterPayload; let sendResult = false; if (msg.eventName !== eventMessageGenericDestinationTestEvent) { - if (!isLogStreamingEnabled()) return sendResult; + if (!this.license.isLogStreamingEnabled()) return sendResult; if (!this.hasSubscribedToEvent(msg)) return sendResult; } // at first run, build this.requestOptions with the destination settings diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts index 918e79e523ce3..7565cb481482b 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts @@ -97,15 +97,6 @@ export class MessageEventBusLogWriter { } } - /** - * Pauses all logging. Events are still received by the worker, they just are not logged any more - */ - async pauseLogging() { - if (this.worker) { - this.worker.postMessage({ command: 'pauseLogging', data: {} }); - } - } - startRecoveryProcess() { if (this.worker) { this.worker.postMessage({ command: 'startRecoveryProcess', data: {} }); diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts index 53bdc2a829726..4686a1cf3c860 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts @@ -103,10 +103,6 @@ if (!isMainThread) { appendMessageSync(data); parentPort?.postMessage({ command, data: true }); break; - case 'pauseLogging': - loggingPaused = true; - clearInterval(fileStatTimer); - break; case 'initialize': const settings: MessageEventBusLogWriterOptions = { logFullBasePath: (data as MessageEventBusLogWriterOptions).logFullBasePath ?? '', diff --git a/packages/cli/src/eventbus/eventBus.controller.ee.ts b/packages/cli/src/eventbus/eventBus.controller.ee.ts deleted file mode 100644 index 95433a359b334..0000000000000 --- a/packages/cli/src/eventbus/eventBus.controller.ee.ts +++ /dev/null @@ -1,135 +0,0 @@ -import express from 'express'; -import type { - MessageEventBusDestinationWebhookOptions, - MessageEventBusDestinationOptions, -} from 'n8n-workflow'; -import { MessageEventBusDestinationTypeNames } from 'n8n-workflow'; - -import { RestController, Get, Post, Delete, GlobalScope } from '@/decorators'; -import { AuthenticatedRequest } from '@/requests'; -import { BadRequestError } from '@/errors/response-errors/bad-request.error'; - -import { MessageEventBus } from './MessageEventBus/MessageEventBus'; -import { - isMessageEventBusDestinationSentryOptions, - MessageEventBusDestinationSentry, -} from './MessageEventBusDestination/MessageEventBusDestinationSentry.ee'; -import { - isMessageEventBusDestinationSyslogOptions, - MessageEventBusDestinationSyslog, -} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; -import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; -import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee'; -import { logStreamingLicensedMiddleware } from './middleware/logStreamingEnabled.middleware.ee'; - -// ---------------------------------------- -// TypeGuards -// ---------------------------------------- - -const isWithIdString = (candidate: unknown): candidate is { id: string } => { - const o = candidate as { id: string }; - if (!o) return false; - return o.id !== undefined; -}; - -const isMessageEventBusDestinationWebhookOptions = ( - candidate: unknown, -): candidate is MessageEventBusDestinationWebhookOptions => { - const o = candidate as MessageEventBusDestinationWebhookOptions; - if (!o) return false; - return o.url !== undefined; -}; - -const isMessageEventBusDestinationOptions = ( - candidate: unknown, -): candidate is MessageEventBusDestinationOptions => { - const o = candidate as MessageEventBusDestinationOptions; - if (!o) return false; - return o.__type !== undefined; -}; - -// ---------------------------------------- -// Controller -// ---------------------------------------- - -@RestController('/eventbus') -export class EventBusControllerEE { - constructor(private readonly eventBus: MessageEventBus) {} - - // ---------------------------------------- - // Destinations - // ---------------------------------------- - - @Get('/destination', { middlewares: [logStreamingLicensedMiddleware] }) - @GlobalScope('eventBusDestination:list') - async getDestination(req: express.Request): Promise { - if (isWithIdString(req.query)) { - return await this.eventBus.findDestination(req.query.id); - } else { - return await this.eventBus.findDestination(); - } - } - - @Post('/destination', { middlewares: [logStreamingLicensedMiddleware] }) - @GlobalScope('eventBusDestination:create') - async postDestination(req: AuthenticatedRequest): Promise { - let result: MessageEventBusDestination | undefined; - if (isMessageEventBusDestinationOptions(req.body)) { - switch (req.body.__type) { - case MessageEventBusDestinationTypeNames.sentry: - if (isMessageEventBusDestinationSentryOptions(req.body)) { - result = await this.eventBus.addDestination( - new MessageEventBusDestinationSentry(this.eventBus, req.body), - ); - } - break; - case MessageEventBusDestinationTypeNames.webhook: - if (isMessageEventBusDestinationWebhookOptions(req.body)) { - result = await this.eventBus.addDestination( - new MessageEventBusDestinationWebhook(this.eventBus, req.body), - ); - } - break; - case MessageEventBusDestinationTypeNames.syslog: - if (isMessageEventBusDestinationSyslogOptions(req.body)) { - result = await this.eventBus.addDestination( - new MessageEventBusDestinationSyslog(this.eventBus, req.body), - ); - } - break; - default: - throw new BadRequestError( - `Body is missing ${req.body.__type} options or type ${req.body.__type} is unknown`, - ); - } - if (result) { - await result.saveToDb(); - return { - ...result.serialize(), - eventBusInstance: undefined, - }; - } - throw new BadRequestError('There was an error adding the destination'); - } - throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions'); - } - - @Get('/testmessage', { middlewares: [logStreamingLicensedMiddleware] }) - @GlobalScope('eventBusDestination:test') - async sendTestMessage(req: express.Request): Promise { - if (isWithIdString(req.query)) { - return await this.eventBus.testDestination(req.query.id); - } - return false; - } - - @Delete('/destination', { middlewares: [logStreamingLicensedMiddleware] }) - @GlobalScope('eventBusDestination:delete') - async deleteDestination(req: AuthenticatedRequest) { - if (isWithIdString(req.query)) { - return await this.eventBus.removeDestination(req.query.id); - } else { - throw new BadRequestError('Query is missing id'); - } - } -} diff --git a/packages/cli/src/eventbus/eventBus.controller.ts b/packages/cli/src/eventbus/eventBus.controller.ts index 3f73227e47b6a..419c4055aa264 100644 --- a/packages/cli/src/eventbus/eventBus.controller.ts +++ b/packages/cli/src/eventbus/eventBus.controller.ts @@ -1,112 +1,132 @@ +import { eventNamesAll } from './EventMessageClasses'; import express from 'express'; -import { EventMessageTypeNames } from 'n8n-workflow'; +import type { + MessageEventBusDestinationWebhookOptions, + MessageEventBusDestinationOptions, +} from 'n8n-workflow'; +import { MessageEventBusDestinationTypeNames } from 'n8n-workflow'; -import { RestController, Get, Post, GlobalScope } from '@/decorators'; +import { RestController, Get, Post, Delete, GlobalScope, Licensed } from '@/decorators'; +import { AuthenticatedRequest } from '@/requests'; import { BadRequestError } from '@/errors/response-errors/bad-request.error'; -import { isEventMessageOptions } from './EventMessageClasses/AbstractEventMessage'; -import { EventMessageGeneric } from './EventMessageClasses/EventMessageGeneric'; -import type { EventMessageWorkflowOptions } from './EventMessageClasses/EventMessageWorkflow'; -import { EventMessageWorkflow } from './EventMessageClasses/EventMessageWorkflow'; -import type { EventMessageReturnMode } from './MessageEventBus/MessageEventBus'; import { MessageEventBus } from './MessageEventBus/MessageEventBus'; -import type { EventMessageTypes } from './EventMessageClasses'; -import { eventNamesAll } from './EventMessageClasses'; -import type { EventMessageAuditOptions } from './EventMessageClasses/EventMessageAudit'; -import { EventMessageAudit } from './EventMessageClasses/EventMessageAudit'; -import type { EventMessageNodeOptions } from './EventMessageClasses/EventMessageNode'; -import { EventMessageNode } from './EventMessageClasses/EventMessageNode'; +import { + isMessageEventBusDestinationSentryOptions, + MessageEventBusDestinationSentry, +} from './MessageEventBusDestination/MessageEventBusDestinationSentry.ee'; +import { + isMessageEventBusDestinationSyslogOptions, + MessageEventBusDestinationSyslog, +} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; +import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; +import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee'; -// ---------------------------------------- -// TypeGuards -// ---------------------------------------- +const isWithIdString = (candidate: unknown): candidate is { id: string } => { + const o = candidate as { id: string }; + if (!o) return false; + return o.id !== undefined; +}; -const isWithQueryString = (candidate: unknown): candidate is { query: string } => { - const o = candidate as { query: string }; +const isMessageEventBusDestinationWebhookOptions = ( + candidate: unknown, +): candidate is MessageEventBusDestinationWebhookOptions => { + const o = candidate as MessageEventBusDestinationWebhookOptions; if (!o) return false; - return o.query !== undefined; + return o.url !== undefined; }; -// ---------------------------------------- -// Controller -// ---------------------------------------- +const isMessageEventBusDestinationOptions = ( + candidate: unknown, +): candidate is MessageEventBusDestinationOptions => { + const o = candidate as MessageEventBusDestinationOptions; + if (!o) return false; + return o.__type !== undefined; +}; @RestController('/eventbus') export class EventBusController { constructor(private readonly eventBus: MessageEventBus) {} - // ---------------------------------------- - // Events - // ---------------------------------------- - @Get('/event') - @GlobalScope('eventBusEvent:query') - async getEvents( - req: express.Request, - ): Promise> { - if (isWithQueryString(req.query)) { - switch (req.query.query as EventMessageReturnMode) { - case 'sent': - return await this.eventBus.getEventsSent(); - case 'unsent': - return await this.eventBus.getEventsUnsent(); - case 'unfinished': - return await this.eventBus.getUnfinishedExecutions(); - case 'all': - default: - return await this.eventBus.getEventsAll(); - } - } else { - return await this.eventBus.getEventsAll(); - } + @Get('/eventnames') + async getEventNames(): Promise { + return eventNamesAll; } - @Get('/execution/:id') - @GlobalScope('eventBusEvent:read') - async getEventForExecutionId(req: express.Request): Promise { - if (req.params?.id) { - let logHistory; - if (req.query?.logHistory) { - logHistory = parseInt(req.query.logHistory as string, 10); - } - return await this.eventBus.getEventsByExecutionId(req.params.id, logHistory); + @Licensed('feat:logStreaming') + @Get('/destination') + @GlobalScope('eventBusDestination:list') + async getDestination(req: express.Request): Promise { + if (isWithIdString(req.query)) { + return await this.eventBus.findDestination(req.query.id); + } else { + return await this.eventBus.findDestination(); } - return; } - @Post('/event') - @GlobalScope('eventBusEvent:create') - async postEvent(req: express.Request): Promise { - let msg: EventMessageTypes | undefined; - if (isEventMessageOptions(req.body)) { + @Licensed('feat:logStreaming') + @Post('/destination') + @GlobalScope('eventBusDestination:create') + async postDestination(req: AuthenticatedRequest): Promise { + let result: MessageEventBusDestination | undefined; + if (isMessageEventBusDestinationOptions(req.body)) { switch (req.body.__type) { - case EventMessageTypeNames.workflow: - msg = new EventMessageWorkflow(req.body as EventMessageWorkflowOptions); + case MessageEventBusDestinationTypeNames.sentry: + if (isMessageEventBusDestinationSentryOptions(req.body)) { + result = await this.eventBus.addDestination( + new MessageEventBusDestinationSentry(this.eventBus, req.body), + ); + } break; - case EventMessageTypeNames.audit: - msg = new EventMessageAudit(req.body as EventMessageAuditOptions); + case MessageEventBusDestinationTypeNames.webhook: + if (isMessageEventBusDestinationWebhookOptions(req.body)) { + result = await this.eventBus.addDestination( + new MessageEventBusDestinationWebhook(this.eventBus, req.body), + ); + } break; - case EventMessageTypeNames.node: - msg = new EventMessageNode(req.body as EventMessageNodeOptions); + case MessageEventBusDestinationTypeNames.syslog: + if (isMessageEventBusDestinationSyslogOptions(req.body)) { + result = await this.eventBus.addDestination( + new MessageEventBusDestinationSyslog(this.eventBus, req.body), + ); + } break; - case EventMessageTypeNames.generic: default: - msg = new EventMessageGeneric(req.body); + throw new BadRequestError( + `Body is missing ${req.body.__type} options or type ${req.body.__type} is unknown`, + ); } - await this.eventBus.send(msg); - } else { - throw new BadRequestError( - 'Body is not a serialized EventMessage or eventName does not match format {namespace}.{domain}.{event}', - ); + if (result) { + await result.saveToDb(); + return { + ...result.serialize(), + eventBusInstance: undefined, + }; + } + throw new BadRequestError('There was an error adding the destination'); } - return msg; + throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions'); } - // ---------------------------------------- - // Utilities - // ---------------------------------------- + @Licensed('feat:logStreaming') + @Get('/testmessage') + @GlobalScope('eventBusDestination:test') + async sendTestMessage(req: express.Request): Promise { + if (isWithIdString(req.query)) { + return await this.eventBus.testDestination(req.query.id); + } + return false; + } - @Get('/eventnames') - async getEventNames(): Promise { - return eventNamesAll; + @Licensed('feat:logStreaming') + @Delete('/destination') + @GlobalScope('eventBusDestination:delete') + async deleteDestination(req: AuthenticatedRequest) { + if (isWithIdString(req.query)) { + return await this.eventBus.removeDestination(req.query.id); + } else { + throw new BadRequestError('Query is missing id'); + } } } diff --git a/packages/cli/src/eventbus/index.ts b/packages/cli/src/eventbus/index.ts index b9a271bb81f18..fd3658c0a4a13 100644 --- a/packages/cli/src/eventbus/index.ts +++ b/packages/cli/src/eventbus/index.ts @@ -1,3 +1,2 @@ export { EventMessageTypes } from './EventMessageClasses'; export { EventPayloadWorkflow } from './EventMessageClasses/EventMessageWorkflow'; -export { METRICS_EVENT_NAME, getLabelsForEvent } from './MessageEventBusDestination/Helpers.ee'; diff --git a/packages/cli/src/eventbus/middleware/logStreamingEnabled.middleware.ee.ts b/packages/cli/src/eventbus/middleware/logStreamingEnabled.middleware.ee.ts deleted file mode 100644 index 4589032ca135d..0000000000000 --- a/packages/cli/src/eventbus/middleware/logStreamingEnabled.middleware.ee.ts +++ /dev/null @@ -1,15 +0,0 @@ -import type { RequestHandler } from 'express'; -import Container from 'typedi'; -import { License } from '../../License'; - -export function islogStreamingLicensed(): boolean { - return Container.get(License).isLogStreamingEnabled(); -} - -export const logStreamingLicensedMiddleware: RequestHandler = (_req, res, next) => { - if (islogStreamingLicensed()) { - next(); - } else { - res.status(403).json({ status: 'error', message: 'Unauthorized' }); - } -}; diff --git a/packages/cli/src/permissions/global-roles.ts b/packages/cli/src/permissions/global-roles.ts index 9824ec1bee062..ad930dfdd21d8 100644 --- a/packages/cli/src/permissions/global-roles.ts +++ b/packages/cli/src/permissions/global-roles.ts @@ -14,12 +14,6 @@ export const GLOBAL_OWNER_SCOPES: Scope[] = [ 'communityPackage:uninstall', 'communityPackage:update', 'communityPackage:list', - 'eventBusEvent:create', - 'eventBusEvent:read', - 'eventBusEvent:update', - 'eventBusEvent:delete', - 'eventBusEvent:query', - 'eventBusEvent:create', 'eventBusDestination:create', 'eventBusDestination:read', 'eventBusDestination:update', @@ -81,7 +75,6 @@ export const GLOBAL_OWNER_SCOPES: Scope[] = [ export const GLOBAL_ADMIN_SCOPES = GLOBAL_OWNER_SCOPES.concat(); export const GLOBAL_MEMBER_SCOPES: Scope[] = [ - 'eventBusEvent:read', 'eventBusDestination:list', 'eventBusDestination:test', 'tag:create', diff --git a/packages/cli/src/services/metrics.service.ts b/packages/cli/src/services/metrics.service.ts index aae4652e56e77..edee289de169f 100644 --- a/packages/cli/src/services/metrics.service.ts +++ b/packages/cli/src/services/metrics.service.ts @@ -8,9 +8,10 @@ import { Service } from 'typedi'; import EventEmitter from 'events'; import { CacheService } from '@/services/cache/cache.service'; -import { METRICS_EVENT_NAME, getLabelsForEvent, type EventMessageTypes } from '@/eventbus'; +import { type EventMessageTypes } from '@/eventbus'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import { Logger } from '@/Logger'; +import { EventMessageTypeNames } from 'n8n-workflow'; @Service() export class MetricsService extends EventEmitter { @@ -135,7 +136,7 @@ export class MetricsService extends EventEmitter { const counter = new promClient.Counter({ name: metricName, help: `Total number of ${event.eventName} events.`, - labelNames: Object.keys(getLabelsForEvent(event)), + labelNames: Object.keys(this.getLabelsForEvent(event)), }); counter.inc(0); this.counters[event.eventName] = counter; @@ -148,10 +149,52 @@ export class MetricsService extends EventEmitter { if (!config.getEnv('endpoints.metrics.includeMessageEventBusMetrics')) { return; } - this.eventBus.on(METRICS_EVENT_NAME, (event: EventMessageTypes) => { + this.eventBus.on('metrics.messageEventBus.Event', (event: EventMessageTypes) => { const counter = this.getCounterForEvent(event); if (!counter) return; counter.inc(1); }); } + + getLabelsForEvent(event: EventMessageTypes): Record { + switch (event.__type) { + case EventMessageTypeNames.audit: + if (event.eventName.startsWith('n8n.audit.user.credentials')) { + return config.getEnv('endpoints.metrics.includeCredentialTypeLabel') + ? { + credential_type: this.getLabelValueForCredential( + event.payload.credentialType ?? 'unknown', + ), + } + : {}; + } + + if (event.eventName.startsWith('n8n.audit.workflow')) { + return config.getEnv('endpoints.metrics.includeWorkflowIdLabel') + ? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' } + : {}; + } + break; + + case EventMessageTypeNames.node: + return config.getEnv('endpoints.metrics.includeNodeTypeLabel') + ? { node_type: this.getLabelValueForNode(event.payload.nodeType ?? 'unknown') } + : {}; + + case EventMessageTypeNames.workflow: + return config.getEnv('endpoints.metrics.includeWorkflowIdLabel') + ? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' } + : {}; + } + + return {}; + } + + getLabelValueForNode(nodeType: string) { + return nodeType.replace('n8n-nodes-', '').replace(/\./g, '_'); + } + + getLabelValueForCredential(credentialType: string) { + return credentialType.replace(/\./g, '_'); + } } diff --git a/packages/cli/test/integration/shared/utils/testServer.ts b/packages/cli/test/integration/shared/utils/testServer.ts index 7110366f6568b..4968ddb3d9dab 100644 --- a/packages/cli/test/integration/shared/utils/testServer.ts +++ b/packages/cli/test/integration/shared/utils/testServer.ts @@ -158,9 +158,7 @@ export const setupTestServer = ({ case 'eventBus': const { EventBusController } = await import('@/eventbus/eventBus.controller'); - const { EventBusControllerEE } = await import('@/eventbus/eventBus.controller.ee'); registerController(app, EventBusController); - registerController(app, EventBusControllerEE); break; case 'auth': diff --git a/packages/editor-ui/src/api/eventbus.ee.ts b/packages/editor-ui/src/api/eventbus.ee.ts index 99a8bf480d010..fa99c38e6c36f 100644 --- a/packages/editor-ui/src/api/eventbus.ee.ts +++ b/packages/editor-ui/src/api/eventbus.ee.ts @@ -47,7 +47,3 @@ export async function getDestinationsFromBackend( ): Promise { return await makeRestApiRequest(context, 'GET', '/eventbus/destination'); } - -export async function getExecutionEvents(context: IRestApiContext, executionId: string) { - return await makeRestApiRequest(context, 'GET', `/eventbus/execution/${executionId}`); -} diff --git a/packages/editor-ui/src/stores/rbac.store.ts b/packages/editor-ui/src/stores/rbac.store.ts index caba9e8634e48..a45d0964ae521 100644 --- a/packages/editor-ui/src/stores/rbac.store.ts +++ b/packages/editor-ui/src/stores/rbac.store.ts @@ -24,7 +24,6 @@ export const useRBACStore = defineStore(STORES.RBAC, () => { orchestration: {}, workersView: {}, eventBusDestination: {}, - eventBusEvent: {}, auditLogs: {}, banner: {}, communityPackage: {}, diff --git a/packages/editor-ui/src/stores/workflows.store.ts b/packages/editor-ui/src/stores/workflows.store.ts index 974b10129d14a..3c6cc81e2609c 100644 --- a/packages/editor-ui/src/stores/workflows.store.ts +++ b/packages/editor-ui/src/stores/workflows.store.ts @@ -34,7 +34,6 @@ import type { } from '@/Interface'; import { defineStore } from 'pinia'; import type { - IAbstractEventMessage, IConnection, IConnections, IDataObject, @@ -1432,15 +1431,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { }); } - async function getExecutionEvents(id: string): Promise { - const rootStore = useRootStore(); - return await makeRestApiRequest( - rootStore.getRestApiContext, - 'GET', - `/eventbus/execution/${id}`, - ); - } - function getBinaryUrl( binaryDataId: string, action: 'view' | 'download', @@ -1651,7 +1641,6 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { fetchExecutionDataById, deleteExecution, addToCurrentExecutions, - getExecutionEvents, getBinaryUrl, setNodePristine, resetChatMessages,