diff --git a/CHANGELOG.md b/CHANGELOG.md index 986b85b0682ea..99120642fe416 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,44 @@ +# [1.54.0](https://github.com/n8n-io/n8n/compare/n8n@1.53.0...n8n@1.54.0) (2024-08-07) + + +### Bug Fixes + +* **core:** Ensure OAuth token data is not stubbed in source control ([#10302](https://github.com/n8n-io/n8n/issues/10302)) ([98115e9](https://github.com/n8n-io/n8n/commit/98115e95df8289a8ec400a570a7f256382f8e286)) +* **core:** Fix expressions in webhook nodes(Form, Webhook) to access previous node's data ([#10247](https://github.com/n8n-io/n8n/issues/10247)) ([88a1701](https://github.com/n8n-io/n8n/commit/88a170176a3447e7f847e9cf145aeb867b1c5fcf)) +* **core:** Fix user telemetry bugs ([#10293](https://github.com/n8n-io/n8n/issues/10293)) ([42a0b59](https://github.com/n8n-io/n8n/commit/42a0b594d6ea2527c55a2aa9976c904cf70ecf92)) +* **core:** Make execution and its data creation atomic ([#10276](https://github.com/n8n-io/n8n/issues/10276)) ([ae50bb9](https://github.com/n8n-io/n8n/commit/ae50bb95a8e5bf1cdbf9483da54b84094b82e260)) +* **core:** Make OAuth1/OAuth2 callback not require auth ([#10263](https://github.com/n8n-io/n8n/issues/10263)) ([a8e2774](https://github.com/n8n-io/n8n/commit/a8e2774f5382e202556b5506c7788265786aa973)) +* **core:** Revert transactions until we remove the legacy sqlite driver ([#10299](https://github.com/n8n-io/n8n/issues/10299)) ([1eba7c3](https://github.com/n8n-io/n8n/commit/1eba7c3c763ac5b6b28c1c6fc43fc8c215249292)) +* **core:** Surface enterprise trial error message ([#10267](https://github.com/n8n-io/n8n/issues/10267)) ([432ac1d](https://github.com/n8n-io/n8n/commit/432ac1da59e173ce4c0f2abbc416743d9953ba70)) +* **core:** Upgrade tournament to address some XSS vulnerabilities ([#10277](https://github.com/n8n-io/n8n/issues/10277)) ([43ae159](https://github.com/n8n-io/n8n/commit/43ae159ea40c574f8e41bdfd221ab2bf3268eee7)) +* **core:** VM2 sandbox should not throw on `new Promise` ([#10298](https://github.com/n8n-io/n8n/issues/10298)) ([7e95f9e](https://github.com/n8n-io/n8n/commit/7e95f9e2e40a99871f1b6abcdacb39ac5f857332)) +* **core:** Webhook and form baseUrl missing ([#10290](https://github.com/n8n-io/n8n/issues/10290)) ([8131d66](https://github.com/n8n-io/n8n/commit/8131d66f8ca1b1da00597a12859ee4372148a0c9)) +* **editor:** Enable moving resources only if team projects are available by the license ([#10271](https://github.com/n8n-io/n8n/issues/10271)) ([42ba884](https://github.com/n8n-io/n8n/commit/42ba8841c401126c77158a53dc8fcbb45dfce8fd)) +* **editor:** Fix execution retry button ([#10275](https://github.com/n8n-io/n8n/issues/10275)) ([55f2ffe](https://github.com/n8n-io/n8n/commit/55f2ffe256c91a028cee95c3bbb37a093a1c0f81)) +* **editor:** Update design system Avatar component to show initials also when only firstName or lastName is given ([#10308](https://github.com/n8n-io/n8n/issues/10308)) ([46bbf09](https://github.com/n8n-io/n8n/commit/46bbf09beacad12472d91786b91d845fe2afb26d)) +* **editor:** Update tags filter/editor to not show non existing tag as a selectable option ([#10297](https://github.com/n8n-io/n8n/issues/10297)) ([557a76e](https://github.com/n8n-io/n8n/commit/557a76ec2326de72fb7a8b46fc4353f8fd9b591d)) +* **Invoice Ninja Node:** Fix payment types ([#10196](https://github.com/n8n-io/n8n/issues/10196)) ([c5acbb7](https://github.com/n8n-io/n8n/commit/c5acbb7ec0d24ec9b30c221fa3b2fb615fb9ec7f)) +* Loop node no input data shown ([#10224](https://github.com/n8n-io/n8n/issues/10224)) ([c8ee852](https://github.com/n8n-io/n8n/commit/c8ee852159207be0cfe2c3e0ee8e7b29d838aa35)) + + +### Features + +* **core:** Allow filtering executions and users by project in Public API ([#10250](https://github.com/n8n-io/n8n/issues/10250)) ([7056e50](https://github.com/n8n-io/n8n/commit/7056e50b006bda665f64ce6234c5c1967891c415)) +* **core:** Allow transferring credentials in Public API ([#10259](https://github.com/n8n-io/n8n/issues/10259)) ([07d7b24](https://github.com/n8n-io/n8n/commit/07d7b247f02a9d7185beca7817deb779a3d665dd)) +* **core:** Show sub-node error on the logs pane. Open logs pane on sub-node error ([#10248](https://github.com/n8n-io/n8n/issues/10248)) ([57d1c9a](https://github.com/n8n-io/n8n/commit/57d1c9a99e97308f2f1b8ae05ac3861a835e8e5a)) +* **core:** Support community packages in scaling-mode ([#10228](https://github.com/n8n-io/n8n/issues/10228)) ([88086a4](https://github.com/n8n-io/n8n/commit/88086a41ff5b804b35aa9d9503dc2d48836fe4ec)) +* **core:** Support create, delete, edit role for users in Public API ([#10279](https://github.com/n8n-io/n8n/issues/10279)) ([84efbd9](https://github.com/n8n-io/n8n/commit/84efbd9b9c51f536b21a4f969ab607d277bef692)) +* **core:** Support create, read, update, delete projects in Public API ([#10269](https://github.com/n8n-io/n8n/issues/10269)) ([489ce10](https://github.com/n8n-io/n8n/commit/489ce100634c3af678fb300e9a39d273042542e6)) +* **editor:** Auto-add LLM chain for new LLM nodes on empty canvas ([#10245](https://github.com/n8n-io/n8n/issues/10245)) ([06419d9](https://github.com/n8n-io/n8n/commit/06419d9483ae916e79aace6d8c17e265b419b15d)) +* **Elasticsearch Node:** Add bulk operations for Elasticsearch ([#9940](https://github.com/n8n-io/n8n/issues/9940)) ([bf8f848](https://github.com/n8n-io/n8n/commit/bf8f848645dfd31527713a55bd1fc93865327017)) +* **Lemlist Trigger Node:** Update Trigger events ([#10311](https://github.com/n8n-io/n8n/issues/10311)) ([15f10ec](https://github.com/n8n-io/n8n/commit/15f10ec325cb5eda0f952bed3a5f171dd91bc639)) +* **MongoDB Node:** Add projection to query options on Find ([#9972](https://github.com/n8n-io/n8n/issues/9972)) ([0a84e0d](https://github.com/n8n-io/n8n/commit/0a84e0d8b047669f5cf023c21383d01c929c5b4f)) +* **Postgres Chat Memory, Redis Chat Memory, Xata:** Add support for context window length ([#10203](https://github.com/n8n-io/n8n/issues/10203)) ([e3edeaa](https://github.com/n8n-io/n8n/commit/e3edeaa03526f041d15d1099ea91869e38a0decc)) +* **Stripe Trigger Node:** Add Stripe webhook descriptions based on the workflow ID and name ([#9956](https://github.com/n8n-io/n8n/issues/9956)) ([3433465](https://github.com/n8n-io/n8n/commit/34334651e0e6874736a437a894176bed4590e5a7)) +* **Webflow Node:** Update to use the v2 API ([#9996](https://github.com/n8n-io/n8n/issues/9996)) ([6d8323f](https://github.com/n8n-io/n8n/commit/6d8323fadea8af04483eb1a873df0cf3ccc2a891)) + + + # [1.53.0](https://github.com/n8n-io/n8n/compare/n8n@1.52.0...n8n@1.53.0) (2024-07-31) diff --git a/README.md b/README.md index 145ecab8c62d6..d51ac596cad65 100644 --- a/README.md +++ b/README.md @@ -95,8 +95,8 @@ development environment ready in minutes. ## License n8n is [fair-code](https://faircode.io) distributed under the -[**Sustainable Use License**](https://github.com/n8n-io/n8n/blob/master/packages/cli/LICENSE.md) and the -[**n8n Enterprise License**](https://github.com/n8n-io/n8n/blob/master/packages/cli/LICENSE_EE.md). +[**Sustainable Use License**](https://github.com/n8n-io/n8n/blob/master/LICENSE.md) and the +[**n8n Enterprise License**](https://github.com/n8n-io/n8n/blob/master/LICENSE_EE.md). Proprietary licenses are available for enterprise customers. [Get in touch](mailto:license@n8n.io) diff --git a/cypress/e2e/17-workflow-tags.cy.ts b/cypress/e2e/17-workflow-tags.cy.ts index 88a2c9973dd66..fc889aead29c2 100644 --- a/cypress/e2e/17-workflow-tags.cy.ts +++ b/cypress/e2e/17-workflow-tags.cy.ts @@ -1,4 +1,5 @@ import { WorkflowPage } from '../pages'; +import { getVisibleSelect } from '../utils'; const wf = new WorkflowPage(); @@ -70,4 +71,20 @@ describe('Workflow tags', () => { wf.getters.workflowTags().click(); wf.getters.tagPills().should('have.length', TEST_TAGS.length - 1); }); + + it('should not show non existing tag as a selectable option', () => { + const NON_EXISTING_TAG = 'My Test Tag'; + + wf.getters.createTagButton().click(); + wf.actions.addTags(TEST_TAGS); + cy.get('body').click(0, 0); + wf.getters.workflowTags().click(); + wf.getters.tagsDropdown().find('input:focus').type(NON_EXISTING_TAG); + + getVisibleSelect() + .find('li') + .should('have.length', 2) + .filter(`:contains("${NON_EXISTING_TAG}")`) + .should('not.have.length'); + }); }); diff --git a/docker/images/n8n/README.md b/docker/images/n8n/README.md index d65459615086e..f7b45f9467502 100644 --- a/docker/images/n8n/README.md +++ b/docker/images/n8n/README.md @@ -230,6 +230,4 @@ Before you upgrade to the latest version make sure to check here if there are an ## License -n8n is [fair-code](https://faircode.io) distributed under the [**Sustainable Use License**](https://github.com/n8n-io/n8n/blob/master/packages/cli/LICENSE.md). - -Additional information about the license can be found in the [docs](https://docs.n8n.io/reference/license/). +You can find the license information [here](https://github.com/n8n-io/n8n/blob/master/README.md#license) diff --git a/package.json b/package.json index e05eeb9845f1b..d4a208385b4cd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "n8n-monorepo", - "version": "1.53.0", + "version": "1.54.0", "private": true, "engines": { "node": ">=20.15", diff --git a/packages/@n8n/chat/README.md b/packages/@n8n/chat/README.md index 538d72ce0af91..0ed53a5774082 100644 --- a/packages/@n8n/chat/README.md +++ b/packages/@n8n/chat/README.md @@ -260,10 +260,5 @@ body, ``` ## License -n8n Chat is [fair-code](https://faircode.io) distributed under the -[**Sustainable Use License**](https://github.com/n8n-io/n8n/blob/master/packages/cli/LICENSE.md). -Proprietary licenses are available for enterprise customers. [Get in touch](mailto:license@n8n.io) - -Additional information about the license model can be found in the -[docs](https://docs.n8n.io/reference/license/). +You can find the license information [here](https://github.com/n8n-io/n8n/blob/master/README.md#license) diff --git a/packages/@n8n/chat/package.json b/packages/@n8n/chat/package.json index 5ba8a1c39f201..f960f16783bb7 100644 --- a/packages/@n8n/chat/package.json +++ b/packages/@n8n/chat/package.json @@ -1,6 +1,6 @@ { "name": "@n8n/chat", - "version": "0.22.0", + "version": "0.23.0", "scripts": { "dev": "pnpm run storybook", "build": "pnpm build:vite && pnpm build:bundle", diff --git a/packages/@n8n/config/package.json b/packages/@n8n/config/package.json index a4abfa677adc1..e28090e49b9d2 100644 --- a/packages/@n8n/config/package.json +++ b/packages/@n8n/config/package.json @@ -1,6 +1,6 @@ { "name": "@n8n/config", - "version": "1.3.0", + "version": "1.4.0", "scripts": { "clean": "rimraf dist .turbo", "dev": "pnpm watch", diff --git a/packages/@n8n/nodes-langchain/README.md b/packages/@n8n/nodes-langchain/README.md index e5761628cf67b..03a23d21865d8 100644 --- a/packages/@n8n/nodes-langchain/README.md +++ b/packages/@n8n/nodes-langchain/README.md @@ -8,6 +8,4 @@ These nodes are still in Beta state and are only compatible with the Docker imag ## License -n8n is [fair-code](https://faircode.io) distributed under the [**Sustainable Use License**](https://github.com/n8n-io/n8n/blob/master/packages/cli/LICENSE.md). - -Additional information about the license can be found in the [docs](https://docs.n8n.io/reference/license/). +You can find the license information [here](https://github.com/n8n-io/n8n/blob/master/README.md#license) diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts index a027829e3a7d8..3c4ff28f06a10 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts @@ -38,7 +38,7 @@ export async function openAiFunctionsAgentExecute( const memory = (await this.getInputConnectionData(NodeConnectionType.AiMemory, 0)) as | BaseChatMemory | undefined; - const tools = await getConnectedTools(this, nodeVersion >= 1.5); + const tools = await getConnectedTools(this, nodeVersion >= 1.5, false); const outputParsers = await getOptionalOutputParsers(this); const options = this.getNodeParameter('options', 0, {}) as { systemMessage?: string; diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts index 81ac6bad5db43..e0da7f1e315f9 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts @@ -90,7 +90,7 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise; + const tools = (await getConnectedTools(this, true, false)) as Array; const outputParser = (await getOptionalOutputParsers(this))?.[0]; let structuredOutputParserTool: DynamicStructuredTool | undefined; diff --git a/packages/@n8n/nodes-langchain/nodes/agents/OpenAiAssistant/OpenAiAssistant.node.ts b/packages/@n8n/nodes-langchain/nodes/agents/OpenAiAssistant/OpenAiAssistant.node.ts index f56ce7c5c4484..8f05faccb0274 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/OpenAiAssistant/OpenAiAssistant.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/OpenAiAssistant/OpenAiAssistant.node.ts @@ -313,7 +313,7 @@ export class OpenAiAssistant implements INodeType { async execute(this: IExecuteFunctions): Promise { const nodeVersion = this.getNode().typeVersion; - const tools = await getConnectedTools(this, nodeVersion > 1); + const tools = await getConnectedTools(this, nodeVersion > 1, false); const credentials = await this.getCredentials('openAiApi'); const items = this.getInputData(); diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolHttpRequest/ToolHttpRequest.node.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolHttpRequest/ToolHttpRequest.node.ts index 1d391f313ef36..421e85e1b5b8c 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolHttpRequest/ToolHttpRequest.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolHttpRequest/ToolHttpRequest.node.ts @@ -12,6 +12,7 @@ import { NodeConnectionType, NodeOperationError, tryToParseAlphanumericString } import { DynamicTool } from '@langchain/core/tools'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nTool } from '../../../utils/N8nTool'; import { configureHttpRequestFunction, configureResponseOptimizer, @@ -19,6 +20,7 @@ import { prepareToolDescription, configureToolFunction, updateParametersAndOptions, + makeToolInputSchema, } from './utils'; import { @@ -38,7 +40,7 @@ export class ToolHttpRequest implements INodeType { name: 'toolHttpRequest', icon: { light: 'file:httprequest.svg', dark: 'file:httprequest.dark.svg' }, group: ['output'], - version: 1, + version: [1, 1.1], description: 'Makes an HTTP request and returns the response data', subtitle: '={{ $parameter.toolDescription }}', defaults: { @@ -394,9 +396,24 @@ export class ToolHttpRequest implements INodeType { optimizeResponse, ); - const description = prepareToolDescription(toolDescription, toolParameters); + let tool: DynamicTool | N8nTool; - const tool = new DynamicTool({ name, description, func }); + // If the node version is 1.1 or higher, we use the N8nTool wrapper: + // it allows to use tool as a DynamicStructuredTool and have a fallback to DynamicTool + if (this.getNode().typeVersion >= 1.1) { + const schema = makeToolInputSchema(toolParameters); + + tool = new N8nTool(this, { + name, + description: toolDescription, + func, + schema, + }); + } else { + // Keep the old behavior for nodes with version 1.0 + const description = prepareToolDescription(toolDescription, toolParameters); + tool = new DynamicTool({ name, description, func }); + } return { response: tool, diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolHttpRequest/utils.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolHttpRequest/utils.ts index 28015b588a101..96ae0d8492a33 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolHttpRequest/utils.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolHttpRequest/utils.ts @@ -27,6 +27,8 @@ import type { SendIn, ToolParameter, } from './interfaces'; +import type { DynamicZodObject } from '../../../types/zod.types'; +import { z } from 'zod'; const genericCredentialRequest = async (ctx: IExecuteFunctions, itemIndex: number) => { const genericType = ctx.getNodeParameter('genericAuthType', itemIndex) as string; @@ -566,7 +568,7 @@ export const configureToolFunction = ( httpRequest: (options: IHttpRequestOptions) => Promise, optimizeResponse: (response: string) => string, ) => { - return async (query: string): Promise => { + return async (query: string | IDataObject): Promise => { const { index } = ctx.addInputData(NodeConnectionType.AiTool, [[{ json: { query } }]]); let response: string = ''; @@ -581,18 +583,22 @@ export const configureToolFunction = ( if (query) { let dataFromModel; - try { - dataFromModel = jsonParse(query); - } catch (error) { - if (toolParameters.length === 1) { - dataFromModel = { [toolParameters[0].name]: query }; - } else { - throw new NodeOperationError( - ctx.getNode(), - `Input is not a valid JSON: ${error.message}`, - { itemIndex }, - ); + if (typeof query === 'string') { + try { + dataFromModel = jsonParse(query); + } catch (error) { + if (toolParameters.length === 1) { + dataFromModel = { [toolParameters[0].name]: query }; + } else { + throw new NodeOperationError( + ctx.getNode(), + `Input is not a valid JSON: ${error.message}`, + { itemIndex }, + ); + } } + } else { + dataFromModel = query; } for (const parameter of toolParameters) { @@ -727,6 +733,8 @@ export const configureToolFunction = ( } } } catch (error) { + console.error(error); + const errorMessage = 'Input provided by model is not valid'; if (error instanceof NodeOperationError) { @@ -765,3 +773,38 @@ export const configureToolFunction = ( return response; }; }; + +function makeParameterZodSchema(parameter: ToolParameter) { + let schema: z.ZodTypeAny; + + if (parameter.type === 'string') { + schema = z.string(); + } else if (parameter.type === 'number') { + schema = z.number(); + } else if (parameter.type === 'boolean') { + schema = z.boolean(); + } else if (parameter.type === 'json') { + schema = z.record(z.any()); + } else { + schema = z.string(); + } + + if (!parameter.required) { + schema = schema.optional(); + } + + if (parameter.description) { + schema = schema.describe(parameter.description); + } + + return schema; +} + +export function makeToolInputSchema(parameters: ToolParameter[]): DynamicZodObject { + const schemaEntries = parameters.map((parameter) => [ + parameter.name, + makeParameterZodSchema(parameter), + ]); + + return z.object(Object.fromEntries(schemaEntries)); +} diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts index 54b8318f5b8b8..5ed96cbd60123 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts @@ -493,7 +493,7 @@ export class ToolWorkflow implements INodeType { if (useSchema) { try { // We initialize these even though one of them will always be empty - // it makes it easer to navigate the ternary operator + // it makes it easier to navigate the ternary operator const jsonExample = this.getNodeParameter('jsonSchemaExample', itemIndex, '') as string; const inputSchema = this.getNodeParameter('inputSchema', itemIndex, '') as string; diff --git a/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/assistant/message.operation.ts b/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/assistant/message.operation.ts index da2770d05f2eb..134e8a1167924 100644 --- a/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/assistant/message.operation.ts +++ b/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/assistant/message.operation.ts @@ -163,7 +163,7 @@ export async function execute(this: IExecuteFunctions, i: number): Promise 1); + const tools = await getConnectedTools(this, nodeVersion > 1, false); let assistantTools; if (tools.length) { diff --git a/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/text/message.operation.ts b/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/text/message.operation.ts index 4cf72e9f5f48c..d37be5a065e6d 100644 --- a/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/text/message.operation.ts +++ b/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/actions/text/message.operation.ts @@ -219,7 +219,7 @@ export async function execute(this: IExecuteFunctions, i: number): Promise 1; - externalTools = await getConnectedTools(this, enforceUniqueNames); + externalTools = await getConnectedTools(this, enforceUniqueNames, false); } if (externalTools.length) { diff --git a/packages/@n8n/nodes-langchain/package.json b/packages/@n8n/nodes-langchain/package.json index 3a328af0ae880..b5d73d1862501 100644 --- a/packages/@n8n/nodes-langchain/package.json +++ b/packages/@n8n/nodes-langchain/package.json @@ -1,6 +1,6 @@ { "name": "@n8n/n8n-nodes-langchain", - "version": "1.53.0", + "version": "1.54.0", "description": "", "main": "index.js", "scripts": { @@ -153,7 +153,7 @@ "@langchain/textsplitters": "0.0.3", "@mozilla/readability": "^0.5.0", "@n8n/typeorm": "0.3.20-10", - "@n8n/vm2": "3.9.24", + "@n8n/vm2": "3.9.25", "@pinecone-database/pinecone": "3.0.0", "@qdrant/js-client-rest": "1.9.0", "@supabase/supabase-js": "2.43.4", diff --git a/packages/@n8n/nodes-langchain/utils/N8nTool.test.ts b/packages/@n8n/nodes-langchain/utils/N8nTool.test.ts new file mode 100644 index 0000000000000..6f12b18079551 --- /dev/null +++ b/packages/@n8n/nodes-langchain/utils/N8nTool.test.ts @@ -0,0 +1,169 @@ +import { N8nTool } from './N8nTool'; +import { createMockExecuteFunction } from 'n8n-nodes-base/test/nodes/Helpers'; +import { z } from 'zod'; +import type { INode } from 'n8n-workflow'; +import { DynamicStructuredTool, DynamicTool } from '@langchain/core/tools'; + +const mockNode: INode = { + id: '1', + name: 'Mock node', + typeVersion: 2, + type: 'n8n-nodes-base.mock', + position: [60, 760], + parameters: { + operation: 'test', + }, +}; + +describe('Test N8nTool wrapper as DynamicStructuredTool', () => { + it('should wrap a tool', () => { + const func = jest.fn(); + + const ctx = createMockExecuteFunction({}, mockNode); + + const tool = new N8nTool(ctx, { + name: 'Dummy Tool', + description: 'A dummy tool for testing', + func, + schema: z.object({ + foo: z.string(), + }), + }); + + expect(tool).toBeInstanceOf(DynamicStructuredTool); + }); +}); + +describe('Test N8nTool wrapper - DynamicTool fallback', () => { + it('should convert the tool to a dynamic tool', () => { + const func = jest.fn(); + + const ctx = createMockExecuteFunction({}, mockNode); + + const tool = new N8nTool(ctx, { + name: 'Dummy Tool', + description: 'A dummy tool for testing', + func, + schema: z.object({ + foo: z.string(), + }), + }); + + const dynamicTool = tool.asDynamicTool(); + + expect(dynamicTool).toBeInstanceOf(DynamicTool); + }); + + it('should format fallback description correctly', () => { + const func = jest.fn(); + + const ctx = createMockExecuteFunction({}, mockNode); + + const tool = new N8nTool(ctx, { + name: 'Dummy Tool', + description: 'A dummy tool for testing', + func, + schema: z.object({ + foo: z.string(), + bar: z.number().optional(), + qwe: z.boolean().describe('Boolean description'), + }), + }); + + const dynamicTool = tool.asDynamicTool(); + + expect(dynamicTool.description).toContain('foo: (description: , type: string, required: true)'); + expect(dynamicTool.description).toContain( + 'bar: (description: , type: number, required: false)', + ); + + expect(dynamicTool.description).toContain( + 'qwe: (description: Boolean description, type: boolean, required: true)', + ); + }); + + it('should handle empty parameter list correctly', () => { + const func = jest.fn(); + + const ctx = createMockExecuteFunction({}, mockNode); + + const tool = new N8nTool(ctx, { + name: 'Dummy Tool', + description: 'A dummy tool for testing', + func, + schema: z.object({}), + }); + + const dynamicTool = tool.asDynamicTool(); + + expect(dynamicTool.description).toEqual('A dummy tool for testing'); + }); + + it('should parse correct parameters', async () => { + const func = jest.fn(); + + const ctx = createMockExecuteFunction({}, mockNode); + + const tool = new N8nTool(ctx, { + name: 'Dummy Tool', + description: 'A dummy tool for testing', + func, + schema: z.object({ + foo: z.string().describe('Foo description'), + bar: z.number().optional(), + }), + }); + + const dynamicTool = tool.asDynamicTool(); + + const testParameters = { foo: 'some value' }; + + await dynamicTool.func(JSON.stringify(testParameters)); + + expect(func).toHaveBeenCalledWith(testParameters); + }); + + it('should recover when 1 parameter is passed directly', async () => { + const func = jest.fn(); + + const ctx = createMockExecuteFunction({}, mockNode); + + const tool = new N8nTool(ctx, { + name: 'Dummy Tool', + description: 'A dummy tool for testing', + func, + schema: z.object({ + foo: z.string().describe('Foo description'), + }), + }); + + const dynamicTool = tool.asDynamicTool(); + + const testParameter = 'some value'; + + await dynamicTool.func(testParameter); + + expect(func).toHaveBeenCalledWith({ foo: testParameter }); + }); + + it('should recover when JS object is passed instead of JSON', async () => { + const func = jest.fn(); + + const ctx = createMockExecuteFunction({}, mockNode); + + const tool = new N8nTool(ctx, { + name: 'Dummy Tool', + description: 'A dummy tool for testing', + func, + schema: z.object({ + foo: z.string().describe('Foo description'), + }), + }); + + const dynamicTool = tool.asDynamicTool(); + + await dynamicTool.func('{ foo: "some value" }'); + + expect(func).toHaveBeenCalledWith({ foo: 'some value' }); + }); +}); diff --git a/packages/@n8n/nodes-langchain/utils/N8nTool.ts b/packages/@n8n/nodes-langchain/utils/N8nTool.ts new file mode 100644 index 0000000000000..bb8bab08bd4fb --- /dev/null +++ b/packages/@n8n/nodes-langchain/utils/N8nTool.ts @@ -0,0 +1,113 @@ +import type { DynamicStructuredToolInput } from '@langchain/core/tools'; +import { DynamicStructuredTool, DynamicTool } from '@langchain/core/tools'; +import type { IExecuteFunctions, IDataObject } from 'n8n-workflow'; +import { NodeConnectionType, jsonParse, NodeOperationError } from 'n8n-workflow'; +import { StructuredOutputParser } from 'langchain/output_parsers'; +import type { ZodTypeAny } from 'zod'; +import { ZodBoolean, ZodNullable, ZodNumber, ZodObject, ZodOptional } from 'zod'; + +const getSimplifiedType = (schema: ZodTypeAny) => { + if (schema instanceof ZodObject) { + return 'object'; + } else if (schema instanceof ZodNumber) { + return 'number'; + } else if (schema instanceof ZodBoolean) { + return 'boolean'; + } else if (schema instanceof ZodNullable || schema instanceof ZodOptional) { + return getSimplifiedType(schema.unwrap()); + } + + return 'string'; +}; + +const getParametersDescription = (parameters: Array<[string, ZodTypeAny]>) => + parameters + .map( + ([name, schema]) => + `${name}: (description: ${schema.description ?? ''}, type: ${getSimplifiedType(schema)}, required: ${!schema.isOptional()})`, + ) + .join(',\n '); + +export const prepareFallbackToolDescription = (toolDescription: string, schema: ZodObject) => { + let description = `${toolDescription}`; + + const toolParameters = Object.entries(schema.shape); + + if (toolParameters.length) { + description += ` +Tool expects valid stringified JSON object with ${toolParameters.length} properties. +Property names with description, type and required status: +${getParametersDescription(toolParameters)} +ALL parameters marked as required must be provided`; + } + + return description; +}; + +export class N8nTool extends DynamicStructuredTool { + private context: IExecuteFunctions; + + constructor(context: IExecuteFunctions, fields: DynamicStructuredToolInput) { + super(fields); + + this.context = context; + } + + asDynamicTool(): DynamicTool { + const { name, func, schema, context, description } = this; + + const parser = new StructuredOutputParser(schema); + + const wrappedFunc = async function (query: string) { + let parsedQuery: object; + + // First we try to parse the query using the structured parser (Zod schema) + try { + parsedQuery = await parser.parse(query); + } catch (e) { + // If we were unable to parse the query using the schema, we try to gracefully handle it + let dataFromModel; + + try { + // First we try to parse a JSON with more relaxed rules + dataFromModel = jsonParse(query, { acceptJSObject: true }); + } catch (error) { + // In case of error, + // If model supplied a simple string instead of an object AND only one parameter expected, we try to recover the object structure + if (Object.keys(schema.shape).length === 1) { + const parameterName = Object.keys(schema.shape)[0]; + dataFromModel = { [parameterName]: query }; + } else { + // Finally throw an error if we were unable to parse the query + throw new NodeOperationError( + context.getNode(), + `Input is not a valid JSON: ${error.message}`, + ); + } + } + + // If we were able to parse the query with a fallback, we try to validate it using the schema + // Here we will throw an error if the data still does not match the schema + parsedQuery = schema.parse(dataFromModel); + } + + try { + // Call tool function with parsed query + const result = await func(parsedQuery); + + return result; + } catch (e) { + const { index } = context.addInputData(NodeConnectionType.AiTool, [[{ json: { query } }]]); + void context.addOutputData(NodeConnectionType.AiTool, index, e); + + return e.toString(); + } + }; + + return new DynamicTool({ + name, + description: prepareFallbackToolDescription(description, schema), + func: wrappedFunc, + }); + } +} diff --git a/packages/@n8n/nodes-langchain/utils/helpers.ts b/packages/@n8n/nodes-langchain/utils/helpers.ts index c6d27ee2f69f3..673fa0402c483 100644 --- a/packages/@n8n/nodes-langchain/utils/helpers.ts +++ b/packages/@n8n/nodes-langchain/utils/helpers.ts @@ -1,17 +1,19 @@ -import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow'; import type { EventNamesAiNodesType, IDataObject, IExecuteFunctions, IWebhookFunctions, } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow'; import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import type { BaseOutputParser } from '@langchain/core/output_parsers'; import type { BaseMessage } from '@langchain/core/messages'; -import { DynamicTool, type Tool } from '@langchain/core/tools'; +import type { Tool } from '@langchain/core/tools'; import type { BaseLLM } from '@langchain/core/language_models/llms'; import type { BaseChatMemory } from 'langchain/memory'; import type { BaseChatMessageHistory } from '@langchain/core/chat_history'; +import { N8nTool } from './N8nTool'; +import { DynamicTool } from '@langchain/core/tools'; function hasMethods(obj: unknown, ...methodNames: Array): obj is T { return methodNames.every( @@ -178,7 +180,11 @@ export function serializeChatHistory(chatHistory: BaseMessage[]): string { .join('\n'); } -export const getConnectedTools = async (ctx: IExecuteFunctions, enforceUniqueNames: boolean) => { +export const getConnectedTools = async ( + ctx: IExecuteFunctions, + enforceUniqueNames: boolean, + convertStructuredTool: boolean = true, +) => { const connectedTools = ((await ctx.getInputConnectionData(NodeConnectionType.AiTool, 0)) as Tool[]) || []; @@ -186,8 +192,10 @@ export const getConnectedTools = async (ctx: IExecuteFunctions, enforceUniqueNam const seenNames = new Set(); + const finalTools = []; + for (const tool of connectedTools) { - if (!(tool instanceof DynamicTool)) continue; + if (!(tool instanceof DynamicTool) && !(tool instanceof N8nTool)) continue; const { name } = tool; if (seenNames.has(name)) { @@ -197,7 +205,13 @@ export const getConnectedTools = async (ctx: IExecuteFunctions, enforceUniqueNam ); } seenNames.add(name); + + if (convertStructuredTool && tool instanceof N8nTool) { + finalTools.push(tool.asDynamicTool()); + } else { + finalTools.push(tool); + } } - return connectedTools; + return finalTools; }; diff --git a/packages/cli/README.md b/packages/cli/README.md index beeac69369b2f..526f4631676de 100644 --- a/packages/cli/README.md +++ b/packages/cli/README.md @@ -147,8 +147,4 @@ You can also find breaking changes here: [Breaking Changes](./BREAKING-CHANGES.m ## License -n8n is [fair-code](https://faircode.io) distributed under the [**Sustainable Use License**](https://github.com/n8n-io/n8n/blob/master/packages/cli/LICENSE.md). - -Proprietary licenses are available for enterprise customers. [Get in touch](mailto:license@n8n.io) - -Additional information about the license can be found in the [docs](https://docs.n8n.io/reference/license/). +You can find the license information [here](https://github.com/n8n-io/n8n/blob/master/README.md#license) diff --git a/packages/cli/package.json b/packages/cli/package.json index 0da569f1b086b..51744547d4367 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "n8n", - "version": "1.53.0", + "version": "1.54.0", "description": "n8n Workflow Automation Tool", "main": "dist/index", "types": "dist/index.d.ts", diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index ebbf2cd6033e2..744db98479ac8 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -13,15 +13,15 @@ import { N8nInstanceType } from '@/Interfaces'; import { ExternalHooks } from '@/ExternalHooks'; import { send, sendErrorResponse } from '@/ResponseHelper'; import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares'; -import { TestWebhooks } from '@/TestWebhooks'; import { WaitingForms } from '@/WaitingForms'; -import { WaitingWebhooks } from '@/WaitingWebhooks'; -import { webhookRequestHandler } from '@/WebhookHelpers'; +import { TestWebhooks } from '@/webhooks/TestWebhooks'; +import { WaitingWebhooks } from '@/webhooks/WaitingWebhooks'; +import { webhookRequestHandler } from '@/webhooks/WebhookHelpers'; +import { ActiveWebhooks } from '@/webhooks/ActiveWebhooks'; import { generateHostInstanceId } from './databases/utils/generators'; import { Logger } from '@/Logger'; import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error'; import { OnShutdown } from '@/decorators/OnShutdown'; -import { ActiveWebhooks } from '@/ActiveWebhooks'; import { GlobalConfig } from '@n8n/config'; @Service() diff --git a/packages/cli/src/ActiveWorkflowManager.ts b/packages/cli/src/ActiveWorkflowManager.ts index 1d5050e4d1ca5..fac6e9d9faa2e 100644 --- a/packages/cli/src/ActiveWorkflowManager.ts +++ b/packages/cli/src/ActiveWorkflowManager.ts @@ -27,7 +27,7 @@ import { } from 'n8n-workflow'; import type { IWorkflowDb } from '@/Interfaces'; -import * as WebhookHelpers from '@/WebhookHelpers'; +import * as WebhookHelpers from '@/webhooks/WebhookHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import type { WorkflowEntity } from '@db/entities/WorkflowEntity'; @@ -40,7 +40,7 @@ import { } from '@/constants'; import { NodeTypes } from '@/NodeTypes'; import { ExternalHooks } from '@/ExternalHooks'; -import { WebhookService } from './services/webhook.service'; +import { WebhookService } from '@/webhooks/webhook.service'; import { Logger } from './Logger'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; import { OrchestrationService } from '@/services/orchestration.service'; diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 8ddbd10fb29c1..73b1e99ec6559 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -1,4 +1,4 @@ -import type { Application, Request, Response } from 'express'; +import type { Application } from 'express'; import type { ExecutionError, ICredentialDataDecryptedObject, @@ -22,7 +22,6 @@ import type { FeatureFlags, INodeProperties, IUserSettings, - IHttpRequestMethods, StartNodeData, } from 'n8n-workflow'; @@ -43,7 +42,7 @@ import type { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { ExternalHooks } from './ExternalHooks'; import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants'; import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types'; -import type { WorkerJobStatusSummary } from './services/orchestration/worker/types'; +import type { RunningJobSummary } from './scaling/types'; import type { Scope } from '@n8n/permissions'; export interface ICredentialsTypeData { @@ -239,34 +238,6 @@ export interface IExternalHooksFunctions { }; } -export type WebhookCORSRequest = Request & { method: 'OPTIONS' }; - -export type WebhookRequest = Request<{ path: string }> & { - method: IHttpRequestMethods; - params: Record; -}; - -export type WaitingWebhookRequest = WebhookRequest & { - params: WebhookRequest['path'] & { suffix?: string }; -}; - -export interface WebhookAccessControlOptions { - allowedOrigins?: string; -} - -export interface IWebhookManager { - /** Gets all request methods associated with a webhook path*/ - getWebhookMethods?: (path: string) => Promise; - - /** Find the CORS options matching a path and method */ - findAccessControlOptions?: ( - path: string, - httpMethod: IHttpRequestMethods, - ) => Promise; - - executeWebhook(req: WebhookRequest, res: Response): Promise; -} - export interface IVersionNotificationSettings { enabled: boolean; endpoint: string; @@ -449,7 +420,7 @@ export interface IPushDataWorkerStatusMessage { export interface IPushDataWorkerStatusPayload { workerId: string; - runningJobsSummary: WorkerJobStatusSummary[]; + runningJobsSummary: RunningJobSummary[]; freeMem: number; totalMem: number; uptime: number; @@ -466,13 +437,6 @@ export interface IPushDataWorkerStatusPayload { version: string; } -export interface IResponseCallbackData { - data?: IDataObject | IDataObject[]; - headers?: object; - noWebhookResponse?: boolean; - responseCode?: number; -} - export interface INodesTypeData { [key: string]: { className: string; diff --git a/packages/cli/src/PublicApi/v1/openapi.yml b/packages/cli/src/PublicApi/v1/openapi.yml index 30c3a73bde0c7..0a84925734c7b 100644 --- a/packages/cli/src/PublicApi/v1/openapi.yml +++ b/packages/cli/src/PublicApi/v1/openapi.yml @@ -8,7 +8,7 @@ info: email: hello@n8n.io license: name: Sustainable Use License - url: https://github.com/n8n-io/n8n/blob/master/packages/cli/LICENSE.md + url: https://github.com/n8n-io/n8n/blob/master/LICENSE.md version: 1.1.1 externalDocs: description: n8n API documentation diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts deleted file mode 100644 index 11cfed839bee7..0000000000000 --- a/packages/cli/src/Queue.ts +++ /dev/null @@ -1,166 +0,0 @@ -import type Bull from 'bull'; -import Container, { Service } from 'typedi'; -import { - ApplicationError, - BINARY_ENCODING, - type IDataObject, - type ExecutionError, - type IExecuteResponsePromiseData, -} from 'n8n-workflow'; -import { ActiveExecutions } from '@/ActiveExecutions'; -import config from '@/config'; -import { OnShutdown } from './decorators/OnShutdown'; -import { HIGHEST_SHUTDOWN_PRIORITY } from './constants'; - -export type JobId = Bull.JobId; -export type Job = Bull.Job; -export type JobQueue = Bull.Queue; - -export interface JobData { - executionId: string; - loadStaticData: boolean; -} - -export interface JobResponse { - success: boolean; - error?: ExecutionError; -} - -export interface WebhookResponse { - executionId: string; - response: IExecuteResponsePromiseData; -} - -@Service() -export class Queue { - private jobQueue: JobQueue; - - /** - * The number of jobs a single server can process concurrently - * Any worker that wants to process executions must first set this to a non-zero value - */ - private concurrency = 0; - - setConcurrency(concurrency: number) { - this.concurrency = concurrency; - // This sets the max event listeners on the jobQueue EventEmitter to prevent the logs getting flooded with MaxListenersExceededWarning - // see: https://github.com/OptimalBits/bull/blob/develop/lib/job.js#L497-L521 - this.jobQueue.setMaxListeners( - 4 + // `close` - 2 + // `error` - 2 + // `global:progress` - concurrency * 2, // 2 global events for every call to `job.finished()` - ); - } - - constructor(private activeExecutions: ActiveExecutions) {} - - async init() { - const { default: Bull } = await import('bull'); - const { RedisClientService } = await import('@/services/redis/redis-client.service'); - - const redisClientService = Container.get(RedisClientService); - - const bullPrefix = config.getEnv('queue.bull.prefix'); - const prefix = redisClientService.toValidPrefix(bullPrefix); - - this.jobQueue = new Bull('jobs', { - prefix, - settings: config.get('queue.bull.settings'), - createClient: (type) => redisClientService.createClient({ type: `${type}(bull)` }), - }); - - this.jobQueue.on('global:progress', (_jobId, progress: WebhookResponse) => { - this.activeExecutions.resolveResponsePromise( - progress.executionId, - this.decodeWebhookResponse(progress.response), - ); - }); - } - - async findRunningJobBy({ executionId }: { executionId: string }) { - const activeOrWaitingJobs = await this.getJobs(['active', 'waiting']); - - return activeOrWaitingJobs.find(({ data }) => data.executionId === executionId) ?? null; - } - - decodeWebhookResponse(response: IExecuteResponsePromiseData): IExecuteResponsePromiseData { - if ( - typeof response === 'object' && - typeof response.body === 'object' && - (response.body as IDataObject)['__@N8nEncodedBuffer@__'] - ) { - response.body = Buffer.from( - (response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string, - BINARY_ENCODING, - ); - } - - return response; - } - - async add(jobData: JobData, jobOptions: object): Promise { - return await this.jobQueue.add(jobData, jobOptions); - } - - async getJob(jobId: JobId): Promise { - return await this.jobQueue.getJob(jobId); - } - - async getJobs(jobTypes: Bull.JobStatus[]): Promise { - return await this.jobQueue.getJobs(jobTypes); - } - - /** - * Get IDs of executions that are currently in progress in the queue. - */ - async getInProgressExecutionIds() { - const inProgressJobs = await this.getJobs(['active', 'waiting']); - - return new Set(inProgressJobs.map((job) => job.data.executionId)); - } - - async process(fn: Bull.ProcessCallbackFunction): Promise { - return await this.jobQueue.process(this.concurrency, fn); - } - - async ping(): Promise { - return await this.jobQueue.client.ping(); - } - - @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) - // Stop accepting new jobs, `doNotWaitActive` allows reporting progress - async pause(): Promise { - return await this.jobQueue?.pause(true, true); - } - - getBullObjectInstance(): JobQueue { - if (this.jobQueue === undefined) { - // if queue is not initialized yet throw an error, since we do not want to hand around an undefined queue - throw new ApplicationError('Queue is not initialized yet!'); - } - return this.jobQueue; - } - - /** - * - * @param job A Job instance - * @returns boolean true if we were able to securely stop the job - */ - async stopJob(job: Job): Promise { - if (await job.isActive()) { - // Job is already running so tell it to stop - await job.progress(-1); - return true; - } - // Job did not get started yet so remove from queue - try { - await job.remove(); - return true; - } catch (e) { - await job.progress(-1); - } - - return false; - } -} diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 19419300cff40..a8cee5e2532c1 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -215,8 +215,8 @@ export class Server extends AbstractServer { setupPushHandler(restEndpoint, app); if (config.getEnv('executions.mode') === 'queue') { - const { Queue } = await import('@/Queue'); - await Container.get(Queue).init(); + const { ScalingService } = await import('@/scaling/scaling.service'); + await Container.get(ScalingService).setupQueue(); } await handleMfaDisable(); diff --git a/packages/cli/src/WaitingForms.ts b/packages/cli/src/WaitingForms.ts index 0625acd7e40c4..bf0ab7dedb161 100644 --- a/packages/cli/src/WaitingForms.ts +++ b/packages/cli/src/WaitingForms.ts @@ -1,7 +1,7 @@ import { Service } from 'typedi'; import type { IExecutionResponse } from '@/Interfaces'; -import { WaitingWebhooks } from '@/WaitingWebhooks'; +import { WaitingWebhooks } from '@/webhooks/WaitingWebhooks'; @Service() export class WaitingForms extends WaitingWebhooks { diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 2f7976a073b30..ac5f586b0ad43 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -28,8 +28,8 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExternalHooks } from '@/ExternalHooks'; import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; -import type { Job, JobData, JobResponse } from '@/Queue'; -import { Queue } from '@/Queue'; +import type { Job, JobData, JobResult } from '@/scaling/types'; +import { ScalingService } from '@/scaling/scaling.service'; import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; @@ -40,7 +40,7 @@ import { EventService } from './events/event.service'; @Service() export class WorkflowRunner { - private jobQueue: Queue; + private readonly scalingService: ScalingService; private executionsMode = config.getEnv('executions.mode'); @@ -55,7 +55,7 @@ export class WorkflowRunner { private readonly eventService: EventService, ) { if (this.executionsMode === 'queue') { - this.jobQueue = Container.get(Queue); + this.scalingService = Container.get(ScalingService); } } @@ -375,9 +375,7 @@ export class WorkflowRunner { let job: Job; let hooks: WorkflowHooks; try { - job = await this.jobQueue.add(jobData, jobOptions); - - this.logger.info(`Started with job ID: ${job.id.toString()} (Execution ID: ${executionId})`); + job = await this.scalingService.addJob(jobData, jobOptions); hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain( data.executionMode, @@ -406,8 +404,7 @@ export class WorkflowRunner { async (resolve, reject, onCancel) => { onCancel.shouldReject = false; onCancel(async () => { - const queue = Container.get(Queue); - await queue.stopJob(job); + await Container.get(ScalingService).stopJob(job); // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the // "workflowExecuteAfter" which we require. @@ -424,11 +421,11 @@ export class WorkflowRunner { reject(error); }); - const jobData: Promise = job.finished(); + const jobData: Promise = job.finished(); const queueRecoveryInterval = config.getEnv('queue.bull.queueRecoveryInterval'); - const racingPromises: Array> = [jobData]; + const racingPromises: Array> = [jobData]; let clearWatchdogInterval; if (queueRecoveryInterval > 0) { @@ -446,9 +443,9 @@ export class WorkflowRunner { ************************************************ */ let watchDogInterval: NodeJS.Timeout | undefined; - const watchDog: Promise = new Promise((res) => { + const watchDog: Promise = new Promise((res) => { watchDogInterval = setInterval(async () => { - const currentJob = await this.jobQueue.getJob(job.id); + const currentJob = await this.scalingService.getJob(job.id); // When null means job is finished (not found in queue) if (currentJob === null) { // Mimic worker's success message diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index e76ac358170c1..505e2e216170d 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -4,8 +4,8 @@ import { ApplicationError } from 'n8n-workflow'; import config from '@/config'; import { ActiveExecutions } from '@/ActiveExecutions'; -import { WebhookServer } from '@/WebhookServer'; -import { Queue } from '@/Queue'; +import { ScalingService } from '@/scaling/scaling.service'; +import { WebhookServer } from '@/webhooks/WebhookServer'; import { BaseCommand } from './BaseCommand'; import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service'; @@ -86,7 +86,7 @@ export class Webhook extends BaseCommand { await this.initExternalHooks(); this.logger.debug('External hooks init complete'); await this.initExternalSecrets(); - this.logger.debug('External seecrets init complete'); + this.logger.debug('External secrets init complete'); } async run() { @@ -96,7 +96,7 @@ export class Webhook extends BaseCommand { ); } - await Container.get(Queue).init(); + await Container.get(ScalingService).setupQueue(); await this.server.start(); this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`); this.logger.info('Webhook listener waiting for requests.'); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 151ecfdf90d12..18f039029c020 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -2,21 +2,13 @@ import { Container } from 'typedi'; import { Flags, type Config } from '@oclif/core'; import express from 'express'; import http from 'http'; -import type PCancelable from 'p-cancelable'; -import { WorkflowExecute } from 'n8n-core'; -import type { ExecutionStatus, IExecuteResponsePromiseData, INodeTypes, IRun } from 'n8n-workflow'; -import { Workflow, sleep, ApplicationError } from 'n8n-workflow'; +import { sleep, ApplicationError } from 'n8n-workflow'; import * as Db from '@/Db'; import * as ResponseHelper from '@/ResponseHelper'; -import * as WebhookHelpers from '@/WebhookHelpers'; -import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import config from '@/config'; -import type { Job, JobId, JobResponse, WebhookResponse } from '@/Queue'; -import { Queue } from '@/Queue'; +import { ScalingService } from '@/scaling/scaling.service'; import { N8N_VERSION, inTest } from '@/constants'; -import { ExecutionRepository } from '@db/repositories/execution.repository'; -import { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { ICredentialsOverwrite } from '@/Interfaces'; import { CredentialsOverwrites } from '@/CredentialsOverwrites'; import { rawBodyReader, bodyParser } from '@/middlewares'; @@ -25,10 +17,9 @@ import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisService import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; -import type { WorkerJobStatusSummary } from '@/services/orchestration/worker/types'; import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error'; import { BaseCommand } from './BaseCommand'; -import { MaxStalledCountError } from '@/errors/max-stalled-count.error'; +import { JobProcessor } from '@/scaling/job-processor'; import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay'; export class Worker extends BaseCommand { @@ -44,15 +35,17 @@ export class Worker extends BaseCommand { }), }; - static runningJobs: { - [key: string]: PCancelable; - } = {}; + /** + * How many jobs this worker may run concurrently. + * + * Taken from env var `N8N_CONCURRENCY_PRODUCTION_LIMIT` if set to a value + * other than -1, else taken from `--concurrency` flag. + */ + concurrency: number; - static runningJobsSummary: { - [jobId: string]: WorkerJobStatusSummary; - } = {}; + scalingService: ScalingService; - static jobQueue: Queue; + jobProcessor: JobProcessor; redisSubscriber: RedisServicePubSubSubscriber; @@ -73,12 +66,12 @@ export class Worker extends BaseCommand { // Wait for active workflow executions to finish let count = 0; - while (Object.keys(Worker.runningJobs).length !== 0) { + while (this.jobProcessor.getRunningJobIds().length !== 0) { if (count++ % 4 === 0) { const waitLeft = Math.ceil((hardStopTimeMs - Date.now()) / 1000); this.logger.info( `Waiting for ${ - Object.keys(Worker.runningJobs).length + Object.keys(this.jobProcessor.getRunningJobIds()).length } active executions to finish... (max wait ${waitLeft} more seconds)`, ); } @@ -92,143 +85,6 @@ export class Worker extends BaseCommand { await this.exitSuccessFully(); } - async runJob(job: Job, nodeTypes: INodeTypes): Promise { - const { executionId, loadStaticData } = job.data; - const executionRepository = Container.get(ExecutionRepository); - const fullExecutionData = await executionRepository.findSingleExecution(executionId, { - includeData: true, - unflattenData: true, - }); - - if (!fullExecutionData) { - this.logger.error( - `Worker failed to find data of execution "${executionId}" in database. Cannot continue.`, - { executionId }, - ); - throw new ApplicationError( - 'Unable to find data of execution in database. Aborting execution.', - { extra: { executionId } }, - ); - } - const workflowId = fullExecutionData.workflowData.id; - - this.logger.info( - `Start job: ${job.id} (Workflow ID: ${workflowId} | Execution: ${executionId})`, - ); - await executionRepository.updateStatus(executionId, 'running'); - - let { staticData } = fullExecutionData.workflowData; - if (loadStaticData) { - const workflowData = await Container.get(WorkflowRepository).findOne({ - select: ['id', 'staticData'], - where: { - id: workflowId, - }, - }); - if (workflowData === null) { - this.logger.error( - 'Worker execution failed because workflow could not be found in database.', - { workflowId, executionId }, - ); - throw new ApplicationError('Workflow could not be found', { extra: { workflowId } }); - } - staticData = workflowData.staticData; - } - - const workflowSettings = fullExecutionData.workflowData.settings ?? {}; - - let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); // initialize with default - - let executionTimeoutTimestamp: number | undefined; - if (workflowTimeout > 0) { - workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')); - executionTimeoutTimestamp = Date.now() + workflowTimeout * 1000; - } - - const workflow = new Workflow({ - id: workflowId, - name: fullExecutionData.workflowData.name, - nodes: fullExecutionData.workflowData.nodes, - connections: fullExecutionData.workflowData.connections, - active: fullExecutionData.workflowData.active, - nodeTypes, - staticData, - settings: fullExecutionData.workflowData.settings, - }); - - const additionalData = await WorkflowExecuteAdditionalData.getBase( - undefined, - undefined, - executionTimeoutTimestamp, - ); - additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( - fullExecutionData.mode, - job.data.executionId, - fullExecutionData.workflowData, - { - retryOf: fullExecutionData.retryOf as string, - }, - ); - - additionalData.hooks.hookFunctions.sendResponse = [ - async (response: IExecuteResponsePromiseData): Promise => { - const progress: WebhookResponse = { - executionId, - response: WebhookHelpers.encodeWebhookResponse(response), - }; - await job.progress(progress); - }, - ]; - - additionalData.executionId = executionId; - - additionalData.setExecutionStatus = (status: ExecutionStatus) => { - // Can't set the status directly in the queued worker, but it will happen in InternalHook.onWorkflowPostExecute - this.logger.debug(`Queued worker execution status for ${executionId} is "${status}"`); - }; - - let workflowExecute: WorkflowExecute; - let workflowRun: PCancelable; - if (fullExecutionData.data !== undefined) { - workflowExecute = new WorkflowExecute( - additionalData, - fullExecutionData.mode, - fullExecutionData.data, - ); - workflowRun = workflowExecute.processRunExecutionData(workflow); - } else { - // Execute all nodes - // Can execute without webhook so go on - workflowExecute = new WorkflowExecute(additionalData, fullExecutionData.mode); - workflowRun = workflowExecute.run(workflow); - } - - Worker.runningJobs[job.id] = workflowRun; - Worker.runningJobsSummary[job.id] = { - jobId: job.id.toString(), - executionId, - workflowId: fullExecutionData.workflowId ?? '', - workflowName: fullExecutionData.workflowData.name, - mode: fullExecutionData.mode, - startedAt: fullExecutionData.startedAt, - retryOf: fullExecutionData.retryOf ?? '', - status: fullExecutionData.status, - }; - - // Wait till the execution is finished - await workflowRun; - - delete Worker.runningJobs[job.id]; - delete Worker.runningJobsSummary[job.id]; - - // do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution() - // already! - - return { - success: true, - }; - } - constructor(argv: string[], cmdConfig: Config) { super(argv, cmdConfig); @@ -256,6 +112,7 @@ export class Worker extends BaseCommand { this.logger.debug('Starting n8n worker...'); this.logger.debug(`Queue mode id: ${this.queueModeId}`); + await this.setConcurrency(); await super.init(); await this.initLicense(); @@ -268,8 +125,7 @@ export class Worker extends BaseCommand { this.logger.debug('External secrets init complete'); await this.initEventBus(); this.logger.debug('Event bus init complete'); - await this.initQueue(); - this.logger.debug('Queue init complete'); + await this.initScalingService(); await this.initOrchestration(); this.logger.debug('Orchestration init complete'); @@ -301,80 +157,27 @@ export class Worker extends BaseCommand { await Container.get(OrchestrationHandlerWorkerService).initWithOptions({ queueModeId: this.queueModeId, redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher, - getRunningJobIds: () => Object.keys(Worker.runningJobs), - getRunningJobsSummary: () => Object.values(Worker.runningJobsSummary), + getRunningJobIds: () => this.jobProcessor.getRunningJobIds(), + getRunningJobsSummary: () => this.jobProcessor.getRunningJobsSummary(), }); } - async initQueue() { + async setConcurrency() { const { flags } = await this.parse(Worker); - const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); - - this.logger.debug( - `Opening Redis connection to listen to messages with timeout ${redisConnectionTimeoutLimit}`, - ); - - Worker.jobQueue = Container.get(Queue); - await Worker.jobQueue.init(); - this.logger.debug('Queue singleton ready'); - const envConcurrency = config.getEnv('executions.concurrency.productionLimit'); - const concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency; - Worker.jobQueue.setConcurrency(concurrency); - void Worker.jobQueue.process(async (job) => await this.runJob(job, this.nodeTypes)); + this.concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency; + } - Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => { - // Progress of a job got updated which does get used - // to communicate that a job got canceled. + async initScalingService() { + this.scalingService = Container.get(ScalingService); - if (progress === -1) { - // Job has to get canceled - if (Worker.runningJobs[jobId] !== undefined) { - // Job is processed by current worker so cancel - Worker.runningJobs[jobId].cancel(); - delete Worker.runningJobs[jobId]; - } - } - }); + await this.scalingService.setupQueue(); - let lastTimer = 0; - let cumulativeTimeout = 0; - Worker.jobQueue.getBullObjectInstance().on('error', (error: Error) => { - if (error.toString().includes('ECONNREFUSED')) { - const now = Date.now(); - if (now - lastTimer > 30000) { - // Means we had no timeout at all or last timeout was temporary and we recovered - lastTimer = now; - cumulativeTimeout = 0; - } else { - cumulativeTimeout += now - lastTimer; - lastTimer = now; - if (cumulativeTimeout > redisConnectionTimeoutLimit) { - this.logger.error( - `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, - ); - process.exit(1); - } - } - this.logger.warn('Redis unavailable - trying to reconnect...'); - } else if (error.toString().includes('Error initializing Lua scripts')) { - // This is a non-recoverable error - // Happens when worker starts and Redis is unavailable - // Even if Redis comes back online, worker will be zombie - this.logger.error('Error initializing worker.'); - process.exit(2); - } else { - this.logger.error('Error from queue: ', error); - - if (error.message.includes('job stalled more than maxStalledCount')) { - throw new MaxStalledCountError(error); - } + this.scalingService.setupWorker(this.concurrency); - throw error; - } - }); + this.jobProcessor = Container.get(JobProcessor); } async setupHealthMonitor() { @@ -410,7 +213,7 @@ export class Worker extends BaseCommand { // if it loses the connection to redis try { // Redis ping - await Worker.jobQueue.ping(); + await this.scalingService.pingQueue(); } catch (e) { this.logger.error('No Redis connection!', e as Error); const error = new ServiceUnavailableError('No Redis connection!'); @@ -476,18 +279,16 @@ export class Worker extends BaseCommand { } async run() { - const { flags } = await this.parse(Worker); - this.logger.info('\nn8n worker is now ready'); this.logger.info(` * Version: ${N8N_VERSION}`); - this.logger.info(` * Concurrency: ${flags.concurrency}`); + this.logger.info(` * Concurrency: ${this.concurrency}`); this.logger.info(''); if (config.getEnv('queue.health.active')) { await this.setupHealthMonitor(); } - if (process.stdout.isTTY) { + if (!inTest && process.stdout.isTTY) { process.stdin.setRawMode(true); process.stdin.resume(); process.stdin.setEncoding('utf8'); diff --git a/packages/cli/src/databases/repositories/projectRelation.repository.ts b/packages/cli/src/databases/repositories/projectRelation.repository.ts index 00fc4de34a195..1f875d011fe06 100644 --- a/packages/cli/src/databases/repositories/projectRelation.repository.ts +++ b/packages/cli/src/databases/repositories/projectRelation.repository.ts @@ -61,4 +61,12 @@ export class ProjectRelationRepository extends Repository { return [...new Set(rows.map((r) => r.userId))]; } + + async findAllByUser(userId: string) { + return await this.find({ + where: { + userId, + }, + }); + } } diff --git a/packages/cli/src/databases/repositories/sharedWorkflow.repository.ts b/packages/cli/src/databases/repositories/sharedWorkflow.repository.ts index 4dc54935cb193..d8e224fee2280 100644 --- a/packages/cli/src/databases/repositories/sharedWorkflow.repository.ts +++ b/packages/cli/src/databases/repositories/sharedWorkflow.repository.ts @@ -200,4 +200,13 @@ export class SharedWorkflowRepository extends Repository { }) )?.project; } + + async getRelationsByWorkflowIdsAndProjectIds(workflowIds: string[], projectIds: string[]) { + return await this.find({ + where: { + workflowId: In(workflowIds), + projectId: In(projectIds), + }, + }); + } } diff --git a/packages/cli/src/errors/max-stalled-count.error.ts b/packages/cli/src/errors/max-stalled-count.error.ts index 6715de0ade837..653ca18eacac7 100644 --- a/packages/cli/src/errors/max-stalled-count.error.ts +++ b/packages/cli/src/errors/max-stalled-count.error.ts @@ -1,7 +1,7 @@ import { ApplicationError } from 'n8n-workflow'; /** - * See https://github.com/OptimalBits/bull/blob/60fa88f08637f0325639988a3f054880a04ce402/docs/README.md?plain=1#L133 + * @docs https://docs.bullmq.io/guide/workers/stalled-jobs */ export class MaxStalledCountError extends ApplicationError { constructor(cause: Error) { diff --git a/packages/cli/src/executions/__tests__/execution.service.test.ts b/packages/cli/src/executions/__tests__/execution.service.test.ts index 567e0c9758a6f..879ee707cdc8d 100644 --- a/packages/cli/src/executions/__tests__/execution.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution.service.test.ts @@ -6,14 +6,15 @@ import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.err import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error'; import type { ActiveExecutions } from '@/ActiveExecutions'; import type { IExecutionResponse } from '@/Interfaces'; -import type { Job, Queue } from '@/Queue'; +import type { ScalingService } from '@/scaling/scaling.service'; import type { WaitTracker } from '@/WaitTracker'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { ExecutionRequest } from '@/executions/execution.types'; import type { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; +import type { Job } from '@/scaling/types'; describe('ExecutionService', () => { - const queue = mock(); + const scalingService = mock(); const activeExecutions = mock(); const executionRepository = mock(); const waitTracker = mock(); @@ -22,7 +23,7 @@ describe('ExecutionService', () => { const executionService = new ExecutionService( mock(), mock(), - queue, + scalingService, activeExecutions, executionRepository, mock(), @@ -31,6 +32,7 @@ describe('ExecutionService', () => { mock(), concurrencyControl, mock(), + mock(), ); beforeEach(() => { @@ -210,7 +212,7 @@ describe('ExecutionService', () => { expect(concurrencyControl.remove).not.toHaveBeenCalled(); expect(waitTracker.stopExecution).not.toHaveBeenCalled(); - expect(queue.stopJob).not.toHaveBeenCalled(); + expect(scalingService.stopJob).not.toHaveBeenCalled(); }); }); @@ -223,7 +225,8 @@ describe('ExecutionService', () => { const execution = mock({ id: '123', status: 'running' }); executionRepository.findSingleExecution.mockResolvedValue(execution); waitTracker.has.mockReturnValue(false); - queue.findRunningJobBy.mockResolvedValue(mock()); + const job = mock({ data: { executionId: '123' } }); + scalingService.findJobsByStatus.mockResolvedValue([job]); executionRepository.stopDuringRun.mockResolvedValue(mock()); /** @@ -236,8 +239,8 @@ describe('ExecutionService', () => { */ expect(waitTracker.stopExecution).not.toHaveBeenCalled(); expect(activeExecutions.stopExecution).toHaveBeenCalled(); - expect(queue.findRunningJobBy).toBeCalledWith({ executionId: execution.id }); - expect(queue.stopJob).toHaveBeenCalled(); + expect(scalingService.findJobsByStatus).toHaveBeenCalled(); + expect(scalingService.stopJob).toHaveBeenCalled(); expect(executionRepository.stopDuringRun).toHaveBeenCalled(); }); @@ -249,7 +252,8 @@ describe('ExecutionService', () => { const execution = mock({ id: '123', status: 'waiting' }); executionRepository.findSingleExecution.mockResolvedValue(execution); waitTracker.has.mockReturnValue(true); - queue.findRunningJobBy.mockResolvedValue(mock()); + const job = mock({ data: { executionId: '123' } }); + scalingService.findJobsByStatus.mockResolvedValue([job]); executionRepository.stopDuringRun.mockResolvedValue(mock()); /** @@ -261,9 +265,8 @@ describe('ExecutionService', () => { * Assert */ expect(waitTracker.stopExecution).toHaveBeenCalledWith(execution.id); - expect(activeExecutions.stopExecution).toHaveBeenCalled(); - expect(queue.findRunningJobBy).toBeCalledWith({ executionId: execution.id }); - expect(queue.stopJob).toHaveBeenCalled(); + expect(scalingService.findJobsByStatus).toHaveBeenCalled(); + expect(scalingService.stopJob).toHaveBeenCalled(); expect(executionRepository.stopDuringRun).toHaveBeenCalled(); }); }); diff --git a/packages/cli/src/executions/__tests__/executions.controller.test.ts b/packages/cli/src/executions/__tests__/executions.controller.test.ts index 2a4c733c5ab3c..decb88d59854e 100644 --- a/packages/cli/src/executions/__tests__/executions.controller.test.ts +++ b/packages/cli/src/executions/__tests__/executions.controller.test.ts @@ -74,6 +74,8 @@ describe('ExecutionsController', () => { }, ]; + executionService.findRangeWithCount.mockResolvedValue(NO_EXECUTIONS); + describe('if either status or range provided', () => { test.each(QUERIES_WITH_EITHER_STATUS_OR_RANGE)( 'should fetch executions per query', diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 435145545ab16..d691df1b3ce36 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -135,9 +135,11 @@ export class ExecutionRecoveryService { return waitMs; } - const { Queue } = await import('@/Queue'); + const { ScalingService } = await import('@/scaling/scaling.service'); - const queuedIds = await Container.get(Queue).getInProgressExecutionIds(); + const runningJobs = await Container.get(ScalingService).findJobsByStatus(['active', 'waiting']); + + const queuedIds = new Set(runningJobs.map((job) => job.data.executionId)); if (queuedIds.size === 0) { this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions'); diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index bb8650e99f1d6..849d06c968c7e 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -24,7 +24,7 @@ import type { IWorkflowExecutionDataProcess, } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; -import { Queue } from '@/Queue'; +import { ScalingService } from '@/scaling/scaling.service'; import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types'; import { WorkflowRunner } from '@/WorkflowRunner'; import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository'; @@ -40,6 +40,8 @@ import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error'; import { License } from '@/License'; +import type { User } from '@/databases/entities/User'; +import { WorkflowSharingService } from '@/workflows/workflowSharing.service'; export const schemaGetExecutionsQueryFilter = { $id: '/IGetExecutionsQueryFilter', @@ -83,7 +85,7 @@ export class ExecutionService { constructor( private readonly globalConfig: GlobalConfig, private readonly logger: Logger, - private readonly queue: Queue, + private readonly scalingService: ScalingService, private readonly activeExecutions: ActiveExecutions, private readonly executionRepository: ExecutionRepository, private readonly workflowRepository: WorkflowRepository, @@ -92,6 +94,7 @@ export class ExecutionService { private readonly workflowRunner: WorkflowRunner, private readonly concurrencyControl: ConcurrencyControlService, private readonly license: License, + private readonly workflowSharingService: WorkflowSharingService, ) {} async findOne( @@ -468,14 +471,28 @@ export class ExecutionService { this.waitTracker.stopExecution(execution.id); } - const job = await this.queue.findRunningJobBy({ executionId: execution.id }); + const jobs = await this.scalingService.findJobsByStatus(['active', 'waiting']); + + const job = jobs.find(({ data }) => data.executionId === execution.id); if (job) { - await this.queue.stopJob(job); + await this.scalingService.stopJob(job); } else { this.logger.debug('Job to stop not in queue', { executionId: execution.id }); } return await this.executionRepository.stopDuringRun(execution); } + + async addScopes(user: User, summaries: ExecutionSummaries.ExecutionSummaryWithScopes[]) { + const workflowIds = [...new Set(summaries.map((s) => s.workflowId))]; + + const scopes = Object.fromEntries( + await this.workflowSharingService.getSharedWorkflowScopes(workflowIds, user), + ); + + for (const s of summaries) { + s.scopes = scopes[s.workflowId] ?? []; + } + } } diff --git a/packages/cli/src/executions/execution.types.ts b/packages/cli/src/executions/execution.types.ts index 7e8872bf1b305..fd5024adbd718 100644 --- a/packages/cli/src/executions/execution.types.ts +++ b/packages/cli/src/executions/execution.types.ts @@ -1,6 +1,12 @@ import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity'; import type { AuthenticatedRequest } from '@/requests'; -import type { ExecutionStatus, IDataObject, WorkflowExecuteMode } from 'n8n-workflow'; +import type { Scope } from '@n8n/permissions'; +import type { + ExecutionStatus, + ExecutionSummary, + IDataObject, + WorkflowExecuteMode, +} from 'n8n-workflow'; export declare namespace ExecutionRequest { namespace QueryParams { @@ -83,6 +89,8 @@ export namespace ExecutionSummaries { stoppedAt?: 'DESC'; }; }; + + export type ExecutionSummaryWithScopes = ExecutionSummary & { scopes: Scope[] }; } export type QueueRecoverySettings = { diff --git a/packages/cli/src/executions/executions.controller.ts b/packages/cli/src/executions/executions.controller.ts index 64b6a5427429f..c68c8cb7d5c48 100644 --- a/packages/cli/src/executions/executions.controller.ts +++ b/packages/cli/src/executions/executions.controller.ts @@ -1,4 +1,4 @@ -import { ExecutionRequest } from './execution.types'; +import { ExecutionRequest, type ExecutionSummaries } from './execution.types'; import { ExecutionService } from './execution.service'; import { Get, Post, RestController } from '@/decorators'; import { EnterpriseExecutionsService } from './execution.service.ee'; @@ -53,10 +53,20 @@ export class ExecutionsController { const noRange = !query.range.lastId || !query.range.firstId; if (noStatus && noRange) { - return await this.executionService.findLatestCurrentAndCompleted(query); + const executions = await this.executionService.findLatestCurrentAndCompleted(query); + await this.executionService.addScopes( + req.user, + executions.results as ExecutionSummaries.ExecutionSummaryWithScopes[], + ); + return executions; } - return await this.executionService.findRangeWithCount(query); + const executions = await this.executionService.findRangeWithCount(query); + await this.executionService.addScopes( + req.user, + executions.results as ExecutionSummaries.ExecutionSummaryWithScopes[], + ); + return executions; } @Get('/:id') diff --git a/packages/cli/src/scaling/constants.ts b/packages/cli/src/scaling/constants.ts new file mode 100644 index 0000000000000..8ef5f716b17aa --- /dev/null +++ b/packages/cli/src/scaling/constants.ts @@ -0,0 +1,3 @@ +export const QUEUE_NAME = 'jobs'; + +export const JOB_TYPE_NAME = 'job'; diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts new file mode 100644 index 0000000000000..6057a1937db76 --- /dev/null +++ b/packages/cli/src/scaling/job-processor.ts @@ -0,0 +1,182 @@ +import { Service } from 'typedi'; +import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow'; +import { WorkflowExecute } from 'n8n-core'; +import { Logger } from '@/Logger'; +import config from '@/config'; +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; +import { NodeTypes } from '@/NodeTypes'; +import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow'; +import type { Job, JobId, JobResult, RunningJob, RunningJobSummary } from './types'; +import type PCancelable from 'p-cancelable'; + +/** + * Responsible for processing jobs from the queue, i.e. running enqueued executions. + */ +@Service() +export class JobProcessor { + private readonly runningJobs: { [jobId: JobId]: RunningJob } = {}; + + constructor( + private readonly logger: Logger, + private readonly executionRepository: ExecutionRepository, + private readonly workflowRepository: WorkflowRepository, + private readonly nodeTypes: NodeTypes, + ) {} + + async processJob(job: Job): Promise { + const { executionId, loadStaticData } = job.data; + + const execution = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); + + if (!execution) { + this.logger.error('[JobProcessor] Failed to find execution data', { executionId }); + throw new ApplicationError('Failed to find execution data. Aborting execution.', { + extra: { executionId }, + }); + } + + const workflowId = execution.workflowData.id; + + this.logger.info(`[JobProcessor] Starting job ${job.id} (execution ${executionId})`); + + await this.executionRepository.updateStatus(executionId, 'running'); + + let { staticData } = execution.workflowData; + + if (loadStaticData) { + const workflowData = await this.workflowRepository.findOne({ + select: ['id', 'staticData'], + where: { id: workflowId }, + }); + + if (workflowData === null) { + this.logger.error('[JobProcessor] Failed to find workflow', { workflowId, executionId }); + throw new ApplicationError('Failed to find workflow', { extra: { workflowId } }); + } + + staticData = workflowData.staticData; + } + + const workflowSettings = execution.workflowData.settings ?? {}; + + let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); + + let executionTimeoutTimestamp: number | undefined; + + if (workflowTimeout > 0) { + workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')); + executionTimeoutTimestamp = Date.now() + workflowTimeout * 1000; + } + + const workflow = new Workflow({ + id: workflowId, + name: execution.workflowData.name, + nodes: execution.workflowData.nodes, + connections: execution.workflowData.connections, + active: execution.workflowData.active, + nodeTypes: this.nodeTypes, + staticData, + settings: execution.workflowData.settings, + }); + + const additionalData = await WorkflowExecuteAdditionalData.getBase( + undefined, + undefined, + executionTimeoutTimestamp, + ); + + additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( + execution.mode, + job.data.executionId, + execution.workflowData, + { retryOf: execution.retryOf as string }, + ); + + additionalData.hooks.hookFunctions.sendResponse = [ + async (response: IExecuteResponsePromiseData): Promise => { + await job.progress({ + kind: 'respond-to-webhook', + executionId, + response: this.encodeWebhookResponse(response), + }); + }, + ]; + + additionalData.executionId = executionId; + + additionalData.setExecutionStatus = (status: ExecutionStatus) => { + // Can't set the status directly in the queued worker, but it will happen in InternalHook.onWorkflowPostExecute + this.logger.debug( + `[JobProcessor] Queued worker execution status for ${executionId} is "${status}"`, + ); + }; + + let workflowExecute: WorkflowExecute; + let workflowRun: PCancelable; + if (execution.data !== undefined) { + workflowExecute = new WorkflowExecute(additionalData, execution.mode, execution.data); + workflowRun = workflowExecute.processRunExecutionData(workflow); + } else { + // Execute all nodes + // Can execute without webhook so go on + workflowExecute = new WorkflowExecute(additionalData, execution.mode); + workflowRun = workflowExecute.run(workflow); + } + + const runningJob: RunningJob = { + run: workflowRun, + executionId, + workflowId: execution.workflowId, + workflowName: execution.workflowData.name, + mode: execution.mode, + startedAt: execution.startedAt, + retryOf: execution.retryOf ?? '', + status: execution.status, + }; + + this.runningJobs[job.id] = runningJob; + + await workflowRun; + + delete this.runningJobs[job.id]; + + this.logger.debug('[JobProcessor] Job finished running', { jobId: job.id, executionId }); + + /** + * @important Do NOT call `workflowExecuteAfter` hook here. + * It is being called from processSuccessExecution() already. + */ + + return { success: true }; + } + + stopJob(jobId: JobId) { + this.runningJobs[jobId]?.run.cancel(); + delete this.runningJobs[jobId]; + } + + getRunningJobIds(): JobId[] { + return Object.keys(this.runningJobs); + } + + getRunningJobsSummary(): RunningJobSummary[] { + return Object.values(this.runningJobs).map(({ run, ...summary }) => summary); + } + + private encodeWebhookResponse( + response: IExecuteResponsePromiseData, + ): IExecuteResponsePromiseData { + if (typeof response === 'object' && Buffer.isBuffer(response.body)) { + response.body = { + '__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING), + }; + } + + return response; + } +} diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts new file mode 100644 index 0000000000000..c52a82e8a0da6 --- /dev/null +++ b/packages/cli/src/scaling/scaling.service.ts @@ -0,0 +1,213 @@ +import Container, { Service } from 'typedi'; +import { ApplicationError, BINARY_ENCODING } from 'n8n-workflow'; +import { ActiveExecutions } from '@/ActiveExecutions'; +import config from '@/config'; +import { Logger } from '@/Logger'; +import { MaxStalledCountError } from '@/errors/max-stalled-count.error'; +import { HIGHEST_SHUTDOWN_PRIORITY } from '@/constants'; +import { OnShutdown } from '@/decorators/OnShutdown'; +import { JOB_TYPE_NAME, QUEUE_NAME } from './constants'; +import { JobProcessor } from './job-processor'; +import type { JobQueue, Job, JobData, JobOptions, JobMessage, JobStatus, JobId } from './types'; +import type { IExecuteResponsePromiseData } from 'n8n-workflow'; + +@Service() +export class ScalingService { + private queue: JobQueue; + + private readonly instanceType = config.getEnv('generic.instanceType'); + + constructor( + private readonly logger: Logger, + private readonly activeExecutions: ActiveExecutions, + private readonly jobProcessor: JobProcessor, + ) {} + + // #region Lifecycle + + async setupQueue() { + const { default: BullQueue } = await import('bull'); + const { RedisClientService } = await import('@/services/redis/redis-client.service'); + const service = Container.get(RedisClientService); + + const bullPrefix = config.getEnv('queue.bull.prefix'); + const prefix = service.toValidPrefix(bullPrefix); + + this.queue = new BullQueue(QUEUE_NAME, { + prefix, + settings: config.get('queue.bull.settings'), + createClient: (type) => service.createClient({ type: `${type}(bull)` }), + }); + + this.registerListeners(); + + this.logger.debug('[ScalingService] Queue setup completed'); + } + + setupWorker(concurrency: number) { + this.assertWorker(); + + void this.queue.process( + JOB_TYPE_NAME, + concurrency, + async (job: Job) => await this.jobProcessor.processJob(job), + ); + + this.logger.debug('[ScalingService] Worker setup completed'); + } + + @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) + async pauseQueue() { + await this.queue.pause(true, true); + + this.logger.debug('[ScalingService] Queue paused'); + } + + async pingQueue() { + await this.queue.client.ping(); + } + + // #endregion + + // #region Jobs + + async addJob(jobData: JobData, jobOptions: JobOptions) { + const { executionId } = jobData; + + const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions); + + this.logger.info(`[ScalingService] Added job ${job.id} (execution ${executionId})`); + + return job; + } + + async getJob(jobId: JobId) { + return await this.queue.getJob(jobId); + } + + async findJobsByStatus(statuses: JobStatus[]) { + return await this.queue.getJobs(statuses); + } + + async stopJob(job: Job) { + const props = { jobId: job.id, executionId: job.data.executionId }; + + try { + if (await job.isActive()) { + await job.progress({ kind: 'abort-job' }); + this.logger.debug('[ScalingService] Stopped active job', props); + return true; + } + + await job.remove(); + this.logger.debug('[ScalingService] Stopped inactive job', props); + return true; + } catch (error: unknown) { + await job.progress({ kind: 'abort-job' }); + this.logger.error('[ScalingService] Failed to stop job', { ...props, error }); + return false; + } + } + + // #endregion + + // #region Listeners + + private registerListeners() { + this.queue.on('global:progress', (_jobId: JobId, msg: JobMessage) => { + if (msg.kind === 'respond-to-webhook') { + const { executionId, response } = msg; + this.activeExecutions.resolveResponsePromise( + executionId, + this.decodeWebhookResponse(response), + ); + } + }); + + this.queue.on('global:progress', (jobId: JobId, msg: JobMessage) => { + if (msg.kind === 'abort-job') { + this.jobProcessor.stopJob(jobId); + } + }); + + let latestAttemptTs = 0; + let cumulativeTimeoutMs = 0; + + const MAX_TIMEOUT_MS = config.getEnv('queue.bull.redis.timeoutThreshold'); + const RESET_LENGTH_MS = 30_000; + + this.queue.on('error', (error: Error) => { + this.logger.error('[ScalingService] Queue errored', { error }); + + /** + * On Redis connection failure, try to reconnect. On every failed attempt, + * increment a cumulative timeout - if this exceeds a limit, exit the + * process. Reset the cumulative timeout if >30s between retries. + */ + if (error.message.includes('ECONNREFUSED')) { + const nowTs = Date.now(); + if (nowTs - latestAttemptTs > RESET_LENGTH_MS) { + latestAttemptTs = nowTs; + cumulativeTimeoutMs = 0; + } else { + cumulativeTimeoutMs += nowTs - latestAttemptTs; + latestAttemptTs = nowTs; + if (cumulativeTimeoutMs > MAX_TIMEOUT_MS) { + this.logger.error('[ScalingService] Redis unavailable after max timeout'); + this.logger.error('[ScalingService] Exiting process...'); + process.exit(1); + } + } + + this.logger.warn('[ScalingService] Redis unavailable - retrying to connect...'); + return; + } + + if ( + this.instanceType === 'worker' && + error.message.includes('job stalled more than maxStalledCount') + ) { + throw new MaxStalledCountError(error); + } + + /** + * Non-recoverable error on worker start with Redis unavailable. + * Even if Redis recovers, worker will remain unable to process jobs. + */ + if ( + this.instanceType === 'worker' && + error.message.includes('Error initializing Lua scripts') + ) { + this.logger.error('[ScalingService] Fatal error initializing worker', { error }); + this.logger.error('[ScalingService] Exiting process...'); + process.exit(1); + } + + throw error; + }); + } + + // #endregion + + private decodeWebhookResponse( + response: IExecuteResponsePromiseData, + ): IExecuteResponsePromiseData { + if ( + typeof response === 'object' && + typeof response.body === 'object' && + response.body !== null && + '__@N8nEncodedBuffer@__' in response.body && + typeof response.body['__@N8nEncodedBuffer@__'] === 'string' + ) { + response.body = Buffer.from(response.body['__@N8nEncodedBuffer@__'], BINARY_ENCODING); + } + + return response; + } + + private assertWorker() { + if (this.instanceType === 'worker') return; + + throw new ApplicationError('This method must be called on a `worker` instance'); + } +} diff --git a/packages/cli/src/scaling/types.ts b/packages/cli/src/scaling/types.ts new file mode 100644 index 0000000000000..55d49b8c48487 --- /dev/null +++ b/packages/cli/src/scaling/types.ts @@ -0,0 +1,55 @@ +import type { + ExecutionError, + ExecutionStatus, + IExecuteResponsePromiseData, + IRun, + WorkflowExecuteMode as WorkflowExecutionMode, +} from 'n8n-workflow'; +import type Bull from 'bull'; +import type PCancelable from 'p-cancelable'; + +export type JobQueue = Bull.Queue; + +export type Job = Bull.Job; + +export type JobId = Job['id']; + +export type JobData = { + executionId: string; + loadStaticData: boolean; +}; + +export type JobResult = { + success: boolean; + error?: ExecutionError; +}; + +export type JobStatus = Bull.JobStatus; + +export type JobOptions = Bull.JobOptions; + +/** Message sent by worker to queue or by queue to worker. */ +export type JobMessage = RepondToWebhookMessage | AbortJobMessage; + +export type RepondToWebhookMessage = { + kind: 'respond-to-webhook'; + executionId: string; + response: IExecuteResponsePromiseData; +}; + +export type AbortJobMessage = { + kind: 'abort-job'; +}; + +export type RunningJob = { + executionId: string; + workflowId: string; + workflowName: string; + mode: WorkflowExecutionMode; + startedAt: Date; + retryOf: string; + status: ExecutionStatus; + run: PCancelable; +}; + +export type RunningJobSummary = Omit; diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 7945f59bc35f8..9a7b6b5640dd0 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -7,7 +7,7 @@ import { License } from '@/License'; import { Logger } from '@/Logger'; import { ActiveWorkflowManager } from '@/ActiveWorkflowManager'; import { Push } from '@/push'; -import { TestWebhooks } from '@/TestWebhooks'; +import { TestWebhooks } from '@/webhooks/TestWebhooks'; import { OrchestrationService } from '@/services/orchestration.service'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { CommunityPackagesService } from '@/services/communityPackages.service'; diff --git a/packages/cli/src/services/orchestration/worker/types.ts b/packages/cli/src/services/orchestration/worker/types.ts index 351c56394a31c..84c515466e2c2 100644 --- a/packages/cli/src/services/orchestration/worker/types.ts +++ b/packages/cli/src/services/orchestration/worker/types.ts @@ -1,11 +1,12 @@ import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow'; import type { RedisServicePubSubPublisher } from '../../redis/RedisServicePubSubPublisher'; +import type { RunningJobSummary } from '@/scaling/types'; export interface WorkerCommandReceivedHandlerOptions { queueModeId: string; redisPublisher: RedisServicePubSubPublisher; - getRunningJobIds: () => string[]; - getRunningJobsSummary: () => WorkerJobStatusSummary[]; + getRunningJobIds: () => Array; + getRunningJobsSummary: () => RunningJobSummary[]; } export interface WorkerJobStatusSummary { diff --git a/packages/cli/src/ActiveWebhooks.ts b/packages/cli/src/webhooks/ActiveWebhooks.ts similarity index 91% rename from packages/cli/src/ActiveWebhooks.ts rename to packages/cli/src/webhooks/ActiveWebhooks.ts index 43136e75d384a..c18f949b88cca 100644 --- a/packages/cli/src/ActiveWebhooks.ts +++ b/packages/cli/src/webhooks/ActiveWebhooks.ts @@ -5,20 +5,25 @@ import type { INode, IWebhookData, IHttpRequestMethods } from 'n8n-workflow'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { - IResponseCallbackData, + IWebhookResponseCallbackData, IWebhookManager, WebhookAccessControlOptions, WebhookRequest, -} from '@/Interfaces'; +} from './webhook.types'; import { Logger } from '@/Logger'; import { NodeTypes } from '@/NodeTypes'; -import { WebhookService } from '@/services/webhook.service'; +import { WebhookService } from '@/webhooks/webhook.service'; import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; -import * as WebhookHelpers from '@/WebhookHelpers'; +import * as WebhookHelpers from '@/webhooks/WebhookHelpers'; import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service'; +/** + * Service for handling the execution of production webhooks, i.e. webhooks + * that belong to activated workflows and use the production URL + * (https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.webhook/#webhook-urls) + */ @Service() export class ActiveWebhooks implements IWebhookManager { constructor( @@ -57,7 +62,7 @@ export class ActiveWebhooks implements IWebhookManager { async executeWebhook( request: WebhookRequest, response: Response, - ): Promise { + ): Promise { const httpMethod = request.method; const path = request.params.path; diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/webhooks/TestWebhooks.ts similarity index 94% rename from packages/cli/src/TestWebhooks.ts rename to packages/cli/src/webhooks/TestWebhooks.ts index 827226ee546be..8cf1b4292989b 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/webhooks/TestWebhooks.ts @@ -8,26 +8,30 @@ import type { IRunData, } from 'n8n-workflow'; import type { - IResponseCallbackData, + IWebhookResponseCallbackData, IWebhookManager, - IWorkflowDb, WebhookAccessControlOptions, WebhookRequest, -} from '@/Interfaces'; +} from './webhook.types'; import { Push } from '@/push'; import { NodeTypes } from '@/NodeTypes'; -import * as WebhookHelpers from '@/WebhookHelpers'; +import * as WebhookHelpers from '@/webhooks/WebhookHelpers'; import { TEST_WEBHOOK_TIMEOUT } from '@/constants'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { WorkflowMissingIdError } from '@/errors/workflow-missing-id.error'; import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error'; import * as NodeExecuteFunctions from 'n8n-core'; -import { removeTrailingSlash } from './utils'; -import type { TestWebhookRegistration } from '@/services/test-webhook-registrations.service'; -import { TestWebhookRegistrationsService } from '@/services/test-webhook-registrations.service'; +import { removeTrailingSlash } from '@/utils'; +import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service'; +import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service'; import { OrchestrationService } from '@/services/orchestration.service'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; +import type { IWorkflowDb } from '@/Interfaces'; +/** + * Service for handling the execution of webhooks of manual executions + * that use the [Test URL](https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.webhook/#webhook-urls). + */ @Service() export class TestWebhooks implements IWebhookManager { constructor( @@ -46,7 +50,7 @@ export class TestWebhooks implements IWebhookManager { async executeWebhook( request: WebhookRequest, response: express.Response, - ): Promise { + ): Promise { const httpMethod = request.method; let path = removeTrailingSlash(request.params.path); @@ -117,7 +121,7 @@ export class TestWebhooks implements IWebhookManager { undefined, // executionId request, response, - (error: Error | null, data: IResponseCallbackData) => { + (error: Error | null, data: IWebhookResponseCallbackData) => { if (error !== null) reject(error); else resolve(data); }, diff --git a/packages/cli/src/WaitingWebhooks.ts b/packages/cli/src/webhooks/WaitingWebhooks.ts similarity index 88% rename from packages/cli/src/WaitingWebhooks.ts rename to packages/cli/src/webhooks/WaitingWebhooks.ts index d795c948f142f..367635c45ec64 100644 --- a/packages/cli/src/WaitingWebhooks.ts +++ b/packages/cli/src/webhooks/WaitingWebhooks.ts @@ -2,21 +2,25 @@ import { NodeHelpers, Workflow } from 'n8n-workflow'; import { Service } from 'typedi'; import type express from 'express'; -import * as WebhookHelpers from '@/WebhookHelpers'; +import * as WebhookHelpers from '@/webhooks/WebhookHelpers'; import { NodeTypes } from '@/NodeTypes'; import type { - IExecutionResponse, - IResponseCallbackData, + IWebhookResponseCallbackData, IWebhookManager, - IWorkflowDb, WaitingWebhookRequest, -} from '@/Interfaces'; +} from './webhook.types'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { Logger } from '@/Logger'; -import { ConflictError } from './errors/response-errors/conflict.error'; -import { NotFoundError } from './errors/response-errors/not-found.error'; - +import { ConflictError } from '@/errors/response-errors/conflict.error'; +import { NotFoundError } from '@/errors/response-errors/not-found.error'; +import type { IExecutionResponse, IWorkflowDb } from '@/Interfaces'; + +/** + * Service for handling the execution of webhooks of Wait nodes that use the + * [Resume On Webhook Call](https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.wait/#on-webhook-call) + * feature. + */ @Service() export class WaitingWebhooks implements IWebhookManager { protected includeForms = false; @@ -40,7 +44,7 @@ export class WaitingWebhooks implements IWebhookManager { async executeWebhook( req: WaitingWebhookRequest, res: express.Response, - ): Promise { + ): Promise { const { path: executionId, suffix } = req.params; this.logReceivedWebhook(req.method, executionId); diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/webhooks/WebhookHelpers.ts similarity index 96% rename from packages/cli/src/WebhookHelpers.ts rename to packages/cli/src/webhooks/WebhookHelpers.ts index eafec28133326..f090a7ea7e318 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/webhooks/WebhookHelpers.ts @@ -20,7 +20,6 @@ import type { IDataObject, IDeferredPromise, IExecuteData, - IExecuteResponsePromiseData, IHttpRequestMethods, IN8nHttpFullResponse, INode, @@ -43,27 +42,25 @@ import { } from 'n8n-workflow'; import type { - IExecutionDb, - IResponseCallbackData, + IWebhookResponseCallbackData, IWebhookManager, - IWorkflowDb, - IWorkflowExecutionDataProcess, WebhookCORSRequest, WebhookRequest, -} from '@/Interfaces'; +} from './webhook.types'; import * as ResponseHelper from '@/ResponseHelper'; import * as WorkflowHelpers from '@/WorkflowHelpers'; import { WorkflowRunner } from '@/WorkflowRunner'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { ActiveExecutions } from '@/ActiveExecutions'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; -import { OwnershipService } from './services/ownership.service'; -import { parseBody } from './middlewares'; -import { Logger } from './Logger'; -import { NotFoundError } from './errors/response-errors/not-found.error'; -import { InternalServerError } from './errors/response-errors/internal-server.error'; -import { UnprocessableRequestError } from './errors/response-errors/unprocessable.error'; -import type { Project } from './databases/entities/Project'; +import { OwnershipService } from '@/services/ownership.service'; +import { parseBody } from '@/middlewares'; +import { Logger } from '@/Logger'; +import { NotFoundError } from '@/errors/response-errors/not-found.error'; +import { InternalServerError } from '@/errors/response-errors/internal-server.error'; +import { UnprocessableRequestError } from '@/errors/response-errors/unprocessable.error'; +import type { Project } from '@/databases/entities/Project'; +import type { IExecutionDb, IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces'; export const WEBHOOK_METHODS: IHttpRequestMethods[] = [ 'DELETE', @@ -137,7 +134,7 @@ export const webhookRequestHandler = return ResponseHelper.sendSuccessResponse(res, {}, true, 204); } - let response; + let response: IWebhookResponseCallbackData; try { response = await webhookManager.executeWebhook(req, res); } catch (error) { @@ -192,18 +189,6 @@ export function getWorkflowWebhooks( return returnData; } -export function encodeWebhookResponse( - response: IExecuteResponsePromiseData, -): IExecuteResponsePromiseData { - if (typeof response === 'object' && Buffer.isBuffer(response.body)) { - response.body = { - '__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING), - }; - } - - return response; -} - const normalizeFormData = (values: Record) => { for (const key in values) { const value = values[key]; @@ -228,7 +213,7 @@ export async function executeWebhook( executionId: string | undefined, req: WebhookRequest, res: express.Response, - responseCallback: (error: Error | null, data: IResponseCallbackData) => void, + responseCallback: (error: Error | null, data: IWebhookResponseCallbackData) => void, destinationNode?: string, ): Promise { // Get the nodeType to know which responseMode is set diff --git a/packages/cli/src/WebhookServer.ts b/packages/cli/src/webhooks/WebhookServer.ts similarity index 100% rename from packages/cli/src/WebhookServer.ts rename to packages/cli/src/webhooks/WebhookServer.ts diff --git a/packages/cli/src/__tests__/TestWebhooks.test.ts b/packages/cli/src/webhooks/__tests__/TestWebhooks.test.ts similarity index 93% rename from packages/cli/src/__tests__/TestWebhooks.test.ts rename to packages/cli/src/webhooks/__tests__/TestWebhooks.test.ts index 6c7ae555b3b52..deb6930b96e14 100644 --- a/packages/cli/src/__tests__/TestWebhooks.test.ts +++ b/packages/cli/src/webhooks/__tests__/TestWebhooks.test.ts @@ -1,20 +1,21 @@ import { mock } from 'jest-mock-extended'; -import { TestWebhooks } from '@/TestWebhooks'; +import { TestWebhooks } from '@/webhooks/TestWebhooks'; import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error'; import { v4 as uuid } from 'uuid'; import { generateNanoId } from '@/databases/utils/generators'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; -import * as WebhookHelpers from '@/WebhookHelpers'; +import * as WebhookHelpers from '@/webhooks/WebhookHelpers'; import type * as express from 'express'; -import type { IWorkflowDb, WebhookRequest } from '@/Interfaces'; +import type { IWorkflowDb } from '@/Interfaces'; import type { IWebhookData, IWorkflowExecuteAdditionalData, Workflow } from 'n8n-workflow'; import type { TestWebhookRegistrationsService, TestWebhookRegistration, -} from '@/services/test-webhook-registrations.service'; +} from '@/webhooks/test-webhook-registrations.service'; import * as AdditionalData from '@/WorkflowExecuteAdditionalData'; +import type { WebhookRequest } from '@/webhooks/webhook.types'; jest.mock('@/WorkflowExecuteAdditionalData'); diff --git a/packages/cli/src/__tests__/WebhookHelpers.test.ts b/packages/cli/src/webhooks/__tests__/WebhookHelpers.test.ts similarity index 97% rename from packages/cli/src/__tests__/WebhookHelpers.test.ts rename to packages/cli/src/webhooks/__tests__/WebhookHelpers.test.ts index 391d01b6fcfe4..9f86fbc549fcf 100644 --- a/packages/cli/src/__tests__/WebhookHelpers.test.ts +++ b/packages/cli/src/webhooks/__tests__/WebhookHelpers.test.ts @@ -3,8 +3,8 @@ import { mock } from 'jest-mock-extended'; import { randomString } from 'n8n-workflow'; import type { IHttpRequestMethods } from 'n8n-workflow'; -import type { IWebhookManager, WebhookCORSRequest, WebhookRequest } from '@/Interfaces'; -import { webhookRequestHandler } from '@/WebhookHelpers'; +import type { IWebhookManager, WebhookCORSRequest, WebhookRequest } from '@/webhooks/webhook.types'; +import { webhookRequestHandler } from '@/webhooks/WebhookHelpers'; describe('WebhookHelpers', () => { describe('webhookRequestHandler', () => { diff --git a/packages/cli/src/services/__tests__/test-webhook-registrations.service.test.ts b/packages/cli/src/webhooks/__tests__/test-webhook-registrations.service.test.ts similarity index 95% rename from packages/cli/src/services/__tests__/test-webhook-registrations.service.test.ts rename to packages/cli/src/webhooks/__tests__/test-webhook-registrations.service.test.ts index c93540938c1bd..95502e3611e7c 100644 --- a/packages/cli/src/services/__tests__/test-webhook-registrations.service.test.ts +++ b/packages/cli/src/webhooks/__tests__/test-webhook-registrations.service.test.ts @@ -1,7 +1,7 @@ import type { CacheService } from '@/services/cache/cache.service'; import type { OrchestrationService } from '@/services/orchestration.service'; -import type { TestWebhookRegistration } from '@/services/test-webhook-registrations.service'; -import { TestWebhookRegistrationsService } from '@/services/test-webhook-registrations.service'; +import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service'; +import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service'; import { mock } from 'jest-mock-extended'; describe('TestWebhookRegistrationsService', () => { diff --git a/packages/cli/src/__tests__/waiting-webhooks.test.ts b/packages/cli/src/webhooks/__tests__/waiting-webhooks.test.ts similarity index 90% rename from packages/cli/src/__tests__/waiting-webhooks.test.ts rename to packages/cli/src/webhooks/__tests__/waiting-webhooks.test.ts index 6748c7233566f..31f64eb198432 100644 --- a/packages/cli/src/__tests__/waiting-webhooks.test.ts +++ b/packages/cli/src/webhooks/__tests__/waiting-webhooks.test.ts @@ -1,10 +1,11 @@ import { mock } from 'jest-mock-extended'; -import { WaitingWebhooks } from '@/WaitingWebhooks'; +import { WaitingWebhooks } from '@/webhooks/WaitingWebhooks'; import { ConflictError } from '@/errors/response-errors/conflict.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; -import type { IExecutionResponse, WaitingWebhookRequest } from '@/Interfaces'; +import type { IExecutionResponse } from '@/Interfaces'; import type express from 'express'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import type { WaitingWebhookRequest } from '@/webhooks/webhook.types'; describe('WaitingWebhooks', () => { const executionRepository = mock(); diff --git a/packages/cli/src/services/__tests__/webhook.service.test.ts b/packages/cli/src/webhooks/__tests__/webhook.service.test.ts similarity index 99% rename from packages/cli/src/services/__tests__/webhook.service.test.ts rename to packages/cli/src/webhooks/__tests__/webhook.service.test.ts index 181bc60752956..5a8e19e84c809 100644 --- a/packages/cli/src/services/__tests__/webhook.service.test.ts +++ b/packages/cli/src/webhooks/__tests__/webhook.service.test.ts @@ -2,7 +2,7 @@ import { v4 as uuid } from 'uuid'; import config from '@/config'; import { WebhookRepository } from '@db/repositories/webhook.repository'; import { CacheService } from '@/services/cache/cache.service'; -import { WebhookService } from '@/services/webhook.service'; +import { WebhookService } from '@/webhooks/webhook.service'; import { WebhookEntity } from '@db/entities/WebhookEntity'; import { mockInstance } from '@test/mocking'; diff --git a/packages/cli/src/services/test-webhook-registrations.service.ts b/packages/cli/src/webhooks/test-webhook-registrations.service.ts similarity index 97% rename from packages/cli/src/services/test-webhook-registrations.service.ts rename to packages/cli/src/webhooks/test-webhook-registrations.service.ts index e2abae5605f98..94e7e7d826dbf 100644 --- a/packages/cli/src/services/test-webhook-registrations.service.ts +++ b/packages/cli/src/webhooks/test-webhook-registrations.service.ts @@ -3,7 +3,7 @@ import { CacheService } from '@/services/cache/cache.service'; import type { IWebhookData } from 'n8n-workflow'; import type { IWorkflowDb } from '@/Interfaces'; import { TEST_WEBHOOK_TIMEOUT, TEST_WEBHOOK_TIMEOUT_BUFFER } from '@/constants'; -import { OrchestrationService } from './orchestration.service'; +import { OrchestrationService } from '@/services/orchestration.service'; export type TestWebhookRegistration = { pushRef?: string; diff --git a/packages/cli/src/services/webhook.service.ts b/packages/cli/src/webhooks/webhook.service.ts similarity index 100% rename from packages/cli/src/services/webhook.service.ts rename to packages/cli/src/webhooks/webhook.service.ts diff --git a/packages/cli/src/webhooks/webhook.types.ts b/packages/cli/src/webhooks/webhook.types.ts new file mode 100644 index 0000000000000..7602ed1871bb0 --- /dev/null +++ b/packages/cli/src/webhooks/webhook.types.ts @@ -0,0 +1,37 @@ +import type { Request, Response } from 'express'; +import type { IDataObject, IHttpRequestMethods } from 'n8n-workflow'; + +export type WebhookCORSRequest = Request & { method: 'OPTIONS' }; + +export type WebhookRequest = Request<{ path: string }> & { + method: IHttpRequestMethods; + params: Record; +}; + +export type WaitingWebhookRequest = WebhookRequest & { + params: WebhookRequest['path'] & { suffix?: string }; +}; + +export interface WebhookAccessControlOptions { + allowedOrigins?: string; +} + +export interface IWebhookManager { + /** Gets all request methods associated with a webhook path*/ + getWebhookMethods?: (path: string) => Promise; + + /** Find the CORS options matching a path and method */ + findAccessControlOptions?: ( + path: string, + httpMethod: IHttpRequestMethods, + ) => Promise; + + executeWebhook(req: WebhookRequest, res: Response): Promise; +} + +export interface IWebhookResponseCallbackData { + data?: IDataObject | IDataObject[]; + headers?: object; + noWebhookResponse?: boolean; + responseCode?: number; +} diff --git a/packages/cli/src/workflows/workflowExecution.service.ts b/packages/cli/src/workflows/workflowExecution.service.ts index 8ebe7aed0c142..9ddce37d2f207 100644 --- a/packages/cli/src/workflows/workflowExecution.service.ts +++ b/packages/cli/src/workflows/workflowExecution.service.ts @@ -30,7 +30,7 @@ import type { import { NodeTypes } from '@/NodeTypes'; import { WorkflowRunner } from '@/WorkflowRunner'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; -import { TestWebhooks } from '@/TestWebhooks'; +import { TestWebhooks } from '@/webhooks/TestWebhooks'; import { Logger } from '@/Logger'; import type { Project } from '@/databases/entities/Project'; import { GlobalConfig } from '@n8n/config'; diff --git a/packages/cli/src/workflows/workflowSharing.service.ts b/packages/cli/src/workflows/workflowSharing.service.ts index add5bb31b8fc9..111e50b58103f 100644 --- a/packages/cli/src/workflows/workflowSharing.service.ts +++ b/packages/cli/src/workflows/workflowSharing.service.ts @@ -8,12 +8,14 @@ import { RoleService } from '@/services/role.service'; import type { Scope } from '@n8n/permissions'; import type { ProjectRole } from '@/databases/entities/ProjectRelation'; import type { WorkflowSharingRole } from '@/databases/entities/SharedWorkflow'; +import { ProjectRelationRepository } from '@/databases/repositories/projectRelation.repository'; @Service() export class WorkflowSharingService { constructor( private readonly sharedWorkflowRepository: SharedWorkflowRepository, private readonly roleService: RoleService, + private readonly projectRelationRepository: ProjectRelationRepository, ) {} /** @@ -64,4 +66,28 @@ export class WorkflowSharingService { return sharedWorkflows.map(({ workflowId }) => workflowId); } + + async getSharedWorkflowScopes( + workflowIds: string[], + user: User, + ): Promise> { + const projectRelations = await this.projectRelationRepository.findAllByUser(user.id); + const sharedWorkflows = + await this.sharedWorkflowRepository.getRelationsByWorkflowIdsAndProjectIds( + workflowIds, + projectRelations.map((p) => p.projectId), + ); + + return workflowIds.map((workflowId) => { + return [ + workflowId, + this.roleService.combineResourceScopes( + 'workflow', + user, + sharedWorkflows.filter((s) => s.workflowId === workflowId), + projectRelations, + ), + ]; + }); + } } diff --git a/packages/cli/test/integration/active-workflow-manager.test.ts b/packages/cli/test/integration/active-workflow-manager.test.ts index 03ce58e7ef6bc..de586b643a468 100644 --- a/packages/cli/test/integration/active-workflow-manager.test.ts +++ b/packages/cli/test/integration/active-workflow-manager.test.ts @@ -8,8 +8,8 @@ import { ActiveWorkflowManager } from '@/ActiveWorkflowManager'; import { ExternalHooks } from '@/ExternalHooks'; import { Push } from '@/push'; import { SecretsHelper } from '@/SecretsHelpers'; -import { WebhookService } from '@/services/webhook.service'; -import * as WebhookHelpers from '@/WebhookHelpers'; +import { WebhookService } from '@/webhooks/webhook.service'; +import * as WebhookHelpers from '@/webhooks/WebhookHelpers'; import * as AdditionalData from '@/WorkflowExecuteAdditionalData'; import type { WebhookEntity } from '@db/entities/WebhookEntity'; import { NodeTypes } from '@/NodeTypes'; diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 54c15d381de95..205a60307c643 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -1,5 +1,4 @@ import { BinaryDataService } from 'n8n-core'; -import { mock } from 'jest-mock-extended'; import { Worker } from '@/commands/worker'; import config from '@/config'; @@ -11,7 +10,7 @@ import { OrchestrationHandlerWorkerService } from '@/services/orchestration/work import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { License } from '@/License'; import { ExternalHooks } from '@/ExternalHooks'; -import { type JobQueue, Queue } from '@/Queue'; +import { ScalingService } from '@/scaling/scaling.service'; import { setupTestCommand } from '@test-integration/utils/testCommand'; import { mockInstance } from '../../shared/mocking'; @@ -28,12 +27,10 @@ const license = mockInstance(License); const messageEventBus = mockInstance(MessageEventBus); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); const orchestrationHandlerWorkerService = mockInstance(OrchestrationHandlerWorkerService); -const queue = mockInstance(Queue); +const scalingService = mockInstance(ScalingService); const orchestrationWorkerService = mockInstance(OrchestrationWorkerService); const command = setupTestCommand(Worker); -queue.getBullObjectInstance.mockReturnValue(mock({ on: jest.fn() })); - test('worker initializes all its components', async () => { const worker = await command.run(); @@ -45,9 +42,9 @@ test('worker initializes all its components', async () => { expect(externalHooks.init).toHaveBeenCalledTimes(1); expect(externalSecretsManager.init).toHaveBeenCalledTimes(1); expect(messageEventBus.initialize).toHaveBeenCalledTimes(1); + expect(scalingService.setupQueue).toHaveBeenCalledTimes(1); + expect(scalingService.setupWorker).toHaveBeenCalledTimes(1); expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); - expect(queue.init).toHaveBeenCalledTimes(1); - expect(queue.process).toHaveBeenCalledTimes(1); expect(orchestrationWorkerService.init).toHaveBeenCalledTimes(1); expect(orchestrationHandlerWorkerService.initWithOptions).toHaveBeenCalledTimes(1); expect(messageEventBus.send).toHaveBeenCalledTimes(1); diff --git a/packages/cli/test/integration/execution.service.integration.test.ts b/packages/cli/test/integration/execution.service.integration.test.ts index e30d55602af95..6f29950bf48d4 100644 --- a/packages/cli/test/integration/execution.service.integration.test.ts +++ b/packages/cli/test/integration/execution.service.integration.test.ts @@ -30,6 +30,7 @@ describe('ExecutionService', () => { mock(), mock(), mock(), + mock(), ); }); diff --git a/packages/cli/test/integration/executions.controller.test.ts b/packages/cli/test/integration/executions.controller.test.ts index 1e6d65a4f8368..94faa32351764 100644 --- a/packages/cli/test/integration/executions.controller.test.ts +++ b/packages/cli/test/integration/executions.controller.test.ts @@ -48,6 +48,16 @@ describe('GET /executions', () => { const response2 = await testServer.authAgentFor(member).get('/executions').expect(200); expect(response2.body.data.count).toBe(1); }); + + test('should return a scopes array for each execution', async () => { + testServer.license.enable('feat:sharing'); + const workflow = await createWorkflow({}, owner); + await shareWorkflowWithUsers(workflow, [member]); + await createSuccessfulExecution(workflow); + + const response = await testServer.authAgentFor(member).get('/executions').expect(200); + expect(response.body.data.results[0].scopes).toContain('workflow:execute'); + }); }); describe('GET /executions/:id', () => { diff --git a/packages/cli/test/integration/webhooks.test.ts b/packages/cli/test/integration/webhooks.test.ts index 9bd1977ed53d2..280edb370d286 100644 --- a/packages/cli/test/integration/webhooks.test.ts +++ b/packages/cli/test/integration/webhooks.test.ts @@ -3,13 +3,13 @@ import { agent as testAgent } from 'supertest'; import { mock } from 'jest-mock-extended'; import { AbstractServer } from '@/AbstractServer'; -import { ActiveWebhooks } from '@/ActiveWebhooks'; +import { ActiveWebhooks } from '@/webhooks/ActiveWebhooks'; import { ExternalHooks } from '@/ExternalHooks'; import { InternalHooks } from '@/InternalHooks'; -import { TestWebhooks } from '@/TestWebhooks'; -import { WaitingWebhooks } from '@/WaitingWebhooks'; +import { TestWebhooks } from '@/webhooks/TestWebhooks'; +import { WaitingWebhooks } from '@/webhooks/WaitingWebhooks'; import { WaitingForms } from '@/WaitingForms'; -import type { IResponseCallbackData } from '@/Interfaces'; +import type { IWebhookResponseCallbackData } from '@/webhooks/webhook.types'; import { mockInstance } from '@test/mocking'; import { GlobalConfig } from '@n8n/config'; @@ -80,7 +80,7 @@ describe('WebhookServer', () => { } const mockResponse = (data = {}, headers = {}, status = 200) => { - const response = mock(); + const response = mock(); response.responseCode = status; response.data = data; response.headers = headers; diff --git a/packages/core/README.md b/packages/core/README.md index 0518567fc9bce..ad67d1a3798d6 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -10,8 +10,4 @@ npm install n8n-core ## License -n8n is [fair-code](https://faircode.io) distributed under the [**Sustainable Use License**](https://github.com/n8n-io/n8n/blob/master/packages/cli/LICENSE.md). - -Proprietary licenses are available for enterprise customers. [Get in touch](mailto:license@n8n.io) - -Additional information about the license can be found in the [docs](https://docs.n8n.io/reference/license/). +You can find the license information [here](https://github.com/n8n-io/n8n/blob/master/README.md#license) diff --git a/packages/core/package.json b/packages/core/package.json index dfcc23db99d92..1f857b47e5ea5 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "n8n-core", - "version": "1.53.0", + "version": "1.54.0", "description": "Core functionality of n8n", "main": "dist/index", "types": "dist/index.d.ts", diff --git a/packages/design-system/README.md b/packages/design-system/README.md index ea69f24524a5b..c43317d077334 100644 --- a/packages/design-system/README.md +++ b/packages/design-system/README.md @@ -48,8 +48,4 @@ pnpm watch:theme ## License -n8n is [fair-code](https://faircode.io) distributed under the [**Sustainable Use License**](https://github.com/n8n-io/n8n/blob/master/packages/cli/LICENSE.md). - -Proprietary licenses are available for enterprise customers. [Get in touch](mailto:license@n8n.io) - -Additional information about the license can be found in the [docs](https://docs.n8n.io/reference/license/). +You can find the license information [here](https://github.com/n8n-io/n8n/blob/master/README.md#license) diff --git a/packages/design-system/package.json b/packages/design-system/package.json index 30db649157559..433356ecf0b90 100644 --- a/packages/design-system/package.json +++ b/packages/design-system/package.json @@ -1,6 +1,6 @@ { "name": "n8n-design-system", - "version": "1.43.0", + "version": "1.44.0", "main": "src/main.ts", "import": "src/main.ts", "scripts": { diff --git a/packages/design-system/src/components/N8nAvatar/Avatar.vue b/packages/design-system/src/components/N8nAvatar/Avatar.vue index 352ffb6e76ca8..05e701860f260 100644 --- a/packages/design-system/src/components/N8nAvatar/Avatar.vue +++ b/packages/design-system/src/components/N8nAvatar/Avatar.vue @@ -1,9 +1,9 @@