diff --git a/packages/@n8n/config/src/configs/endpoints.config.ts b/packages/@n8n/config/src/configs/endpoints.config.ts index 88efa01d26bb5..9905d9f48aaf0 100644 --- a/packages/@n8n/config/src/configs/endpoints.config.ts +++ b/packages/@n8n/config/src/configs/endpoints.config.ts @@ -49,6 +49,14 @@ class PrometheusMetricsConfig { /** Whether to include metrics derived from n8n's internal events */ @Env('N8N_METRICS_INCLUDE_MESSAGE_EVENT_BUS_METRICS') includeMessageEventBusMetrics: boolean = false; + + /** Whether to include metrics for jobs in scaling mode. Not supported in multi-main setup. */ + @Env('N8N_METRICS_INCLUDE_QUEUE_METRICS') + includeQueueMetrics: boolean = false; + + /** How often (in seconds) to update queue metrics. */ + @Env('N8N_METRICS_QUEUE_METRICS_INTERVAL') + queueMetricsInterval: number = 20; } @Config diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index 301b99ca567ba..50e5c3b252910 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -165,6 +165,8 @@ describe('GlobalConfig', () => { includeApiMethodLabel: false, includeCredentialTypeLabel: false, includeApiStatusCodeLabel: false, + includeQueueMetrics: false, + queueMetricsInterval: 20, }, additionalNonUIRoutes: '', disableProductionWebhooksOnMainProcess: false, diff --git a/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts b/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts index a7be9772e4b5d..4c3c1de4b5a62 100644 --- a/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts +++ b/packages/cli/src/events/__tests__/telemetry-event-relay.test.ts @@ -33,6 +33,7 @@ describe('TelemetryEventRelay', () => { includeApiEndpoints: false, includeCacheMetrics: false, includeMessageEventBusMetrics: false, + includeQueueMetrics: false, }, }, }); @@ -948,6 +949,7 @@ describe('TelemetryEventRelay', () => { metrics_category_routes: false, metrics_category_cache: false, metrics_category_logs: false, + metrics_category_queue: false, }, }), ); diff --git a/packages/cli/src/events/event.service.ts b/packages/cli/src/events/event.service.ts index 6744103a07799..8eb61794443b0 100644 --- a/packages/cli/src/events/event.service.ts +++ b/packages/cli/src/events/event.service.ts @@ -1,6 +1,9 @@ import { Service } from 'typedi'; import { TypedEmitter } from '@/TypedEmitter'; import type { RelayEventMap } from './relay-event-map'; +import type { QueueMetricsEventMap } from './queue-metrics-event-map'; + +type EventMap = RelayEventMap & QueueMetricsEventMap; @Service() -export class EventService extends TypedEmitter {} +export class EventService extends TypedEmitter {} diff --git a/packages/cli/src/events/queue-metrics-event-map.ts b/packages/cli/src/events/queue-metrics-event-map.ts new file mode 100644 index 0000000000000..c81360cc64bbb --- /dev/null +++ b/packages/cli/src/events/queue-metrics-event-map.ts @@ -0,0 +1,8 @@ +export type QueueMetricsEventMap = { + 'job-counts-updated': { + active: number; + completed: number; + failed: number; + waiting: number; + }; +}; diff --git a/packages/cli/src/events/telemetry-event-relay.ts b/packages/cli/src/events/telemetry-event-relay.ts index c7ffba499ad3a..e374a4cf7c9f1 100644 --- a/packages/cli/src/events/telemetry-event-relay.ts +++ b/packages/cli/src/events/telemetry-event-relay.ts @@ -768,6 +768,7 @@ export class TelemetryEventRelay extends EventRelay { metrics_category_routes: this.globalConfig.endpoints.metrics.includeApiEndpoints, metrics_category_cache: this.globalConfig.endpoints.metrics.includeCacheMetrics, metrics_category_logs: this.globalConfig.endpoints.metrics.includeMessageEventBusMetrics, + metrics_category_queue: this.globalConfig.endpoints.metrics.includeQueueMetrics, }, }; diff --git a/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts b/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts index 91f99f91b041a..dd79de956ef42 100644 --- a/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts +++ b/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts @@ -7,6 +7,7 @@ import type express from 'express'; import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { mockInstance } from '@test/mocking'; import { GlobalConfig } from '@n8n/config'; +import type { EventService } from '@/events/event.service'; const mockMiddleware = ( _req: express.Request, @@ -22,27 +23,62 @@ describe('PrometheusMetricsService', () => { endpoints: { metrics: { prefix: 'n8n_', - includeDefaultMetrics: true, - includeApiEndpoints: true, - includeCacheMetrics: true, - includeMessageEventBusMetrics: true, + includeDefaultMetrics: false, + includeApiEndpoints: false, + includeCacheMetrics: false, + includeMessageEventBusMetrics: false, includeCredentialTypeLabel: false, includeNodeTypeLabel: false, includeWorkflowIdLabel: false, - includeApiPathLabel: true, - includeApiMethodLabel: true, - includeApiStatusCodeLabel: true, + includeApiPathLabel: false, + includeApiMethodLabel: false, + includeApiStatusCodeLabel: false, + includeQueueMetrics: false, }, }, }); + const app = mock(); + const eventBus = mock(); + const eventService = mock(); + const prometheusMetricsService = new PrometheusMetricsService( + mock(), + eventBus, + globalConfig, + eventService, + ); + + afterEach(() => { + jest.clearAllMocks(); + prometheusMetricsService.disableAllMetrics(); + }); + + describe('constructor', () => { + it('should enable metrics based on global config', async () => { + const customGlobalConfig = { ...globalConfig }; + customGlobalConfig.endpoints.metrics.includeCacheMetrics = true; + const customPrometheusMetricsService = new PrometheusMetricsService( + mock(), + mock(), + customGlobalConfig, + mock(), + ); + + await customPrometheusMetricsService.init(app); + + expect(promClient.Counter).toHaveBeenCalledWith({ + name: 'n8n_cache_hits_total', + help: 'Total number of cache hits.', + labelNames: ['cache'], + }); + }); + }); + describe('init', () => { it('should set up `n8n_version_info`', async () => { - const service = new PrometheusMetricsService(mock(), mock(), globalConfig); + await prometheusMetricsService.init(app); - await service.init(mock()); - - expect(promClient.Gauge).toHaveBeenCalledWith({ + expect(promClient.Gauge).toHaveBeenNthCalledWith(1, { name: 'n8n_version_info', help: 'n8n version info.', labelNames: ['version', 'major', 'minor', 'patch'], @@ -50,48 +86,37 @@ describe('PrometheusMetricsService', () => { }); it('should set up default metrics collection with `prom-client`', async () => { - const service = new PrometheusMetricsService(mock(), mock(), globalConfig); - - await service.init(mock()); + prometheusMetricsService.enableMetric('default'); + await prometheusMetricsService.init(app); expect(promClient.collectDefaultMetrics).toHaveBeenCalled(); }); it('should set up `n8n_cache_hits_total`', async () => { - config.set('endpoints.metrics.includeCacheMetrics', true); - const service = new PrometheusMetricsService(mock(), mock(), globalConfig); - - await service.init(mock()); + prometheusMetricsService.enableMetric('cache'); + await prometheusMetricsService.init(app); expect(promClient.Counter).toHaveBeenCalledWith({ name: 'n8n_cache_hits_total', help: 'Total number of cache hits.', labelNames: ['cache'], }); - // @ts-expect-error private field - expect(service.counters.cacheHitsTotal?.inc).toHaveBeenCalledWith(0); }); it('should set up `n8n_cache_misses_total`', async () => { - config.set('endpoints.metrics.includeCacheMetrics', true); - const service = new PrometheusMetricsService(mock(), mock(), globalConfig); - - await service.init(mock()); + prometheusMetricsService.enableMetric('cache'); + await prometheusMetricsService.init(app); expect(promClient.Counter).toHaveBeenCalledWith({ name: 'n8n_cache_misses_total', help: 'Total number of cache misses.', labelNames: ['cache'], }); - // @ts-expect-error private field - expect(service.counters.cacheMissesTotal?.inc).toHaveBeenCalledWith(0); }); it('should set up `n8n_cache_updates_total`', async () => { - config.set('endpoints.metrics.includeCacheMetrics', true); - const service = new PrometheusMetricsService(mock(), mock(), globalConfig); - - await service.init(mock()); + prometheusMetricsService.enableMetric('cache'); + await prometheusMetricsService.init(app); expect(promClient.Counter).toHaveBeenCalledWith({ name: 'n8n_cache_updates_total', @@ -99,26 +124,19 @@ describe('PrometheusMetricsService', () => { labelNames: ['cache'], }); // @ts-expect-error private field - expect(service.counters.cacheUpdatesTotal?.inc).toHaveBeenCalledWith(0); + expect(prometheusMetricsService.counters.cacheUpdatesTotal?.inc).toHaveBeenCalledWith(0); }); it('should set up route metrics with `express-prom-bundle`', async () => { - config.set('endpoints.metrics.includeApiEndpoints', true); - config.set('endpoints.metrics.includeApiPathLabel', true); - config.set('endpoints.metrics.includeApiMethodLabel', true); - config.set('endpoints.metrics.includeApiStatusCodeLabel', true); - const service = new PrometheusMetricsService(mock(), mock(), globalConfig); - - const app = mock(); - - await service.init(app); + prometheusMetricsService.enableMetric('routes'); + await prometheusMetricsService.init(app); expect(promBundle).toHaveBeenCalledWith({ autoregister: false, includeUp: false, - includePath: true, - includeMethod: true, - includeStatusCode: true, + includePath: false, + includeMethod: false, + includeStatusCode: false, }); expect(app.use).toHaveBeenCalledWith( @@ -137,12 +155,52 @@ describe('PrometheusMetricsService', () => { }); it('should set up event bus metrics', async () => { - const eventBus = mock(); - const service = new PrometheusMetricsService(mock(), eventBus, globalConfig); - - await service.init(mock()); + prometheusMetricsService.enableMetric('logs'); + await prometheusMetricsService.init(app); expect(eventBus.on).toHaveBeenCalledWith('metrics.eventBus.event', expect.any(Function)); }); + + it('should set up queue metrics if enabled', async () => { + config.set('executions.mode', 'queue'); + prometheusMetricsService.enableMetric('queue'); + + await prometheusMetricsService.init(app); + + // call 1 is for `n8n_version_info` (always enabled) + + expect(promClient.Gauge).toHaveBeenNthCalledWith(2, { + name: 'n8n_scaling_mode_queue_jobs_waiting', + help: 'Current number of enqueued jobs waiting for pickup in scaling mode.', + }); + + expect(promClient.Gauge).toHaveBeenNthCalledWith(3, { + name: 'n8n_scaling_mode_queue_jobs_active', + help: 'Current number of jobs being processed across all workers in scaling mode.', + }); + + expect(promClient.Counter).toHaveBeenNthCalledWith(1, { + name: 'n8n_scaling_mode_queue_jobs_completed', + help: 'Total number of jobs completed across all workers in scaling mode since instance start.', + }); + + expect(promClient.Counter).toHaveBeenNthCalledWith(2, { + name: 'n8n_scaling_mode_queue_jobs_failed', + help: 'Total number of jobs failed across all workers in scaling mode since instance start.', + }); + + expect(eventService.on).toHaveBeenCalledWith('job-counts-updated', expect.any(Function)); + }); + + it('should not set up queue metrics if enabled but not on scaling mode', async () => { + config.set('executions.mode', 'regular'); + prometheusMetricsService.enableMetric('queue'); + + await prometheusMetricsService.init(app); + + expect(promClient.Gauge).toHaveBeenCalledTimes(1); // version metric + expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics + expect(eventService.on).not.toHaveBeenCalled(); + }); }); }); diff --git a/packages/cli/src/metrics/prometheus-metrics.service.ts b/packages/cli/src/metrics/prometheus-metrics.service.ts index 32522808fc243..3646f172f183b 100644 --- a/packages/cli/src/metrics/prometheus-metrics.service.ts +++ b/packages/cli/src/metrics/prometheus-metrics.service.ts @@ -1,7 +1,7 @@ import { N8N_VERSION } from '@/constants'; import type express from 'express'; import promBundle from 'express-prom-bundle'; -import promClient, { type Counter } from 'prom-client'; +import promClient, { type Counter, type Gauge } from 'prom-client'; import semverParse from 'semver/functions/parse'; import { Service } from 'typedi'; @@ -11,6 +11,8 @@ import { EventMessageTypeNames } from 'n8n-workflow'; import type { EventMessageTypes } from '@/eventbus'; import type { Includes, MetricCategory, MetricLabel } from './types'; import { GlobalConfig } from '@n8n/config'; +import { EventService } from '@/events/event.service'; +import config from '@/config'; @Service() export class PrometheusMetricsService { @@ -18,10 +20,13 @@ export class PrometheusMetricsService { private readonly cacheService: CacheService, private readonly eventBus: MessageEventBus, private readonly globalConfig: GlobalConfig, + private readonly eventService: EventService, ) {} private readonly counters: { [key: string]: Counter | null } = {}; + private readonly gauges: Record> = {}; + private readonly prefix = this.globalConfig.endpoints.metrics.prefix; private readonly includes: Includes = { @@ -30,6 +35,7 @@ export class PrometheusMetricsService { routes: this.globalConfig.endpoints.metrics.includeApiEndpoints, cache: this.globalConfig.endpoints.metrics.includeCacheMetrics, logs: this.globalConfig.endpoints.metrics.includeMessageEventBusMetrics, + queue: this.globalConfig.endpoints.metrics.includeQueueMetrics, }, labels: { credentialsType: this.globalConfig.endpoints.metrics.includeCredentialTypeLabel, @@ -48,6 +54,7 @@ export class PrometheusMetricsService { this.initCacheMetrics(); this.initEventBusMetrics(); this.initRouteMetrics(app); + this.initQueueMetrics(); this.mountMetricsEndpoint(app); } @@ -218,6 +225,42 @@ export class PrometheusMetricsService { }); } + private initQueueMetrics() { + if (!this.includes.metrics.queue || config.getEnv('executions.mode') !== 'queue') return; + + this.gauges.waiting = new promClient.Gauge({ + name: this.prefix + 'scaling_mode_queue_jobs_waiting', + help: 'Current number of enqueued jobs waiting for pickup in scaling mode.', + }); + + this.gauges.active = new promClient.Gauge({ + name: this.prefix + 'scaling_mode_queue_jobs_active', + help: 'Current number of jobs being processed across all workers in scaling mode.', + }); + + this.counters.completed = new promClient.Counter({ + name: this.prefix + 'scaling_mode_queue_jobs_completed', + help: 'Total number of jobs completed across all workers in scaling mode since instance start.', + }); + + this.counters.failed = new promClient.Counter({ + name: this.prefix + 'scaling_mode_queue_jobs_failed', + help: 'Total number of jobs failed across all workers in scaling mode since instance start.', + }); + + this.gauges.waiting.set(0); + this.gauges.active.set(0); + this.counters.completed.inc(0); + this.counters.failed.inc(0); + + this.eventService.on('job-counts-updated', (jobCounts) => { + this.gauges.waiting.set(jobCounts.waiting); + this.gauges.active.set(jobCounts.active); + this.counters.completed?.inc(jobCounts.completed); + this.counters.failed?.inc(jobCounts.failed); + }); + } + private toLabels(event: EventMessageTypes): Record { const { __type, eventName, payload } = event; diff --git a/packages/cli/src/metrics/types.ts b/packages/cli/src/metrics/types.ts index 6a156d48d7ebc..3b68d5408a122 100644 --- a/packages/cli/src/metrics/types.ts +++ b/packages/cli/src/metrics/types.ts @@ -1,4 +1,4 @@ -export type MetricCategory = 'default' | 'routes' | 'cache' | 'logs'; +export type MetricCategory = 'default' | 'routes' | 'cache' | 'logs' | 'queue'; export type MetricLabel = | 'credentialsType' diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts index f87d02b33d0d4..0395ce5577780 100644 --- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts +++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts @@ -37,6 +37,12 @@ describe('ScalingService', () => { }, }, }, + endpoints: { + metrics: { + includeQueueMetrics: false, + queueMetricsInterval: 20, + }, + }, }); const instanceSettings = Container.get(InstanceSettings); @@ -73,6 +79,7 @@ describe('ScalingService', () => { mock(), instanceSettings, orchestrationService, + mock(), ); getRunningJobsCountSpy = jest.spyOn(scalingService, 'getRunningJobsCount'); diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 37436cdb4361e..0803ac1ced796 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -23,6 +23,7 @@ import { GlobalConfig } from '@n8n/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { InstanceSettings } from 'n8n-core'; import { OrchestrationService } from '@/services/orchestration.service'; +import { EventService } from '@/events/event.service'; @Service() export class ScalingService { @@ -38,6 +39,7 @@ export class ScalingService { private readonly executionRepository: ExecutionRepository, private readonly instanceSettings: InstanceSettings, private readonly orchestrationService: OrchestrationService, + private readonly eventService: EventService, ) {} // #region Lifecycle @@ -66,6 +68,8 @@ export class ScalingService { .on('leader-stepdown', () => this.stopQueueRecovery()); } + this.scheduleQueueMetrics(); + this.logger.debug('[ScalingService] Queue setup completed'); } @@ -89,8 +93,9 @@ export class ScalingService { this.logger.debug('[ScalingService] Queue paused'); this.stopQueueRecovery(); + this.stopQueueMetrics(); - this.logger.debug('[ScalingService] Queue recovery stopped'); + this.logger.debug('[ScalingService] Queue recovery and metrics stopped'); let count = 0; @@ -113,6 +118,12 @@ export class ScalingService { // #region Jobs + async getPendingJobCounts() { + const { active, waiting } = await this.queue.getJobCounts(); + + return { active, waiting }; + } + async addJob(jobData: JobData, jobOptions: JobOptions) { const { executionId } = jobData; @@ -246,6 +257,11 @@ export class ScalingService { this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse); } }); + + if (this.isQueueMetricsEnabled) { + this.queue.on('global:completed', () => this.jobCounters.completed++); + this.queue.on('global:failed', () => this.jobCounters.failed++); + } } private isPubSubMessage(candidate: unknown): candidate is PubSubMessage { @@ -282,6 +298,49 @@ export class ScalingService { throw new ApplicationError('This method must be called on a `worker` instance'); } + // #region Queue metrics + + /** Counters for completed and failed jobs, reset on each interval tick. */ + private readonly jobCounters = { completed: 0, failed: 0 }; + + /** Interval for collecting queue metrics to expose via Prometheus. */ + private queueMetricsInterval: NodeJS.Timer | undefined; + + get isQueueMetricsEnabled() { + return ( + this.globalConfig.endpoints.metrics.includeQueueMetrics && + this.instanceType === 'main' && + !this.orchestrationService.isMultiMainSetupEnabled + ); + } + + /** Set up an interval to collect queue metrics and emit them in an event. */ + private scheduleQueueMetrics() { + if (!this.isQueueMetricsEnabled || this.queueMetricsInterval) return; + + this.queueMetricsInterval = setInterval(async () => { + const pendingJobCounts = await this.getPendingJobCounts(); + + this.eventService.emit('job-counts-updated', { + ...pendingJobCounts, // active, waiting + ...this.jobCounters, // completed, failed + }); + + this.jobCounters.completed = 0; + this.jobCounters.failed = 0; + }, this.globalConfig.endpoints.metrics.queueMetricsInterval * Time.seconds.toMilliseconds); + } + + /** Stop collecting queue metrics. */ + private stopQueueMetrics() { + if (this.queueMetricsInterval) { + clearInterval(this.queueMetricsInterval); + this.queueMetricsInterval = undefined; + } + } + + // #endregion + // #region Queue recovery private readonly queueRecoveryContext: QueueRecoveryContext = { diff --git a/packages/cli/test/integration/prometheus-metrics.test.ts b/packages/cli/test/integration/prometheus-metrics.test.ts index f6710ba3a5ec4..c863950e4880b 100644 --- a/packages/cli/test/integration/prometheus-metrics.test.ts +++ b/packages/cli/test/integration/prometheus-metrics.test.ts @@ -6,11 +6,14 @@ import { N8N_VERSION } from '@/constants'; import { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service'; import { setupTestServer } from './shared/utils'; import { GlobalConfig } from '@n8n/config'; +import config from '@/config'; +import { EventService } from '@/events/event.service'; jest.unmock('@/eventbus/message-event-bus/message-event-bus'); const toLines = (response: Response) => response.text.trim().split('\n'); +const eventService = Container.get(EventService); const globalConfig = Container.get(GlobalConfig); globalConfig.endpoints.metrics = { enable: true, @@ -25,6 +28,8 @@ globalConfig.endpoints.metrics = { includeApiPathLabel: true, includeApiMethodLabel: true, includeApiStatusCodeLabel: true, + includeQueueMetrics: true, + queueMetricsInterval: 20, }; const server = setupTestServer({ endpointGroups: ['metrics'] }); @@ -32,7 +37,7 @@ const agent = request.agent(server.app); let prometheusService: PrometheusMetricsService; -describe('Metrics', () => { +describe('PrometheusMetricsService', () => { beforeAll(() => { prometheusService = Container.get(PrometheusMetricsService); }); @@ -222,4 +227,60 @@ describe('Metrics', () => { expect(lines).toContainEqual(expect.stringContaining('path="/webhook-test/some-uuid"')); expect(lines).toContainEqual(expect.stringContaining('status_code="404"')); }); + + it('should return queue metrics if enabled', async () => { + /** + * Arrange + */ + prometheusService.enableMetric('queue'); + config.set('executions.mode', 'queue'); + await prometheusService.init(server.app); + + /** + * Act + */ + const response = await agent.get('/metrics'); + + /** + * Assert + */ + expect(response.status).toEqual(200); + expect(response.type).toEqual('text/plain'); + + const lines = toLines(response); + + expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_waiting 0'); + expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_active 0'); + expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_completed 0'); + expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_failed 0'); + }); + + it('should set queue metrics in response to `job-counts-updated` event', async () => { + /** + * Arrange + */ + prometheusService.enableMetric('queue'); + config.set('executions.mode', 'queue'); + await prometheusService.init(server.app); + + /** + * Act + */ + eventService.emit('job-counts-updated', { waiting: 1, active: 2, completed: 0, failed: 0 }); + + /** + * Assert + */ + const response = await agent.get('/metrics'); + + expect(response.status).toEqual(200); + expect(response.type).toEqual('text/plain'); + + const lines = toLines(response); + + expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_waiting 1'); + expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_active 2'); + expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_completed 0'); + expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_failed 0'); + }); });