From d174dd7cd5f78aa34105cf20f07df1312fccdb1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 8 Nov 2022 13:57:17 +0100 Subject: [PATCH 1/4] SIGTERM/SIGINT should only be handled once --- packages/cli/commands/executeBatch.ts | 4 ++-- packages/cli/commands/start.ts | 4 ++-- packages/cli/commands/webhook.ts | 4 ++-- packages/cli/commands/worker.ts | 4 ++-- packages/cli/src/WorkflowRunnerProcess.ts | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/cli/commands/executeBatch.ts b/packages/cli/commands/executeBatch.ts index 3f4ffd92360e8..2c3d64e8f656d 100644 --- a/packages/cli/commands/executeBatch.ts +++ b/packages/cli/commands/executeBatch.ts @@ -192,8 +192,8 @@ export class ExecuteBatch extends Command { // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types async run() { - process.on('SIGTERM', ExecuteBatch.stopProcess); - process.on('SIGINT', ExecuteBatch.stopProcess); + process.once('SIGTERM', ExecuteBatch.stopProcess); + process.once('SIGINT', ExecuteBatch.stopProcess); const logger = getLogger(); LoggerProxy.init(logger); diff --git a/packages/cli/commands/start.ts b/packages/cli/commands/start.ts index 4477bb446d3ae..c1680f9de00c3 100644 --- a/packages/cli/commands/start.ts +++ b/packages/cli/commands/start.ts @@ -150,8 +150,8 @@ export class Start extends Command { async run() { // Make sure that n8n shuts down gracefully if possible - process.on('SIGTERM', Start.stopProcess); - process.on('SIGINT', Start.stopProcess); + process.once('SIGTERM', Start.stopProcess); + process.once('SIGINT', Start.stopProcess); // eslint-disable-next-line @typescript-eslint/no-shadow const { flags } = this.parse(Start); diff --git a/packages/cli/commands/webhook.ts b/packages/cli/commands/webhook.ts index d269cf3b35d64..e057fbb05ab4e 100644 --- a/packages/cli/commands/webhook.ts +++ b/packages/cli/commands/webhook.ts @@ -88,8 +88,8 @@ export class Webhook extends Command { LoggerProxy.init(logger); // Make sure that n8n shuts down gracefully if possible - process.on('SIGTERM', Webhook.stopProcess); - process.on('SIGINT', Webhook.stopProcess); + process.once('SIGTERM', Webhook.stopProcess); + process.once('SIGINT', Webhook.stopProcess); // eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-shadow const { flags } = this.parse(Webhook); diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index 4ecda5c702795..fd221f5a19b73 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -258,8 +258,8 @@ export class Worker extends Command { console.info('Starting n8n worker...'); // Make sure that n8n shuts down gracefully if possible - process.on('SIGTERM', Worker.stopProcess); - process.on('SIGINT', Worker.stopProcess); + process.once('SIGTERM', Worker.stopProcess); + process.once('SIGINT', Worker.stopProcess); // Wrap that the process does not close but we can still use async await (async () => { diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index dfcb7ca7a89d5..ca73c9f66e00f 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -86,8 +86,8 @@ export class WorkflowRunnerProcess { } async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise { - process.on('SIGTERM', WorkflowRunnerProcess.stopProcess); - process.on('SIGINT', WorkflowRunnerProcess.stopProcess); + process.once('SIGTERM', WorkflowRunnerProcess.stopProcess); + process.once('SIGINT', WorkflowRunnerProcess.stopProcess); // eslint-disable-next-line no-multi-assign const logger = (this.logger = getLogger()); From 9ed35c464db63099a1892b77168454e89a56a4e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 8 Nov 2022 14:05:08 +0100 Subject: [PATCH 2/4] move error-handling initialization to commands --- packages/cli/commands/start.ts | 3 +++ packages/cli/commands/webhook.ts | 3 +++ packages/cli/src/ErrorReporting.ts | 14 +++++++------- packages/cli/src/Server.ts | 4 ++-- packages/cli/src/WebhookServer.ts | 4 ++-- 5 files changed, 17 insertions(+), 11 deletions(-) diff --git a/packages/cli/commands/start.ts b/packages/cli/commands/start.ts index c1680f9de00c3..891a8e95b10ab 100644 --- a/packages/cli/commands/start.ts +++ b/packages/cli/commands/start.ts @@ -34,6 +34,7 @@ import { import { getLogger } from '../src/Logger'; import { getAllInstalledPackages } from '../src/CommunityNodes/packageModel'; +import { initErrorHandling } from '../src/ErrorReporting'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires const open = require('open'); @@ -153,6 +154,8 @@ export class Start extends Command { process.once('SIGTERM', Start.stopProcess); process.once('SIGINT', Start.stopProcess); + initErrorHandling(); + // eslint-disable-next-line @typescript-eslint/no-shadow const { flags } = this.parse(Start); diff --git a/packages/cli/commands/webhook.ts b/packages/cli/commands/webhook.ts index e057fbb05ab4e..2c8476ca82169 100644 --- a/packages/cli/commands/webhook.ts +++ b/packages/cli/commands/webhook.ts @@ -26,6 +26,7 @@ import { } from '../src'; import { getLogger } from '../src/Logger'; +import { initErrorHandling } from '../src/ErrorReporting'; let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined; let processExistCode = 0; @@ -91,6 +92,8 @@ export class Webhook extends Command { process.once('SIGTERM', Webhook.stopProcess); process.once('SIGINT', Webhook.stopProcess); + initErrorHandling(); + // eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-shadow const { flags } = this.parse(Webhook); diff --git a/packages/cli/src/ErrorReporting.ts b/packages/cli/src/ErrorReporting.ts index a0d00a75e58cb..b508e5343dad5 100644 --- a/packages/cli/src/ErrorReporting.ts +++ b/packages/cli/src/ErrorReporting.ts @@ -6,7 +6,7 @@ import { ErrorReporterProxy } from 'n8n-workflow'; let initialized = false; -export const initErrorHandling = (app?: Application) => { +export const initErrorHandling = () => { if (initialized) return; if (!config.getEnv('diagnostics.enabled')) { @@ -27,15 +27,15 @@ export const initErrorHandling = (app?: Application) => { }, }); - if (app) { - const { requestHandler, errorHandler } = Sentry.Handlers; - app.use(requestHandler()); - app.use(errorHandler()); - } - ErrorReporterProxy.init({ report: (error, options) => Sentry.captureException(error, options), }); initialized = true; }; + +export const setupErrorMiddleware = (app: Application) => { + const { requestHandler, errorHandler } = Sentry.Handlers; + app.use(requestHandler()); + app.use(errorHandler()); +}; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 903c799f505ea..c450681031e7e 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -155,7 +155,7 @@ import glob from 'fast-glob'; import { ResponseError } from './ResponseHelper'; import { toHttpNodeParameters } from './CurlConverterHelper'; -import { initErrorHandling } from './ErrorReporting'; +import { setupErrorMiddleware } from './ErrorReporting'; require('body-parser-xml')(bodyParser); @@ -259,7 +259,7 @@ class App { this.presetCredentialsLoaded = false; this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); - initErrorHandling(this.app); + setupErrorMiddleware(this.app); const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl(); const telemetrySettings: ITelemetrySettings = { diff --git a/packages/cli/src/WebhookServer.ts b/packages/cli/src/WebhookServer.ts index b68d7de07ad76..2ef62cd1b641a 100644 --- a/packages/cli/src/WebhookServer.ts +++ b/packages/cli/src/WebhookServer.ts @@ -33,7 +33,7 @@ import { import config from '../config'; // eslint-disable-next-line import/no-cycle import { WEBHOOK_METHODS } from './WebhookHelpers'; -import { initErrorHandling } from './ErrorReporting'; +import { setupErrorMiddleware } from './ErrorReporting'; // eslint-disable-next-line @typescript-eslint/no-var-requires, @typescript-eslint/no-unsafe-call require('body-parser-xml')(bodyParser); @@ -219,7 +219,7 @@ class App { this.presetCredentialsLoaded = false; this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); - initErrorHandling(this.app); + setupErrorMiddleware(this.app); } /** From 75e651844b0f094ddcac777166f8f7a88b451ea9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 8 Nov 2022 14:32:05 +0100 Subject: [PATCH 3/4] create a new `sleep` function in workflow utils --- packages/cli/commands/executeBatch.ts | 7 ++----- packages/cli/commands/start.ts | 6 ++---- packages/cli/commands/webhook.ts | 6 ++---- packages/cli/commands/worker.ts | 13 +++++++++---- packages/nodes-base/nodes/Discord/Discord.node.ts | 5 +++-- .../nodes/HttpRequest/V1/HttpRequestV1.node.ts | 3 ++- .../nodes/HttpRequest/V2/HttpRequestV2.node.ts | 3 ++- .../nodes/HttpRequest/V3/HttpRequestV3.node.ts | 3 ++- .../nodes-base/nodes/RabbitMQ/GenericFunctions.ts | 6 ++---- .../nodes-base/nodes/Twitter/GenericFunctions.ts | 8 ++------ packages/workflow/src/index.ts | 2 +- packages/workflow/src/utils.ts | 5 +++++ 12 files changed, 34 insertions(+), 33 deletions(-) diff --git a/packages/cli/commands/executeBatch.ts b/packages/cli/commands/executeBatch.ts index 2c3d64e8f656d..58fda6f9eb158 100644 --- a/packages/cli/commands/executeBatch.ts +++ b/packages/cli/commands/executeBatch.ts @@ -11,8 +11,7 @@ import { Command, flags } from '@oclif/command'; import { BinaryDataManager, UserSettings } from 'n8n-core'; -// eslint-disable-next-line @typescript-eslint/no-unused-vars -import { INode, ITaskData, LoggerProxy } from 'n8n-workflow'; +import { ITaskData, LoggerProxy, sleep } from 'n8n-workflow'; import { sep } from 'path'; @@ -147,9 +146,7 @@ export class ExecuteBatch extends Command { }); } // eslint-disable-next-line no-await-in-loop - await new Promise((resolve) => { - setTimeout(resolve, 500); - }); + await sleep(500); executingWorkflows = activeExecutionsInstance.getActiveExecutions(); } // We may receive true but when called from `process.on` diff --git a/packages/cli/commands/start.ts b/packages/cli/commands/start.ts index 891a8e95b10ab..6894d8c9efca9 100644 --- a/packages/cli/commands/start.ts +++ b/packages/cli/commands/start.ts @@ -12,7 +12,7 @@ import { Command, flags } from '@oclif/command'; // eslint-disable-next-line import/no-extraneous-dependencies import Redis from 'ioredis'; -import { IDataObject, LoggerProxy } from 'n8n-workflow'; +import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow'; import { createHash } from 'crypto'; import config from '../config'; import { @@ -137,9 +137,7 @@ export class Start extends Command { }); } // eslint-disable-next-line no-await-in-loop - await new Promise((resolve) => { - setTimeout(resolve, 500); - }); + await sleep(500); executingWorkflows = activeExecutionsInstance.getActiveExecutions(); } } catch (error) { diff --git a/packages/cli/commands/webhook.ts b/packages/cli/commands/webhook.ts index 2c8476ca82169..45834248c9c41 100644 --- a/packages/cli/commands/webhook.ts +++ b/packages/cli/commands/webhook.ts @@ -9,7 +9,7 @@ import { Command, flags } from '@oclif/command'; // eslint-disable-next-line import/no-extraneous-dependencies import Redis from 'ioredis'; -import { IDataObject, LoggerProxy } from 'n8n-workflow'; +import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow'; import config from '../config'; import { ActiveExecutions, @@ -71,9 +71,7 @@ export class Webhook extends Command { ); } // eslint-disable-next-line no-await-in-loop - await new Promise((resolve) => { - setTimeout(resolve, 500); - }); + await sleep(500); executingWorkflows = activeExecutionsInstance.getActiveExecutions(); } } catch (error) { diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index fd221f5a19b73..e99a1d793de8b 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -15,7 +15,14 @@ import PCancelable from 'p-cancelable'; import { Command, flags } from '@oclif/command'; import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core'; -import { IExecuteResponsePromiseData, INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow'; +import { + IExecuteResponsePromiseData, + INodeTypes, + IRun, + Workflow, + LoggerProxy, + sleep, +} from 'n8n-workflow'; import { FindOneOptions, getConnectionManager } from 'typeorm'; @@ -103,9 +110,7 @@ export class Worker extends Command { ); } // eslint-disable-next-line no-await-in-loop - await new Promise((resolve) => { - setTimeout(resolve, 500); - }); + await sleep(500); } } catch (error) { LoggerProxy.error('There was an error shutting down n8n.', error); diff --git a/packages/nodes-base/nodes/Discord/Discord.node.ts b/packages/nodes-base/nodes/Discord/Discord.node.ts index 5a7e199fd2a27..6a1180c7226f6 100644 --- a/packages/nodes-base/nodes/Discord/Discord.node.ts +++ b/packages/nodes-base/nodes/Discord/Discord.node.ts @@ -7,6 +7,7 @@ import { jsonParse, NodeApiError, NodeOperationError, + sleep, } from 'n8n-workflow'; import { DiscordAttachment, DiscordWebhook } from './Interfaces'; @@ -244,7 +245,7 @@ export class Discord implements INodeType { // remaining requests 0 // https://discord.com/developers/docs/topics/rate-limits if (!+remainingRatelimit) { - await new Promise((resolve) => setTimeout(resolve, resetAfter || 1000)); + await sleep(resetAfter ?? 1000); } break; @@ -255,7 +256,7 @@ export class Discord implements INodeType { if (error.statusCode === 429) { const retryAfter = error.response?.headers['retry-after'] || 1000; - await new Promise((resolve) => setTimeout(resolve, +retryAfter)); + await sleep(+retryAfter); continue; } diff --git a/packages/nodes-base/nodes/HttpRequest/V1/HttpRequestV1.node.ts b/packages/nodes-base/nodes/HttpRequest/V1/HttpRequestV1.node.ts index fe3ce4b313aa4..05388ba32e0d1 100644 --- a/packages/nodes-base/nodes/HttpRequest/V1/HttpRequestV1.node.ts +++ b/packages/nodes-base/nodes/HttpRequest/V1/HttpRequestV1.node.ts @@ -10,6 +10,7 @@ import { INodeTypeDescription, NodeApiError, NodeOperationError, + sleep, } from 'n8n-workflow'; import { OptionsWithUri } from 'request'; @@ -667,7 +668,7 @@ export class HttpRequestV1 implements INodeType { const batchSize: number = (options.batchSize as number) > 0 ? (options.batchSize as number) : 1; if (itemIndex % batchSize === 0) { - await new Promise((resolve) => setTimeout(resolve, options.batchInterval as number)); + await sleep(options.batchInterval as number); } } diff --git a/packages/nodes-base/nodes/HttpRequest/V2/HttpRequestV2.node.ts b/packages/nodes-base/nodes/HttpRequest/V2/HttpRequestV2.node.ts index db136e16a9102..cc529865229b7 100644 --- a/packages/nodes-base/nodes/HttpRequest/V2/HttpRequestV2.node.ts +++ b/packages/nodes-base/nodes/HttpRequest/V2/HttpRequestV2.node.ts @@ -8,6 +8,7 @@ import { INodeTypeDescription, NodeApiError, NodeOperationError, + sleep, } from 'n8n-workflow'; import { OptionsWithUri } from 'request'; @@ -701,7 +702,7 @@ export class HttpRequestV2 implements INodeType { const batchSize: number = (options.batchSize as number) > 0 ? (options.batchSize as number) : 1; if (itemIndex % batchSize === 0) { - await new Promise((resolve) => setTimeout(resolve, options.batchInterval as number)); + await sleep(options.batchInterval as number); } } diff --git a/packages/nodes-base/nodes/HttpRequest/V3/HttpRequestV3.node.ts b/packages/nodes-base/nodes/HttpRequest/V3/HttpRequestV3.node.ts index d949700cb8124..c7af85020294f 100644 --- a/packages/nodes-base/nodes/HttpRequest/V3/HttpRequestV3.node.ts +++ b/packages/nodes-base/nodes/HttpRequest/V3/HttpRequestV3.node.ts @@ -9,6 +9,7 @@ import { jsonParse, NodeApiError, NodeOperationError, + sleep, } from 'n8n-workflow'; import { OptionsWithUri } from 'request-promise-native'; @@ -1002,7 +1003,7 @@ export class HttpRequestV3 implements INodeType { if (itemIndex > 0 && batchSize >= 0 && batchInterval > 0) { if (itemIndex % batchSize === 0) { - await new Promise((resolve) => setTimeout(resolve, batchInterval)); + await sleep(batchInterval); } } diff --git a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts index c379718762a2a..14a2a77d020f0 100644 --- a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts +++ b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts @@ -1,4 +1,4 @@ -import { IDataObject, IExecuteFunctions, ITriggerFunctions } from 'n8n-workflow'; +import { IDataObject, IExecuteFunctions, ITriggerFunctions, sleep } from 'n8n-workflow'; import * as amqplib from 'amqplib'; @@ -138,9 +138,7 @@ export class MessageTracker { // when for example a new version of the workflow got saved. That would lead to // them getting delivered and processed again. while (unansweredMessages !== 0 && count++ <= 300) { - await new Promise((resolve) => { - setTimeout(resolve, 1000); - }); + await sleep(1000); unansweredMessages = this.unansweredMessages(); } diff --git a/packages/nodes-base/nodes/Twitter/GenericFunctions.ts b/packages/nodes-base/nodes/Twitter/GenericFunctions.ts index bcdc637bb8a9c..6959694f21d92 100644 --- a/packages/nodes-base/nodes/Twitter/GenericFunctions.ts +++ b/packages/nodes-base/nodes/Twitter/GenericFunctions.ts @@ -13,6 +13,7 @@ import { INodeExecutionData, NodeApiError, NodeOperationError, + sleep, } from 'n8n-workflow'; export async function twitterApiRequest( @@ -193,12 +194,7 @@ export async function uploadAttachments( // data has not been uploaded yet, so wait for it to be ready if (response.processing_info) { const { check_after_secs } = response.processing_info as IDataObject; - await new Promise((resolve, _reject) => { - setTimeout(() => { - // @ts-ignore - resolve(); - }, (check_after_secs as number) * 1000); - }); + await sleep((check_after_secs as number) * 1000); } media.push(response); diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 3cb855a7055b5..86b1d884607ad 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -18,7 +18,7 @@ export * from './WorkflowErrors'; export * from './WorkflowHooks'; export * from './VersionedNodeType'; export { LoggerProxy, NodeHelpers, ObservableObject, TelemetryHelpers }; -export { deepCopy, jsonParse } from './utils'; +export { deepCopy, jsonParse, sleep } from './utils'; export { isINodeProperties, isINodePropertyOptions, diff --git a/packages/workflow/src/utils.ts b/packages/workflow/src/utils.ts index 1dbb6ac1b298e..0a6c1c98a76db 100644 --- a/packages/workflow/src/utils.ts +++ b/packages/workflow/src/utils.ts @@ -64,3 +64,8 @@ export const jsonParse = (jsonString: string, options?: JSONParseOptions): throw error; } }; + +export const sleep = async (ms: number): Promise => + new Promise((resolve) => { + setTimeout(resolve, ms); + }); From 46f5425a98c2826443a912e342c972b52d207e9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 8 Nov 2022 15:08:33 +0100 Subject: [PATCH 4/4] detect crashes and report them to Sentry --- packages/cli/commands/start.ts | 22 ++++++++++++++------- packages/cli/commands/webhook.ts | 20 +++++++++++++------ packages/cli/commands/worker.ts | 12 ++++++------ packages/cli/src/CrashJournal.ts | 33 ++++++++++++++++++++++++++++++++ 4 files changed, 68 insertions(+), 19 deletions(-) create mode 100644 packages/cli/src/CrashJournal.ts diff --git a/packages/cli/commands/start.ts b/packages/cli/commands/start.ts index 6894d8c9efca9..5837af8556048 100644 --- a/packages/cli/commands/start.ts +++ b/packages/cli/commands/start.ts @@ -35,6 +35,7 @@ import { import { getLogger } from '../src/Logger'; import { getAllInstalledPackages } from '../src/CommunityNodes/packageModel'; import { initErrorHandling } from '../src/ErrorReporting'; +import * as CrashJournal from '../src/CrashJournal'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires const open = require('open'); @@ -84,7 +85,7 @@ export class Start extends Command { } /** - * Stoppes the n8n in a graceful way. + * Stop n8n in a graceful way. * Make for example sure that all the webhooks from third party services * get removed. */ @@ -92,6 +93,12 @@ export class Start extends Command { static async stopProcess() { getLogger().info('\nStopping n8n...'); + const exit = () => { + CrashJournal.cleanup().finally(() => { + process.exit(processExitCode); + }); + }; + try { // Stop with trying to activate workflows that could not be activated activeWorkflowRunner?.removeAllQueuedWorkflowActivations(); @@ -103,7 +110,7 @@ export class Start extends Command { // In case that something goes wrong with shutdown we // kill after max. 30 seconds no matter what console.log(`process exited after 30s`); - process.exit(processExitCode); + exit(); }, 30000); await InternalHooksManager.getInstance().onN8nStop(); @@ -144,7 +151,7 @@ export class Start extends Command { console.error('There was an error shutting down n8n.', error); } - process.exit(processExitCode); + exit(); } async run() { @@ -152,7 +159,12 @@ export class Start extends Command { process.once('SIGTERM', Start.stopProcess); process.once('SIGINT', Start.stopProcess); + const logger = getLogger(); + LoggerProxy.init(logger); + logger.info('Initializing n8n process'); + initErrorHandling(); + await CrashJournal.init(); // eslint-disable-next-line @typescript-eslint/no-shadow const { flags } = this.parse(Start); @@ -160,10 +172,6 @@ export class Start extends Command { // Wrap that the process does not close but we can still use async await (async () => { try { - const logger = getLogger(); - LoggerProxy.init(logger); - logger.info('Initializing n8n process'); - // Start directly with the init of the database to improve startup time const startDbInitPromise = Db.init().catch((error: Error) => { logger.error(`There was an error initializing DB: "${error.message}"`); diff --git a/packages/cli/commands/webhook.ts b/packages/cli/commands/webhook.ts index 45834248c9c41..a1f3d727dfc50 100644 --- a/packages/cli/commands/webhook.ts +++ b/packages/cli/commands/webhook.ts @@ -27,9 +27,10 @@ import { import { getLogger } from '../src/Logger'; import { initErrorHandling } from '../src/ErrorReporting'; +import * as CrashJournal from '../src/CrashJournal'; let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined; -let processExistCode = 0; +let processExitCode = 0; export class Webhook extends Command { static description = 'Starts n8n webhook process. Intercepts only production URLs.'; @@ -41,7 +42,7 @@ export class Webhook extends Command { }; /** - * Stops the n8n in a graceful way. + * Stops n8n in a graceful way. * Make for example sure that all the webhooks from third party services * get removed. */ @@ -49,6 +50,12 @@ export class Webhook extends Command { static async stopProcess() { LoggerProxy.info(`\nStopping n8n...`); + const exit = () => { + CrashJournal.cleanup().finally(() => { + process.exit(processExitCode); + }); + }; + try { const externalHooks = ExternalHooks(); await externalHooks.run('n8n.stop', []); @@ -56,7 +63,7 @@ export class Webhook extends Command { setTimeout(() => { // In case that something goes wrong with shutdown we // kill after max. 30 seconds no matter what - process.exit(processExistCode); + exit(); }, 30000); // Wait for active workflow executions to finish @@ -78,7 +85,7 @@ export class Webhook extends Command { LoggerProxy.error('There was an error shutting down n8n.', error); } - process.exit(processExistCode); + exit(); } // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types @@ -91,6 +98,7 @@ export class Webhook extends Command { process.once('SIGINT', Webhook.stopProcess); initErrorHandling(); + await CrashJournal.init(); // eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-shadow const { flags } = this.parse(Webhook); @@ -119,7 +127,7 @@ export class Webhook extends Command { // eslint-disable-next-line @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-member-access logger.error(`There was an error initializing DB: "${error.message}"`); - processExistCode = 1; + processExitCode = 1; // @ts-ignore process.emit('SIGINT'); process.exit(1); @@ -231,7 +239,7 @@ export class Webhook extends Command { // eslint-disable-next-line @typescript-eslint/restrict-template-expressions logger.error(`Webhook process cannot continue. "${error.message}"`); - processExistCode = 1; + processExitCode = 1; // @ts-ignore process.emit('SIGINT'); process.exit(1); diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index e99a1d793de8b..1836bbfb8cd02 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -69,11 +69,11 @@ export class Worker extends Command { static jobQueue: Queue.JobQueue; - static processExistCode = 0; + static processExitCode = 0; // static activeExecutions = ActiveExecutions.getInstance(); /** - * Stoppes the n8n in a graceful way. + * Stop n8n in a graceful way. * Make for example sure that all the webhooks from third party services * get removed. */ @@ -95,7 +95,7 @@ export class Worker extends Command { setTimeout(() => { // In case that something goes wrong with shutdown we // kill after max. 30 seconds no matter what - process.exit(Worker.processExistCode); + process.exit(Worker.processExitCode); }, maxStopTime); // Wait for active workflow executions to finish @@ -116,7 +116,7 @@ export class Worker extends Command { LoggerProxy.error('There was an error shutting down n8n.', error); } - process.exit(Worker.processExistCode); + process.exit(Worker.processExitCode); } async runJob(job: Queue.Job, nodeTypes: INodeTypes): Promise { @@ -275,7 +275,7 @@ export class Worker extends Command { const startDbInitPromise = Db.init().catch((error) => { logger.error(`There was an error initializing DB: "${error.message}"`); - Worker.processExistCode = 1; + Worker.processExitCode = 1; // @ts-ignore process.emit('SIGINT'); process.exit(1); @@ -446,7 +446,7 @@ export class Worker extends Command { } catch (error) { logger.error(`Worker process cannot continue. "${error.message}"`); - Worker.processExistCode = 1; + Worker.processExitCode = 1; // @ts-ignore process.emit('SIGINT'); process.exit(1); diff --git a/packages/cli/src/CrashJournal.ts b/packages/cli/src/CrashJournal.ts new file mode 100644 index 0000000000000..71803ec7b8019 --- /dev/null +++ b/packages/cli/src/CrashJournal.ts @@ -0,0 +1,33 @@ +import { existsSync } from 'fs'; +import { mkdir, utimes, open, rm } from 'fs/promises'; +import { join, dirname } from 'path'; +import { UserSettings } from 'n8n-core'; +import { ErrorReporterProxy as ErrorReporter, LoggerProxy, sleep } from 'n8n-workflow'; + +export const touchFile = async (filePath: string): Promise => { + await mkdir(dirname(filePath), { recursive: true }); + const time = new Date(); + try { + await utimes(filePath, time, time); + } catch { + const fd = await open(filePath, 'w'); + await fd.close(); + } +}; + +const journalFile = join(UserSettings.getUserN8nFolderPath(), 'crash.journal'); + +export const init = async () => { + if (existsSync(journalFile)) { + // Crash detected + ErrorReporter.warn('Last session crashed'); + LoggerProxy.error('Last session crashed'); + // add a 10 seconds pause to slow down crash-looping + await sleep(10_000); + } + await touchFile(journalFile); +}; + +export const cleanup = async () => { + await rm(journalFile, { force: true }); +};