Skip to content

Commit

Permalink
refactor(core): Remove event bus helpers (no-changelog) (#9690)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Jun 11, 2024
1 parent 817167c commit cc4e46e
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 73 deletions.
8 changes: 4 additions & 4 deletions packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,20 @@ import type { EventMessageAuditOptions } from '../EventMessageClasses/EventMessa
import { EventMessageAudit } from '../EventMessageClasses/EventMessageAudit';
import type { EventMessageWorkflowOptions } from '../EventMessageClasses/EventMessageWorkflow';
import { EventMessageWorkflow } from '../EventMessageClasses/EventMessageWorkflow';
import { isLogStreamingEnabled } from './MessageEventBusHelper';
import type { EventMessageNodeOptions } from '../EventMessageClasses/EventMessageNode';
import { EventMessageNode } from '../EventMessageClasses/EventMessageNode';
import {
EventMessageGeneric,
eventMessageGenericDestinationTestEvent,
} from '../EventMessageClasses/EventMessageGeneric';
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { ExecutionRecoveryService } from '../../executions/execution-recovery.service';
import {
EventMessageAiNode,
type EventMessageAiNodeOptions,
} from '../EventMessageClasses/EventMessageAiNode';
import { License } from '@/License';

export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';

Expand Down Expand Up @@ -69,6 +68,7 @@ export class MessageEventBus extends EventEmitter {
private readonly workflowRepository: WorkflowRepository,
private readonly orchestrationService: OrchestrationService,
private readonly recoveryService: ExecutionRecoveryService,
private readonly license: License,
) {
super();
}
Expand Down Expand Up @@ -329,7 +329,7 @@ export class MessageEventBus extends EventEmitter {
}

private async emitMessage(msg: EventMessageTypes) {
this.emit(METRICS_EVENT_NAME, msg);
this.emit('metrics.messageEventBus.Event', msg);

// generic emit for external modules to capture events
// this is for internal use ONLY and not for use with custom destinations!
Expand All @@ -350,7 +350,7 @@ export class MessageEventBus extends EventEmitter {

shouldSendMsg(msg: EventMessageTypes): boolean {
return (
isLogStreamingEnabled() &&
this.license.isLogStreamingEnabled() &&
Object.keys(this.destinations).length > 0 &&
this.hasAnyDestinationSubscribedToEvent(msg)
);
Expand Down

This file was deleted.

52 changes: 0 additions & 52 deletions packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { EventMessageTypes } from '../EventMessageClasses';
import type { EventMessageConfirmSource } from '../EventMessageClasses/EventMessageConfirm';
import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus';
import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository';
import { License } from '@/License';

export abstract class MessageEventBusDestination implements MessageEventBusDestinationOptions {
// Since you can't have static abstract functions - this just serves as a reminder that you need to implement these. Please.
Expand All @@ -18,6 +19,8 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti

protected readonly logger: Logger;

protected readonly license: License;

__type: MessageEventBusDestinationTypeNames;

label: string;
Expand All @@ -31,7 +34,10 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti
anonymizeAuditMessages: boolean;

constructor(eventBusInstance: MessageEventBus, options: MessageEventBusDestinationOptions) {
// @TODO: Use DI
this.logger = Container.get(Logger);
this.license = Container.get(License);

this.eventBusInstance = eventBusInstance;
this.id = !options.id || options.id.length !== 36 ? uuid() : options.id;
this.__type = options.__type ?? MessageEventBusDestinationTypeNames.abstract;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import type {
MessageEventBusDestinationOptions,
MessageEventBusDestinationSentryOptions,
} from 'n8n-workflow';
import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper';
import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric';
import { N8N_VERSION } from '@/constants';
import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus';
Expand Down Expand Up @@ -57,7 +56,7 @@ export class MessageEventBusDestinationSentry
let sendResult = false;
if (!this.sentryClient) return sendResult;
if (msg.eventName !== eventMessageGenericDestinationTestEvent) {
if (!isLogStreamingEnabled()) return sendResult;
if (!this.license.isLogStreamingEnabled()) return sendResult;
if (!this.hasSubscribedToEvent(msg)) return sendResult;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import type {
} from 'n8n-workflow';
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
import { MessageEventBusDestination } from './MessageEventBusDestination.ee';
import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper';
import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric';
import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus';
import Container from 'typedi';
Expand Down Expand Up @@ -73,7 +72,7 @@ export class MessageEventBusDestinationSyslog
const { msg, confirmCallback } = emitterPayload;
let sendResult = false;
if (msg.eventName !== eventMessageGenericDestinationTestEvent) {
if (!isLogStreamingEnabled()) return sendResult;
if (!this.license.isLogStreamingEnabled()) return sendResult;
if (!this.hasSubscribedToEvent(msg)) return sendResult;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import type {
} from 'n8n-workflow';
import { CredentialsHelper } from '@/CredentialsHelper';
import { Agent as HTTPSAgent } from 'https';
import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper';
import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric';
import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus';
import * as SecretsHelpers from '@/ExternalSecrets/externalSecretsHelper.ee';
Expand Down Expand Up @@ -255,7 +254,7 @@ export class MessageEventBusDestinationWebhook
const { msg, confirmCallback } = emitterPayload;
let sendResult = false;
if (msg.eventName !== eventMessageGenericDestinationTestEvent) {
if (!isLogStreamingEnabled()) return sendResult;
if (!this.license.isLogStreamingEnabled()) return sendResult;
if (!this.hasSubscribedToEvent(msg)) return sendResult;
}
// at first run, build this.requestOptions with the destination settings
Expand Down
1 change: 0 additions & 1 deletion packages/cli/src/eventbus/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
export { EventMessageTypes } from './EventMessageClasses';
export { EventPayloadWorkflow } from './EventMessageClasses/EventMessageWorkflow';
export { METRICS_EVENT_NAME, getLabelsForEvent } from './MessageEventBusDestination/Helpers.ee';
49 changes: 46 additions & 3 deletions packages/cli/src/services/metrics.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import { Service } from 'typedi';
import EventEmitter from 'events';

import { CacheService } from '@/services/cache/cache.service';
import { METRICS_EVENT_NAME, getLabelsForEvent, type EventMessageTypes } from '@/eventbus';
import { type EventMessageTypes } from '@/eventbus';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { Logger } from '@/Logger';
import { EventMessageTypeNames } from 'n8n-workflow';

@Service()
export class MetricsService extends EventEmitter {
Expand Down Expand Up @@ -135,7 +136,7 @@ export class MetricsService extends EventEmitter {
const counter = new promClient.Counter({
name: metricName,
help: `Total number of ${event.eventName} events.`,
labelNames: Object.keys(getLabelsForEvent(event)),
labelNames: Object.keys(this.getLabelsForEvent(event)),
});
counter.inc(0);
this.counters[event.eventName] = counter;
Expand All @@ -148,10 +149,52 @@ export class MetricsService extends EventEmitter {
if (!config.getEnv('endpoints.metrics.includeMessageEventBusMetrics')) {
return;
}
this.eventBus.on(METRICS_EVENT_NAME, (event: EventMessageTypes) => {
this.eventBus.on('metrics.messageEventBus.Event', (event: EventMessageTypes) => {
const counter = this.getCounterForEvent(event);
if (!counter) return;
counter.inc(1);
});
}

getLabelsForEvent(event: EventMessageTypes): Record<string, string> {
switch (event.__type) {
case EventMessageTypeNames.audit:
if (event.eventName.startsWith('n8n.audit.user.credentials')) {
return config.getEnv('endpoints.metrics.includeCredentialTypeLabel')
? {
credential_type: this.getLabelValueForCredential(
event.payload.credentialType ?? 'unknown',
),
}
: {};
}

if (event.eventName.startsWith('n8n.audit.workflow')) {
return config.getEnv('endpoints.metrics.includeWorkflowIdLabel')
? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' }
: {};
}
break;

case EventMessageTypeNames.node:
return config.getEnv('endpoints.metrics.includeNodeTypeLabel')
? { node_type: this.getLabelValueForNode(event.payload.nodeType ?? 'unknown') }
: {};

case EventMessageTypeNames.workflow:
return config.getEnv('endpoints.metrics.includeWorkflowIdLabel')
? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' }
: {};
}

return {};
}

getLabelValueForNode(nodeType: string) {
return nodeType.replace('n8n-nodes-', '').replace(/\./g, '_');
}

getLabelValueForCredential(credentialType: string) {
return credentialType.replace(/\./g, '_');
}
}

0 comments on commit cc4e46e

Please sign in to comment.