Skip to content

Commit

Permalink
feat(core): Expose queue metrics for Prometheus (#10559)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Aug 28, 2024
1 parent acfd60a commit 008c510
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 52 deletions.
8 changes: 8 additions & 0 deletions packages/@n8n/config/src/configs/endpoints.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ class PrometheusMetricsConfig {
/** Whether to include metrics derived from n8n's internal events */
@Env('N8N_METRICS_INCLUDE_MESSAGE_EVENT_BUS_METRICS')
includeMessageEventBusMetrics: boolean = false;

/** Whether to include metrics for jobs in scaling mode. Not supported in multi-main setup. */
@Env('N8N_METRICS_INCLUDE_QUEUE_METRICS')
includeQueueMetrics: boolean = false;

/** How often (in seconds) to update queue metrics. */
@Env('N8N_METRICS_QUEUE_METRICS_INTERVAL')
queueMetricsInterval: number = 20;
}

@Config
Expand Down
2 changes: 2 additions & 0 deletions packages/@n8n/config/test/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ describe('GlobalConfig', () => {
includeApiMethodLabel: false,
includeCredentialTypeLabel: false,
includeApiStatusCodeLabel: false,
includeQueueMetrics: false,
queueMetricsInterval: 20,
},
additionalNonUIRoutes: '',
disableProductionWebhooksOnMainProcess: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ describe('TelemetryEventRelay', () => {
includeApiEndpoints: false,
includeCacheMetrics: false,
includeMessageEventBusMetrics: false,
includeQueueMetrics: false,
},
},
});
Expand Down Expand Up @@ -948,6 +949,7 @@ describe('TelemetryEventRelay', () => {
metrics_category_routes: false,
metrics_category_cache: false,
metrics_category_logs: false,
metrics_category_queue: false,
},
}),
);
Expand Down
5 changes: 4 additions & 1 deletion packages/cli/src/events/event.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Service } from 'typedi';
import { TypedEmitter } from '@/TypedEmitter';
import type { RelayEventMap } from './relay-event-map';
import type { QueueMetricsEventMap } from './queue-metrics-event-map';

type EventMap = RelayEventMap & QueueMetricsEventMap;

@Service()
export class EventService extends TypedEmitter<RelayEventMap> {}
export class EventService extends TypedEmitter<EventMap> {}
8 changes: 8 additions & 0 deletions packages/cli/src/events/queue-metrics-event-map.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
 * Job counts for the scaling-mode queue, as carried by the `job-counts-updated` event.
 *
 * NOTE(review): the Prometheus consumer writes `waiting`/`active` into gauges (absolute
 * values) and adds `completed`/`failed` onto counters, so the latter two are presumably
 * counts since the previous update rather than running totals. Confirm against the emitter.
 */
type JobCounts = {
	/** Jobs enqueued and waiting for pickup. */
	waiting: number;
	/** Jobs currently being processed. */
	active: number;
	/** Jobs that completed. */
	completed: number;
	/** Jobs that failed. */
	failed: number;
};

/** Events related to queue metrics, keyed by event name and mapped to their payload. */
export type QueueMetricsEventMap = {
	'job-counts-updated': JobCounts;
};
1 change: 1 addition & 0 deletions packages/cli/src/events/telemetry-event-relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ export class TelemetryEventRelay extends EventRelay {
metrics_category_routes: this.globalConfig.endpoints.metrics.includeApiEndpoints,
metrics_category_cache: this.globalConfig.endpoints.metrics.includeCacheMetrics,
metrics_category_logs: this.globalConfig.endpoints.metrics.includeMessageEventBusMetrics,
metrics_category_queue: this.globalConfig.endpoints.metrics.includeQueueMetrics,
},
};

Expand Down
152 changes: 105 additions & 47 deletions packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type express from 'express';
import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { mockInstance } from '@test/mocking';
import { GlobalConfig } from '@n8n/config';
import type { EventService } from '@/events/event.service';

const mockMiddleware = (
_req: express.Request,
Expand All @@ -22,103 +23,120 @@ describe('PrometheusMetricsService', () => {
endpoints: {
metrics: {
prefix: 'n8n_',
includeDefaultMetrics: true,
includeApiEndpoints: true,
includeCacheMetrics: true,
includeMessageEventBusMetrics: true,
includeDefaultMetrics: false,
includeApiEndpoints: false,
includeCacheMetrics: false,
includeMessageEventBusMetrics: false,
includeCredentialTypeLabel: false,
includeNodeTypeLabel: false,
includeWorkflowIdLabel: false,
includeApiPathLabel: true,
includeApiMethodLabel: true,
includeApiStatusCodeLabel: true,
includeApiPathLabel: false,
includeApiMethodLabel: false,
includeApiStatusCodeLabel: false,
includeQueueMetrics: false,
},
},
});

const app = mock<express.Application>();
const eventBus = mock<MessageEventBus>();
const eventService = mock<EventService>();
const prometheusMetricsService = new PrometheusMetricsService(
mock(),
eventBus,
globalConfig,
eventService,
);

afterEach(() => {
jest.clearAllMocks();
prometheusMetricsService.disableAllMetrics();
});

// The service derives its enabled metric categories from `GlobalConfig` when it is
// constructed, so a service built from a config with cache metrics switched on must
// register the cache counters during `init()`.
describe('constructor', () => {
	it('should enable metrics based on global config', async () => {
		// Copy every level that is modified. A shallow `{ ...globalConfig }` shares the
		// nested `endpoints.metrics` object with the suite-wide fixture, so assigning the
		// flag on the copy would silently flip it on `globalConfig` for all later tests.
		const customGlobalConfig = {
			...globalConfig,
			endpoints: {
				...globalConfig.endpoints,
				metrics: {
					...globalConfig.endpoints.metrics,
					includeCacheMetrics: true,
				},
			},
		};
		const customPrometheusMetricsService = new PrometheusMetricsService(
			mock(),
			mock(),
			customGlobalConfig,
			mock(),
		);

		await customPrometheusMetricsService.init(app);

		expect(promClient.Counter).toHaveBeenCalledWith({
			name: 'n8n_cache_hits_total',
			help: 'Total number of cache hits.',
			labelNames: ['cache'],
		});
	});
});

describe('init', () => {
it('should set up `n8n_version_info`', async () => {
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);
await prometheusMetricsService.init(app);

await service.init(mock<express.Application>());

expect(promClient.Gauge).toHaveBeenCalledWith({
expect(promClient.Gauge).toHaveBeenNthCalledWith(1, {
name: 'n8n_version_info',
help: 'n8n version info.',
labelNames: ['version', 'major', 'minor', 'patch'],
});
});

it('should set up default metrics collection with `prom-client`', async () => {
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);

await service.init(mock<express.Application>());
prometheusMetricsService.enableMetric('default');
await prometheusMetricsService.init(app);

expect(promClient.collectDefaultMetrics).toHaveBeenCalled();
});

it('should set up `n8n_cache_hits_total`', async () => {
config.set('endpoints.metrics.includeCacheMetrics', true);
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);

await service.init(mock<express.Application>());
prometheusMetricsService.enableMetric('cache');
await prometheusMetricsService.init(app);

expect(promClient.Counter).toHaveBeenCalledWith({
name: 'n8n_cache_hits_total',
help: 'Total number of cache hits.',
labelNames: ['cache'],
});
// @ts-expect-error private field
expect(service.counters.cacheHitsTotal?.inc).toHaveBeenCalledWith(0);
});

it('should set up `n8n_cache_misses_total`', async () => {
config.set('endpoints.metrics.includeCacheMetrics', true);
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);

await service.init(mock<express.Application>());
prometheusMetricsService.enableMetric('cache');
await prometheusMetricsService.init(app);

expect(promClient.Counter).toHaveBeenCalledWith({
name: 'n8n_cache_misses_total',
help: 'Total number of cache misses.',
labelNames: ['cache'],
});
// @ts-expect-error private field
expect(service.counters.cacheMissesTotal?.inc).toHaveBeenCalledWith(0);
});

it('should set up `n8n_cache_updates_total`', async () => {
config.set('endpoints.metrics.includeCacheMetrics', true);
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);

await service.init(mock<express.Application>());
prometheusMetricsService.enableMetric('cache');
await prometheusMetricsService.init(app);

expect(promClient.Counter).toHaveBeenCalledWith({
name: 'n8n_cache_updates_total',
help: 'Total number of cache updates.',
labelNames: ['cache'],
});
// @ts-expect-error private field
expect(service.counters.cacheUpdatesTotal?.inc).toHaveBeenCalledWith(0);
expect(prometheusMetricsService.counters.cacheUpdatesTotal?.inc).toHaveBeenCalledWith(0);
});

it('should set up route metrics with `express-prom-bundle`', async () => {
config.set('endpoints.metrics.includeApiEndpoints', true);
config.set('endpoints.metrics.includeApiPathLabel', true);
config.set('endpoints.metrics.includeApiMethodLabel', true);
config.set('endpoints.metrics.includeApiStatusCodeLabel', true);
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);

const app = mock<express.Application>();

await service.init(app);
prometheusMetricsService.enableMetric('routes');
await prometheusMetricsService.init(app);

expect(promBundle).toHaveBeenCalledWith({
autoregister: false,
includeUp: false,
includePath: true,
includeMethod: true,
includeStatusCode: true,
includePath: false,
includeMethod: false,
includeStatusCode: false,
});

expect(app.use).toHaveBeenCalledWith(
Expand All @@ -137,12 +155,52 @@ describe('PrometheusMetricsService', () => {
});

it('should set up event bus metrics', async () => {
const eventBus = mock<MessageEventBus>();
const service = new PrometheusMetricsService(mock(), eventBus, globalConfig);

await service.init(mock<express.Application>());
prometheusMetricsService.enableMetric('logs');
await prometheusMetricsService.init(app);

expect(eventBus.on).toHaveBeenCalledWith('metrics.eventBus.event', expect.any(Function));
});

// Positive case: with the `queue` category enabled and `executions.mode` set to `queue`,
// `init()` must create two gauges (waiting, active) and two counters (completed, failed)
// and subscribe to `job-counts-updated`. The `Nth` indices below rely on the order in
// which `init()` constructs the metrics.
it('should set up queue metrics if enabled', async () => {
config.set('executions.mode', 'queue');
prometheusMetricsService.enableMetric('queue');

await prometheusMetricsService.init(app);

// call 1 is for `n8n_version_info` (always enabled)

expect(promClient.Gauge).toHaveBeenNthCalledWith(2, {
name: 'n8n_scaling_mode_queue_jobs_waiting',
help: 'Current number of enqueued jobs waiting for pickup in scaling mode.',
});

expect(promClient.Gauge).toHaveBeenNthCalledWith(3, {
name: 'n8n_scaling_mode_queue_jobs_active',
help: 'Current number of jobs being processed across all workers in scaling mode.',
});

// No other counter-based category is enabled here, so the queue counters are calls 1 and 2.
expect(promClient.Counter).toHaveBeenNthCalledWith(1, {
name: 'n8n_scaling_mode_queue_jobs_completed',
help: 'Total number of jobs completed across all workers in scaling mode since instance start.',
});

expect(promClient.Counter).toHaveBeenNthCalledWith(2, {
name: 'n8n_scaling_mode_queue_jobs_failed',
help: 'Total number of jobs failed across all workers in scaling mode since instance start.',
});

expect(eventService.on).toHaveBeenCalledWith('job-counts-updated', expect.any(Function));
});

// Negative case: the `queue` category is enabled but `executions.mode` is `regular`,
// so `init()` must not create any queue gauge or counter, nor subscribe to
// `job-counts-updated`.
it('should not set up queue metrics if enabled but not on scaling mode', async () => {
config.set('executions.mode', 'regular');
prometheusMetricsService.enableMetric('queue');

await prometheusMetricsService.init(app);

expect(promClient.Gauge).toHaveBeenCalledTimes(1); // only `n8n_version_info`; no queue gauges
expect(promClient.Counter).toHaveBeenCalledTimes(0); // no queue counters (cache counters are not enabled in this test either)
expect(eventService.on).not.toHaveBeenCalled();
});
});
});
45 changes: 44 additions & 1 deletion packages/cli/src/metrics/prometheus-metrics.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { N8N_VERSION } from '@/constants';
import type express from 'express';
import promBundle from 'express-prom-bundle';
import promClient, { type Counter } from 'prom-client';
import promClient, { type Counter, type Gauge } from 'prom-client';
import semverParse from 'semver/functions/parse';
import { Service } from 'typedi';

Expand All @@ -11,17 +11,22 @@ import { EventMessageTypeNames } from 'n8n-workflow';
import type { EventMessageTypes } from '@/eventbus';
import type { Includes, MetricCategory, MetricLabel } from './types';
import { GlobalConfig } from '@n8n/config';
import { EventService } from '@/events/event.service';
import config from '@/config';

@Service()
export class PrometheusMetricsService {
constructor(
private readonly cacheService: CacheService,
private readonly eventBus: MessageEventBus,
private readonly globalConfig: GlobalConfig,
private readonly eventService: EventService,
) {}

private readonly counters: { [key: string]: Counter<string> | null } = {};

private readonly gauges: Record<string, Gauge<string>> = {};

private readonly prefix = this.globalConfig.endpoints.metrics.prefix;

private readonly includes: Includes = {
Expand All @@ -30,6 +35,7 @@ export class PrometheusMetricsService {
routes: this.globalConfig.endpoints.metrics.includeApiEndpoints,
cache: this.globalConfig.endpoints.metrics.includeCacheMetrics,
logs: this.globalConfig.endpoints.metrics.includeMessageEventBusMetrics,
queue: this.globalConfig.endpoints.metrics.includeQueueMetrics,
},
labels: {
credentialsType: this.globalConfig.endpoints.metrics.includeCredentialTypeLabel,
Expand All @@ -48,6 +54,7 @@ export class PrometheusMetricsService {
this.initCacheMetrics();
this.initEventBusMetrics();
this.initRouteMetrics(app);
this.initQueueMetrics();
this.mountMetricsEndpoint(app);
}

Expand Down Expand Up @@ -218,6 +225,42 @@ export class PrometheusMetricsService {
});
}

/**
 * Set up Prometheus metrics for the scaling-mode job queue.
 *
 * Does nothing unless the `queue` metric category is enabled and `executions.mode`
 * is `queue`. Otherwise it registers two gauges (`waiting`, `active`) and two
 * counters (`completed`, `failed`), initializes each of them with zero, and
 * subscribes to `job-counts-updated` to keep them current.
 */
private initQueueMetrics() {
	if (!this.includes.metrics.queue) return;
	if (config.getEnv('executions.mode') !== 'queue') return;

	const { prefix, gauges, counters } = this;

	const waitingGauge = new promClient.Gauge({
		name: `${prefix}scaling_mode_queue_jobs_waiting`,
		help: 'Current number of enqueued jobs waiting for pickup in scaling mode.',
	});
	gauges.waiting = waitingGauge;

	const activeGauge = new promClient.Gauge({
		name: `${prefix}scaling_mode_queue_jobs_active`,
		help: 'Current number of jobs being processed across all workers in scaling mode.',
	});
	gauges.active = activeGauge;

	const completedCounter = new promClient.Counter({
		name: `${prefix}scaling_mode_queue_jobs_completed`,
		help: 'Total number of jobs completed across all workers in scaling mode since instance start.',
	});
	counters.completed = completedCounter;

	const failedCounter = new promClient.Counter({
		name: `${prefix}scaling_mode_queue_jobs_failed`,
		help: 'Total number of jobs failed across all workers in scaling mode since instance start.',
	});
	counters.failed = failedCounter;

	// Write an initial zero to every metric before the first job-count event arrives.
	waitingGauge.set(0);
	activeGauge.set(0);
	completedCounter.inc(0);
	failedCounter.inc(0);

	// Gauges are overwritten with the reported value; counters are incremented by it.
	// Metrics are looked up in the maps at event time, as stored above.
	this.eventService.on('job-counts-updated', ({ waiting, active, completed, failed }) => {
		gauges.waiting.set(waiting);
		gauges.active.set(active);
		counters.completed?.inc(completed);
		counters.failed?.inc(failed);
	});
}

private toLabels(event: EventMessageTypes): Record<string, string> {
const { __type, eventName, payload } = event;

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/metrics/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export type MetricCategory = 'default' | 'routes' | 'cache' | 'logs';
export type MetricCategory = 'default' | 'routes' | 'cache' | 'logs' | 'queue';

export type MetricLabel =
| 'credentialsType'
Expand Down
Loading

0 comments on commit 008c510

Please sign in to comment.