Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Support redis cluster in queue mode #6708

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 50 additions & 27 deletions packages/cli/src/AbstractServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import parseUrl from 'parseurl';
import type { RedisOptions } from 'ioredis';

import type { WebhookHttpMethod } from 'n8n-workflow';
import { LoggerProxy as Logger } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import config from '@/config';
import { N8N_VERSION, inDevelopment } from '@/constants';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
Expand All @@ -27,6 +27,7 @@ import { corsMiddleware } from '@/middlewares';
import { TestWebhooks } from '@/TestWebhooks';
import { WaitingWebhooks } from '@/WaitingWebhooks';
import { WEBHOOK_METHODS } from '@/WebhookHelpers';
import { getRedisClusterNodes } from './GenericHelpers';

const emptyBuffer = Buffer.alloc(0);

Expand Down Expand Up @@ -187,42 +188,64 @@ export abstract class AbstractServer {
let lastTimer = 0;
let cumulativeTimeout = 0;
const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis');
const clusterNodes = getRedisClusterNodes();
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');

const redis = new Redis({
host,
port,
db,
const usesRedisCluster = clusterNodes.length > 0;
LoggerProxy.debug(
usesRedisCluster
? `Initialising Redis cluster connection with nodes: ${clusterNodes
.map((e) => `${e.host}:${e.port}`)
.join(',')}`
: `Initialising Redis client connection with host: ${host ?? 'localhost'} and port: ${
port ?? '6379'
}`,
);
const sharedRedisOptions: RedisOptions = {
username,
password,
retryStrategy: (): number | null => {
const now = Date.now();
if (now - lastTimer > 30000) {
// Means we had no timeout at all or last timeout was temporary and we recovered
lastTimer = now;
cumulativeTimeout = 0;
} else {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
Logger.error(
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
);
process.exit(1);
}
}
return 500;
},
});
db,
enableReadyCheck: false,
maxRetriesPerRequest: null,
};
const redis = usesRedisCluster
? new Redis.Cluster(
clusterNodes.map((node) => ({ host: node.host, port: node.port })),
{
redisOptions: sharedRedisOptions,
},
)
: new Redis({
host,
port,
...sharedRedisOptions,
retryStrategy: (): number | null => {
const now = Date.now();
if (now - lastTimer > 30000) {
// Means we had no timeout at all or last timeout was temporary and we recovered
lastTimer = now;
cumulativeTimeout = 0;
} else {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
LoggerProxy.error(
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
);
process.exit(1);
}
}
return 500;
},
});

redis.on('close', () => {
Logger.warn('Redis unavailable - trying to reconnect...');
LoggerProxy.warn('Redis unavailable - trying to reconnect...');
});

redis.on('error', (error) => {
if (!String(error).includes('ECONNREFUSED')) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
Logger.warn('Error with Redis: ', error);
LoggerProxy.warn('Error with Redis: ', error);
}
});
}
Expand Down
24 changes: 24 additions & 0 deletions packages/cli/src/GenericHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,28 @@ export async function createErrorExecution(
await Container.get(ExecutionRepository).createNewExecution(fullExecutionData);
}

export function getRedisClusterNodes(): Array<{ host: string; port: number }> {
const clusterNodePairs = config
.getEnv('queue.bull.redis.clusterNodes')
.split(',')
.filter((e) => e);
return clusterNodePairs.map((pair) => {
const [host, port] = pair.split(':');
return { host, port: parseInt(port) };
});
}

export function getRedisPrefix(): string {
let prefix = config.getEnv('queue.bull.prefix');
if (prefix && getRedisClusterNodes().length > 0) {
if (!prefix.startsWith('{')) {
prefix = '{' + prefix;
}
if (!prefix.endsWith('}')) {
prefix += '}';
}
}
return prefix;
}

export const DEFAULT_EXECUTIONS_GET_ALL_LIMIT = 20;
52 changes: 44 additions & 8 deletions packages/cli/src/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type Bull from 'bull';
import type { RedisOptions } from 'ioredis';
import { type RedisOptions } from 'ioredis';
import { Service } from 'typedi';
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import { LoggerProxy, type IExecuteResponsePromiseData } from 'n8n-workflow';
import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions';
import * as WebhookHelpers from '@/WebhookHelpers';
import { getRedisClusterNodes, getRedisPrefix } from './GenericHelpers';

export type JobId = Bull.JobId;
export type Job = Bull.Job<JobData>;
Expand All @@ -31,19 +32,54 @@ export class Queue {
constructor(private activeExecutions: ActiveExecutions) {}

async init() {
const prefix = config.getEnv('queue.bull.prefix');
const redisOptions: RedisOptions = config.getEnv('queue.bull.redis');

const prefix = getRedisPrefix();
const clusterNodes = getRedisClusterNodes();
const usesRedisCluster = clusterNodes.length > 0;
const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis');
// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Bull } = await import('bull');

// eslint-disable-next-line @typescript-eslint/naming-convention
const { default: Redis } = await import('ioredis');
// Disabling ready check is necessary as it allows worker to
// quickly reconnect to Redis if Redis crashes or is unreachable
// for some time. With it enabled, worker might take minutes to realize
// redis is back up and resume working.
// More here: https://github.com/OptimalBits/bull/issues/890
// @ts-ignore
this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false });

LoggerProxy.debug(
usesRedisCluster
? `Initialising Redis cluster connection with nodes: ${clusterNodes
.map((e) => `${e.host}:${e.port}`)
.join(',')}`
: `Initialising Redis client connection with host: ${host ?? 'localhost'} and port: ${
port ?? '6379'
}`,
);
const sharedRedisOptions: RedisOptions = {
username,
password,
db,
enableReadyCheck: false,
maxRetriesPerRequest: null,
};
this.jobQueue = new Bull('jobs', {
prefix,
createClient: (type, clientConfig) =>
usesRedisCluster
? new Redis.Cluster(
clusterNodes.map((node) => ({ host: node.host, port: node.port })),
{
...clientConfig,
redisOptions: sharedRedisOptions,
},
)
: new Redis({
...clientConfig,
host,
port,
...sharedRedisOptions,
}),
});

this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => {
this.activeExecutions.resolveResponsePromise(
Expand Down
10 changes: 8 additions & 2 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ export const schema = {
},
bull: {
prefix: {
doc: 'Prefix for all queue keys',
doc: 'Prefix for all queue keys (wrap in {} for cluster mode)',
format: String,
default: '',
default: 'bull',
env: 'QUEUE_BULL_PREFIX',
},
redis: {
Expand Down Expand Up @@ -395,6 +395,12 @@ export const schema = {
default: '',
env: 'QUEUE_BULL_REDIS_USERNAME',
},
clusterNodes: {
doc: 'Redis Cluster startup nodes (comma separated list of host:port pairs)',
format: String,
default: '',
env: 'QUEUE_BULL_REDIS_CLUSTER_NODES',
},
},
queueRecoveryInterval: {
doc: 'If > 0 enables an active polling to the queue that can recover for Redis crashes. Given in seconds; 0 is disabled. May increase Redis traffic significantly.',
Expand Down