Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Centralize CronJob management #10033

Merged
merged 20 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"devDependencies": {
"@types/aws4": "^1.5.1",
"@types/concat-stream": "^2.0.0",
"@types/cron": "~1.7.1",
"@types/express": "^4.17.21",
"@types/lodash": "^4.14.195",
"@types/mime-types": "^2.1.0",
Expand All @@ -40,7 +39,7 @@
"aws4": "1.11.0",
"axios": "1.6.7",
"concat-stream": "2.0.0",
"cron": "1.7.2",
"cron": "3.1.7",
"fast-glob": "3.2.12",
"file-type": "16.5.4",
"form-data": "4.0.0",
Expand Down
65 changes: 19 additions & 46 deletions packages/core/src/ActiveWorkflows.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { Service } from 'typedi';
import { CronJob } from 'cron';

import type {
IGetExecutePollFunctions,
IGetExecuteTriggerFunctions,
INode,
IPollResponse,
ITriggerResponse,
IWorkflowExecuteAdditionalData,
TriggerTime,
Expand All @@ -23,10 +21,13 @@ import {
WorkflowDeactivationError,
} from 'n8n-workflow';

import { ScheduledTaskManager } from './ScheduledTaskManager';
import type { IWorkflowData } from './Interfaces';

@Service()
export class ActiveWorkflows {
constructor(private readonly scheduledTaskManager: ScheduledTaskManager) {}

private activeWorkflows: { [workflowId: string]: IWorkflowData } = {};

/**
Expand Down Expand Up @@ -102,20 +103,15 @@ export class ActiveWorkflows {

if (pollingNodes.length === 0) return;

this.activeWorkflows[workflowId].pollResponses = [];

for (const pollNode of pollingNodes) {
try {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.activeWorkflows[workflowId].pollResponses!.push(
await this.activatePolling(
pollNode,
workflow,
additionalData,
getPollFunctions,
mode,
activation,
),
await this.activatePolling(
pollNode,
workflow,
additionalData,
getPollFunctions,
mode,
activation,
);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
Expand All @@ -138,7 +134,7 @@ export class ActiveWorkflows {
getPollFunctions: IGetExecutePollFunctions,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
): Promise<IPollResponse> {
): Promise<void> {
const pollFunctions = getPollFunctions(workflow, node, additionalData, mode, activation);

const pollTimes = pollFunctions.getNodeParameter('pollTimes') as unknown as {
Expand All @@ -161,7 +157,7 @@ export class ActiveWorkflows {
pollFunctions.__emit(pollResponse);
}
} catch (error) {
// If the poll function failes in the first activation
// If the poll function fails in the first activation
// throw the error back so we let the user know there is
// an issue with the trigger.
if (testingTrigger) {
Expand All @@ -174,11 +170,6 @@ export class ActiveWorkflows {
// Execute the trigger directly to be able to know if it works
await executeTrigger(true);

const timezone = pollFunctions.getTimezone();

// Start the cron-jobs
const cronJobs: CronJob[] = [];

for (const cronTime of cronTimes) {
const cronTimeParts = cronTime.split(' ');
if (cronTimeParts.length > 0 && cronTimeParts[0].includes('*')) {
Expand All @@ -187,19 +178,8 @@ export class ActiveWorkflows {
);
}

cronJobs.push(new CronJob(cronTime, executeTrigger, undefined, true, timezone));
}

// Stop the cron-jobs
async function closeFunction() {
for (const cronJob of cronJobs) {
cronJob.stop();
}
this.scheduledTaskManager.registerCron(workflow, cronTime, executeTrigger);
}

return {
closeFunction,
};
}

/**
Expand All @@ -211,14 +191,11 @@ export class ActiveWorkflows {
return false;
}

const w = this.activeWorkflows[workflowId];
this.scheduledTaskManager.deregisterCrons(workflowId);

const w = this.activeWorkflows[workflowId];
for (const r of w.triggerResponses ?? []) {
await this.close(r, workflowId, 'trigger');
}

for (const r of w.pollResponses ?? []) {
await this.close(r, workflowId, 'poller');
despairblue marked this conversation as resolved.
Show resolved Hide resolved
await this.closeTrigger(r, workflowId);
}

delete this.activeWorkflows[workflowId];
Expand All @@ -232,11 +209,7 @@ export class ActiveWorkflows {
}
}

private async close(
response: ITriggerResponse | IPollResponse,
workflowId: string,
target: 'trigger' | 'poller',
) {
private async closeTrigger(response: ITriggerResponse, workflowId: string) {
if (!response.closeFunction) return;

try {
Expand All @@ -246,14 +219,14 @@ export class ActiveWorkflows {
Logger.error(
`There was a problem calling "closeFunction" on "${e.node.name}" in workflow "${workflowId}"`,
);
ErrorReporter.error(e, { extra: { target, workflowId } });
ErrorReporter.error(e, { extra: { workflowId } });
return;
}

const error = e instanceof Error ? e : new Error(`${e}`);

throw new WorkflowDeactivationError(
`Failed to deactivate ${target} of workflow ID "${workflowId}": "${error.message}"`,
`Failed to deactivate trigger of workflow ID "${workflowId}": "${error.message}"`,
{ cause: error, workflowId },
);
}
Expand Down
2 changes: 0 additions & 2 deletions packages/core/src/Interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type {
IPollResponse,
ITriggerResponse,
IWorkflowSettings as IWorkflowSettingsWorkflow,
ValidationResult,
Expand All @@ -18,7 +17,6 @@ export interface IWorkflowSettings extends IWorkflowSettingsWorkflow {
}

export interface IWorkflowData {
pollResponses?: IPollResponse[];
triggerResponses?: ITriggerResponse[];
}

Expand Down
22 changes: 13 additions & 9 deletions packages/core/src/NodeExecuteFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import type {
INodeParameters,
EnsureTypeOptions,
SSHTunnelFunctions,
SchedulingFunctions,
} from 'n8n-workflow';
import {
ExpressionError,
Expand All @@ -114,7 +115,6 @@ import {
createDeferredPromise,
deepCopy,
fileTypeFromMimeType,
getGlobalState,
isObjectEmpty,
isResourceMapperValue,
validateFieldType,
Expand Down Expand Up @@ -157,6 +157,7 @@ import Container from 'typedi';
import type { BinaryData } from './BinaryData/types';
import merge from 'lodash/merge';
import { InstanceSettings } from './InstanceSettings';
import { ScheduledTaskManager } from './ScheduledTaskManager';
import { SSHClientsManager } from './SSHClientsManager';
import { binaryToBuffer } from './BinaryData/utils';

Expand Down Expand Up @@ -2585,13 +2586,6 @@ export function getNodeWebhookUrl(
return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id, node, path.toString(), isFullPath);
}

/**
* Returns the timezone for the workflow
*/
export function getTimezone(workflow: Workflow): string {
return workflow.settings.timezone ?? getGlobalState().defaultTimezone;
}

/**
* Returns the full webhook description of the webhook with the given name
*
Expand Down Expand Up @@ -2957,7 +2951,7 @@ const getCommonWorkflowFunctions = (
getRestApiUrl: () => additionalData.restApiUrl,
getInstanceBaseUrl: () => additionalData.instanceBaseUrl,
getInstanceId: () => Container.get(InstanceSettings).instanceId,
getTimezone: () => getTimezone(workflow),
getTimezone: () => workflow.timezone,
getCredentialsProperties: (type: string) =>
additionalData.credentialsHelper.getCredentialsProperties(type),
prepareOutputData: async (outputData) => [outputData],
Expand Down Expand Up @@ -3286,6 +3280,14 @@ const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({
await Container.get(SSHClientsManager).getClient(credentials),
});

const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => {
const scheduledTaskManager = Container.get(ScheduledTaskManager);
return {
registerCron: (cronExpression, onTick) =>
scheduledTaskManager.registerCron(workflow, cronExpression, onTick),
};
};

const getAllowedPaths = () => {
const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO];
if (!restrictFileAccessTo) {
Expand Down Expand Up @@ -3489,6 +3491,7 @@ export function getExecutePollFunctions(
createDeferredPromise,
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getSchedulingFunctions(workflow),
returnJsonArray,
},
};
Expand Down Expand Up @@ -3553,6 +3556,7 @@ export function getExecuteTriggerFunctions(
...getSSHTunnelFunctions(),
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getSchedulingFunctions(workflow),
returnJsonArray,
},
};
Expand Down
31 changes: 31 additions & 0 deletions packages/core/src/ScheduledTaskManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Service } from 'typedi';
import { CronJob } from 'cron';
import type { CronExpression, Workflow } from 'n8n-workflow';

@Service()
export class ScheduledTaskManager {
readonly cronJobs = new Map<string, CronJob[]>();

registerCron(workflow: Workflow, cronExpression: CronExpression, onTick: () => void) {
const cronJob = new CronJob(cronExpression, onTick, undefined, true, workflow.timezone);
const cronJobsForWorkflow = this.cronJobs.get(workflow.id);
if (cronJobsForWorkflow) {
cronJobsForWorkflow.push(cronJob);
} else {
this.cronJobs.set(workflow.id, [cronJob]);
}
}

deregisterCrons(workflowId: string) {
const cronJobs = this.cronJobs.get(workflowId) ?? [];
for (const cronJob of cronJobs) {
cronJob.stop();
}
}

deregisterAllCrons() {
for (const workflowId of Object.keys(this.cronJobs)) {
this.deregisterCrons(workflowId);
}
}
}
54 changes: 54 additions & 0 deletions packages/core/test/ScheduledTaskManager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { Workflow } from 'n8n-workflow';
import { mock } from 'jest-mock-extended';

import { ScheduledTaskManager } from '@/ScheduledTaskManager';

describe('ScheduledTaskManager', () => {
const workflow = mock<Workflow>({ timezone: 'GMT' });
const everyMinute = '0 * * * * *';
const onTick = jest.fn();

let scheduledTaskManager: ScheduledTaskManager;

beforeEach(() => {
jest.clearAllMocks();
jest.useFakeTimers();
scheduledTaskManager = new ScheduledTaskManager();
});

it('should throw when workflow timezone is invalid', () => {
expect(() =>
scheduledTaskManager.registerCron(
mock<Workflow>({ timezone: 'somewhere' }),
everyMinute,
onTick,
),
).toThrow('Invalid timezone.');
});

it('should throw when cron expression is invalid', () => {
expect(() =>
//@ts-expect-error invalid cron expression is a type-error
scheduledTaskManager.registerCron(workflow, 'invalid-cron-expression', onTick),
).toThrow();
});

it('should register valid CronJobs', async () => {
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);

expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
expect(onTick).toHaveBeenCalledTimes(10);
});

it('should deregister CronJobs for a workflow', async () => {
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
scheduledTaskManager.deregisterCrons(workflow.id);

expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
expect(onTick).not.toHaveBeenCalled();
});
});
24 changes: 3 additions & 21 deletions packages/nodes-base/nodes/Cron/Cron.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import type {
} from 'n8n-workflow';
import { NodeHelpers, toCronExpression } from 'n8n-workflow';

import { CronJob } from 'cron';

export class Cron implements INodeType {
description: INodeTypeDescription = {
displayName: 'Cron',
Expand Down Expand Up @@ -66,27 +64,11 @@ export class Cron implements INodeType {
this.emit([this.helpers.returnJsonArray([{}])]);
};

const timezone = this.getTimezone();

// Start the cron-jobs
const cronJobs = cronTimes.map(
(cronTime) => new CronJob(cronTime, executeTrigger, undefined, true, timezone),
);

// Stop the cron-jobs
async function closeFunction() {
for (const cronJob of cronJobs) {
cronJob.stop();
}
}

async function manualTriggerFunction() {
executeTrigger();
}
// Register the cron-jobs
cronTimes.forEach((cronTime) => this.helpers.registerCron(cronTime, executeTrigger));

return {
closeFunction,
manualTriggerFunction,
manualTriggerFunction: async () => executeTrigger(),
};
}
}
Loading
Loading