Skip to content

Commit

Permalink
create a new sleep function in workflow utils
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Nov 8, 2022
1 parent 9ed35c4 commit 75e6518
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 33 deletions.
7 changes: 2 additions & 5 deletions packages/cli/commands/executeBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import { Command, flags } from '@oclif/command';

import { BinaryDataManager, UserSettings } from 'n8n-core';

// eslint-disable-next-line @typescript-eslint/no-unused-vars
import { INode, ITaskData, LoggerProxy } from 'n8n-workflow';
import { ITaskData, LoggerProxy, sleep } from 'n8n-workflow';

import { sep } from 'path';

Expand Down Expand Up @@ -147,9 +146,7 @@ export class ExecuteBatch extends Command {
});
}
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, 500);
});
await sleep(500);
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
}
// We may receive true but when called from `process.on`
Expand Down
6 changes: 2 additions & 4 deletions packages/cli/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { Command, flags } from '@oclif/command';
// eslint-disable-next-line import/no-extraneous-dependencies
import Redis from 'ioredis';

import { IDataObject, LoggerProxy } from 'n8n-workflow';
import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow';
import { createHash } from 'crypto';
import config from '../config';
import {
Expand Down Expand Up @@ -137,9 +137,7 @@ export class Start extends Command {
});
}
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, 500);
});
await sleep(500);
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
}
} catch (error) {
Expand Down
6 changes: 2 additions & 4 deletions packages/cli/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Command, flags } from '@oclif/command';
// eslint-disable-next-line import/no-extraneous-dependencies
import Redis from 'ioredis';

import { IDataObject, LoggerProxy } from 'n8n-workflow';
import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow';
import config from '../config';
import {
ActiveExecutions,
Expand Down Expand Up @@ -71,9 +71,7 @@ export class Webhook extends Command {
);
}
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, 500);
});
await sleep(500);
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
}
} catch (error) {
Expand Down
13 changes: 9 additions & 4 deletions packages/cli/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ import PCancelable from 'p-cancelable';
import { Command, flags } from '@oclif/command';
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';

import { IExecuteResponsePromiseData, INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow';
import {
IExecuteResponsePromiseData,
INodeTypes,
IRun,
Workflow,
LoggerProxy,
sleep,
} from 'n8n-workflow';

import { FindOneOptions, getConnectionManager } from 'typeorm';

Expand Down Expand Up @@ -103,9 +110,7 @@ export class Worker extends Command {
);
}
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, 500);
});
await sleep(500);
}
} catch (error) {
LoggerProxy.error('There was an error shutting down n8n.', error);
Expand Down
5 changes: 3 additions & 2 deletions packages/nodes-base/nodes/Discord/Discord.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
jsonParse,
NodeApiError,
NodeOperationError,
sleep,
} from 'n8n-workflow';

import { DiscordAttachment, DiscordWebhook } from './Interfaces';
Expand Down Expand Up @@ -244,7 +245,7 @@ export class Discord implements INodeType {
// remaining requests 0
// https://discord.com/developers/docs/topics/rate-limits
if (!+remainingRatelimit) {
await new Promise<void>((resolve) => setTimeout(resolve, resetAfter || 1000));
await sleep(resetAfter ?? 1000);
}

break;
Expand All @@ -255,7 +256,7 @@ export class Discord implements INodeType {
if (error.statusCode === 429) {
const retryAfter = error.response?.headers['retry-after'] || 1000;

await new Promise<void>((resolve) => setTimeout(resolve, +retryAfter));
await sleep(+retryAfter);

continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
INodeTypeDescription,
NodeApiError,
NodeOperationError,
sleep,
} from 'n8n-workflow';

import { OptionsWithUri } from 'request';
Expand Down Expand Up @@ -667,7 +668,7 @@ export class HttpRequestV1 implements INodeType {
const batchSize: number =
(options.batchSize as number) > 0 ? (options.batchSize as number) : 1;
if (itemIndex % batchSize === 0) {
await new Promise((resolve) => setTimeout(resolve, options.batchInterval as number));
await sleep(options.batchInterval as number);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
INodeTypeDescription,
NodeApiError,
NodeOperationError,
sleep,
} from 'n8n-workflow';

import { OptionsWithUri } from 'request';
Expand Down Expand Up @@ -701,7 +702,7 @@ export class HttpRequestV2 implements INodeType {
const batchSize: number =
(options.batchSize as number) > 0 ? (options.batchSize as number) : 1;
if (itemIndex % batchSize === 0) {
await new Promise((resolve) => setTimeout(resolve, options.batchInterval as number));
await sleep(options.batchInterval as number);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
jsonParse,
NodeApiError,
NodeOperationError,
sleep,
} from 'n8n-workflow';

import { OptionsWithUri } from 'request-promise-native';
Expand Down Expand Up @@ -1002,7 +1003,7 @@ export class HttpRequestV3 implements INodeType {

if (itemIndex > 0 && batchSize >= 0 && batchInterval > 0) {
if (itemIndex % batchSize === 0) {
await new Promise((resolve) => setTimeout(resolve, batchInterval));
await sleep(batchInterval);
}
}

Expand Down
6 changes: 2 additions & 4 deletions packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IDataObject, IExecuteFunctions, ITriggerFunctions } from 'n8n-workflow';
import { IDataObject, IExecuteFunctions, ITriggerFunctions, sleep } from 'n8n-workflow';

import * as amqplib from 'amqplib';

Expand Down Expand Up @@ -138,9 +138,7 @@ export class MessageTracker {
// when for example a new version of the workflow got saved. That would lead to
// them getting delivered and processed again.
while (unansweredMessages !== 0 && count++ <= 300) {
await new Promise((resolve) => {
setTimeout(resolve, 1000);
});
await sleep(1000);
unansweredMessages = this.unansweredMessages();
}

Expand Down
8 changes: 2 additions & 6 deletions packages/nodes-base/nodes/Twitter/GenericFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
INodeExecutionData,
NodeApiError,
NodeOperationError,
sleep,
} from 'n8n-workflow';

export async function twitterApiRequest(
Expand Down Expand Up @@ -193,12 +194,7 @@ export async function uploadAttachments(
// data has not been uploaded yet, so wait for it to be ready
if (response.processing_info) {
const { check_after_secs } = response.processing_info as IDataObject;
await new Promise((resolve, _reject) => {
setTimeout(() => {
// @ts-ignore
resolve();
}, (check_after_secs as number) * 1000);
});
await sleep((check_after_secs as number) * 1000);
}

media.push(response);
Expand Down
2 changes: 1 addition & 1 deletion packages/workflow/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export * from './WorkflowErrors';
export * from './WorkflowHooks';
export * from './VersionedNodeType';
export { LoggerProxy, NodeHelpers, ObservableObject, TelemetryHelpers };
export { deepCopy, jsonParse } from './utils';
export { deepCopy, jsonParse, sleep } from './utils';
export {
isINodeProperties,
isINodePropertyOptions,
Expand Down
5 changes: 5 additions & 0 deletions packages/workflow/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,8 @@ export const jsonParse = <T>(jsonString: string, options?: JSONParseOptions<T>):
throw error;
}
};

export const sleep = async (ms: number): Promise<void> =>
new Promise((resolve) => {
setTimeout(resolve, ms);
});

0 comments on commit 75e6518

Please sign in to comment.