From e83cb1e74e4eba70c62de2e3f043ada2da172b60 Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Thu, 1 Feb 2024 08:12:05 +0000 Subject: [PATCH 1/9] feat: Add new log streaming ai node types --- .../EventMessageClasses/EventMessageAiNode.ts | 53 +++++++++++++++++++ .../src/eventbus/EventMessageClasses/index.ts | 4 ++ packages/workflow/src/MessageEventBus.ts | 1 + 3 files changed, 58 insertions(+) create mode 100644 packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts diff --git a/packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts b/packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts new file mode 100644 index 0000000000000..238b8a9a7c7f8 --- /dev/null +++ b/packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts @@ -0,0 +1,53 @@ +import { AbstractEventMessage, isEventMessageOptionsWithType } from './AbstractEventMessage'; +import type { JsonObject } from 'n8n-workflow'; +import { EventMessageTypeNames } from 'n8n-workflow'; +import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions'; +import type { AbstractEventPayload } from './AbstractEventPayload'; +import type { EventNamesAiNodesType } from '.'; + +// -------------------------------------- +// EventMessage class for Node events +// -------------------------------------- +export interface EventPayloadAiNode extends AbstractEventPayload { + msg?: string; + executionId: string; + nodeName: string; + workflowId?: string; + workflowName: string; + nodeType?: string; +} + +export interface EventMessageAiNodeOptions extends AbstractEventMessageOptions { + eventName: EventNamesAiNodesType; + + payload?: EventPayloadAiNode | undefined; +} + +export class EventMessageAiNode extends AbstractEventMessage { + readonly __type = EventMessageTypeNames.aiNode; + + eventName: EventNamesAiNodesType; + + payload: EventPayloadAiNode; + + constructor(options: EventMessageAiNodeOptions) { + super(options); + if (options.payload) this.setPayload(options.payload); + if (options.anonymize) { + this.anonymize(); + } + } + + setPayload(payload: EventPayloadAiNode): this { + this.payload = payload; + return this; + } + + deserialize(data: JsonObject): this { + if (isEventMessageOptionsWithType(data, this.__type)) { + this.setOptionsOrDefault(data); + if (data.payload) this.setPayload(data.payload as EventPayloadAiNode); + } + return this; + } +} diff --git a/packages/cli/src/eventbus/EventMessageClasses/index.ts b/packages/cli/src/eventbus/EventMessageClasses/index.ts index c6a0f85bd99ff..3041fd3e19c7a 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/index.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/index.ts @@ -34,17 +34,20 @@ export const eventNamesAudit = [ 'n8n.audit.workflow.deleted', 'n8n.audit.workflow.updated', ] as const; +export const eventNamesAiNodes = ['n8n.ai.nodeSuppliedData', 'n8n.ai.vectorStorePopulated']; export type EventNamesWorkflowType = (typeof eventNamesWorkflow)[number]; export type EventNamesAuditType = (typeof eventNamesAudit)[number]; export type EventNamesNodeType = (typeof eventNamesNode)[number]; export type EventNamesGenericType = (typeof eventNamesGeneric)[number]; +export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number]; export type EventNamesTypes = | EventNamesAuditType | EventNamesWorkflowType | EventNamesNodeType | EventNamesGenericType + | EventNamesAiNodesType | 'n8n.destination.test'; export const eventNamesAll = [ @@ -52,6 +55,7 @@ export const eventNamesAll = [ ...eventNamesWorkflow, ...eventNamesNode, ...eventNamesGeneric, + ...eventNamesAiNodes, ]; export type EventMessageTypes = diff --git a/packages/workflow/src/MessageEventBus.ts b/packages/workflow/src/MessageEventBus.ts index 2da8c7a20d826..97ca4116b43e1 100644 --- a/packages/workflow/src/MessageEventBus.ts +++ b/packages/workflow/src/MessageEventBus.ts @@ -11,6 +11,7 @@ export const enum EventMessageTypeNames { confirm = '$$EventMessageConfirm', workflow = '$$EventMessageWorkflow', node = '$$EventMessageNode', + aiNode = '$$EventMessageAiNode', } export const enum MessageEventBusDestinationTypeNames { From 6238e0a30a86e8ceb9dd6775e9860dd8fb57fefc Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Thu, 1 Feb 2024 09:24:43 +0000 Subject: [PATCH 2/9] feat: Adjust event names in the UI --- packages/cli/src/eventbus/EventMessageClasses/index.ts | 2 +- packages/editor-ui/src/plugins/i18n/locales/en.json | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/cli/src/eventbus/EventMessageClasses/index.ts b/packages/cli/src/eventbus/EventMessageClasses/index.ts index 3041fd3e19c7a..f1e61b9191327 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/index.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/index.ts @@ -34,7 +34,7 @@ export const eventNamesAudit = [ 'n8n.audit.workflow.deleted', 'n8n.audit.workflow.updated', ] as const; -export const eventNamesAiNodes = ['n8n.ai.nodeSuppliedData', 'n8n.ai.vectorStorePopulated']; +export const eventNamesAiNodes = ['n8n.ai.node.supplied.data', 'n8n.ai.vector.store.populated']; export type EventNamesWorkflowType = (typeof eventNamesWorkflow)[number]; export type EventNamesAuditType = (typeof eventNamesAudit)[number]; diff --git a/packages/editor-ui/src/plugins/i18n/locales/en.json b/packages/editor-ui/src/plugins/i18n/locales/en.json index 5e6423e3e972c..4f032096ecff0 100644 --- a/packages/editor-ui/src/plugins/i18n/locales/en.json +++ b/packages/editor-ui/src/plugins/i18n/locales/en.json @@ -1517,6 +1517,7 @@ "settings.log-streaming.tab.events.title": "Select groups or single events to subscribe to:", "settings.log-streaming.tab.events.anonymize": "Anonymize sensitive data", "settings.log-streaming.tab.events.anonymize.info": "Fields containing personal information like name or email are anonymized", + "settings.log-streaming.eventGroup.n8n.ai": "AI node logs", "settings.log-streaming.eventGroup.n8n.audit": "Audit Events", "settings.log-streaming.eventGroup.n8n.audit.info": "Will send events when user details or other audit data changes", "settings.log-streaming.eventGroup.n8n.workflow": "Workflow Events", From ff1ca740b77c0f3e1d907f1e252be51b7bb6eb88 Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Thu, 1 Feb 2024 09:32:09 +0000 Subject: [PATCH 3/9] feat: Add sendAiNodeEvent method --- packages/cli/src/eventbus/EventMessageClasses/index.ts | 4 +++- .../cli/src/eventbus/MessageEventBus/MessageEventBus.ts | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/packages/cli/src/eventbus/EventMessageClasses/index.ts b/packages/cli/src/eventbus/EventMessageClasses/index.ts index f1e61b9191327..e26e7467c4d82 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/index.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/index.ts @@ -1,3 +1,4 @@ +import type { EventMessageAiNode } from './EventMessageAiNode'; import type { EventMessageAudit } from './EventMessageAudit'; import type { EventMessageGeneric } from './EventMessageGeneric'; import type { EventMessageNode } from './EventMessageNode'; @@ -62,7 +63,8 @@ export type EventMessageTypes = | EventMessageGeneric | EventMessageWorkflow | EventMessageAudit - | EventMessageNode; + | EventMessageNode + | EventMessageAiNode; export interface FailedEventSummary { lastNodeExecuted: string; diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index e6784327c4fe6..47987195d0457 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -37,6 +37,10 @@ import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; import { ExecutionDataRecoveryService } from '../executionDataRecovery.service'; +import { + EventMessageAiNode, + EventMessageAiNodeOptions, +} from '../EventMessageClasses/EventMessageAiNode'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -457,4 +461,8 @@ export class MessageEventBus extends EventEmitter { async sendNodeEvent(options: EventMessageNodeOptions) { await this.send(new EventMessageNode(options)); } + + async sendAiNodeEvent(options: EventMessageAiNodeOptions) { + await this.send(new EventMessageAiNode(options)); + } } From 56fac8aab98f9321654dbd553820e9794b1adef6 Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Thu, 1 Feb 2024 11:51:13 +0000 Subject: [PATCH 4/9] feat: Add API for nodes to send AI logs --- .../cli/src/WorkflowExecuteAdditionalData.ts | 17 +++++++++++++++++ .../EventMessageClasses/EventMessageAiNode.ts | 3 +-- .../src/eventbus/EventMessageClasses/index.ts | 3 +-- packages/workflow/src/Interfaces.ts | 14 ++++++++++++++ 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index f9bbb83c8b593..983f1214a6554 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -68,6 +68,7 @@ import { WorkflowStaticDataService } from './workflows/workflowStaticData.servic import { WorkflowRepository } from './databases/repositories/workflow.repository'; import { UrlService } from './services/url.service'; import { WorkflowExecutionService } from './workflows/workflowExecution.service'; +import { MessageEventBus } from './eventbus'; const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); @@ -986,6 +987,22 @@ export async function getBase( setExecutionStatus, variables, secretsHelpers: Container.get(SecretsHelper), + logAiEvent: async ( + eventName: string, + payload: { + msg?: string | undefined; + executionId: string; + nodeName: string; + workflowId?: string | undefined; + workflowName: string; + nodeType?: string | undefined; + }, + ) => { + return await Container.get(MessageEventBus).sendAiNodeEvent({ + eventName, + ...payload, + }); + }, }; } diff --git a/packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts b/packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts index 238b8a9a7c7f8..44d9feafbde14 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts @@ -1,9 +1,8 @@ import { AbstractEventMessage, isEventMessageOptionsWithType } from './AbstractEventMessage'; -import type { JsonObject } from 'n8n-workflow'; +import type { EventNamesAiNodesType, JsonObject } from 'n8n-workflow'; import { EventMessageTypeNames } from 'n8n-workflow'; import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions'; import type { AbstractEventPayload } from './AbstractEventPayload'; -import type { EventNamesAiNodesType } from '.'; // -------------------------------------- // EventMessage class for Node events diff --git a/packages/cli/src/eventbus/EventMessageClasses/index.ts b/packages/cli/src/eventbus/EventMessageClasses/index.ts index e26e7467c4d82..51ab91fb01a10 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/index.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/index.ts @@ -3,6 +3,7 @@ import type { EventMessageAudit } from './EventMessageAudit'; import type { EventMessageGeneric } from './EventMessageGeneric'; import type { EventMessageNode } from './EventMessageNode'; import type { EventMessageWorkflow } from './EventMessageWorkflow'; +import { eventNamesAiNodes, type EventNamesAiNodesType } from 'n8n-workflow'; export const eventNamesWorkflow = [ 'n8n.workflow.started', @@ -35,13 +36,11 @@ export const eventNamesAudit = [ 'n8n.audit.workflow.deleted', 'n8n.audit.workflow.updated', ] as const; -export const eventNamesAiNodes = ['n8n.ai.node.supplied.data', 'n8n.ai.vector.store.populated']; export type EventNamesWorkflowType = (typeof eventNamesWorkflow)[number]; export type EventNamesAuditType = (typeof eventNamesAudit)[number]; export type EventNamesNodeType = (typeof eventNamesNode)[number]; export type EventNamesGenericType = (typeof eventNamesGeneric)[number]; -export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number]; export type EventNamesTypes = | EventNamesAuditType diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 7dcc8da2432fa..1926406151b3b 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -1939,6 +1939,9 @@ export interface IWorkflowExecuteHooks { sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise>; } +export const eventNamesAiNodes = ['n8n.ai.node.supplied.data', 'n8n.ai.vector.store.populated']; +export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number]; + export interface IWorkflowExecuteAdditionalData { credentialsHelper: ICredentialsHelper; executeWorkflow: ( @@ -1972,6 +1975,17 @@ export interface IWorkflowExecuteAdditionalData { userId: string; variables: IDataObject; secretsHelpers: SecretsHelpersBase; + logAiEvent: ( + eventName: EventNamesAiNodesType, + payload: { + msg?: string; + executionId: string; + nodeName: string; + workflowId?: string; + workflowName: string; + nodeType?: string; + }, + ) => Promise; } export type WorkflowExecuteMode = From 3d1ccc6785ed3f620a8ce0062a6973b7e18ffa7b Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Thu, 1 Feb 2024 12:08:30 +0000 Subject: [PATCH 5/9] feat: Add helper to nodes to log ai events --- packages/core/src/NodeExecuteFunctions.ts | 21 +++++++++++++++++++++ packages/workflow/src/Interfaces.ts | 1 + 2 files changed, 22 insertions(+) diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 5a54e6c81122d..4c81423067436 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -40,6 +40,7 @@ import type { CloseFunction, ConnectionTypes, ContextType, + EventNamesAiNodesType, FieldType, FileSystemHelperFunctions, FunctionsBase, @@ -3597,6 +3598,16 @@ export function getExecuteFunctions( constructExecutionMetaData, }, nodeHelpers: getNodeHelperFunctions(additionalData, workflow.id), + logAiEvent: async (eventName: EventNamesAiNodesType, msg: string) => { + return await additionalData.logAiEvent(eventName, { + executionId: additionalData.executionId ?? 'unsaved-execution', + nodeName: node.name, + workflowName: workflow.name ?? 'Unnamed workflow', + nodeType: node.type, + workflowId: workflow.id ?? 'unsaved-workflow', + msg, + }); + }, }; })(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions; } @@ -3737,6 +3748,16 @@ export function getExecuteSingleFunctions( getBinaryDataBuffer: async (propertyName, inputIndex = 0) => await getBinaryDataBuffer(inputData, itemIndex, propertyName, inputIndex), }, + logAiEvent: async (eventName: EventNamesAiNodesType, msg: string) => { + return await additionalData.logAiEvent(eventName, { + executionId: additionalData.executionId ?? 'unsaved-execution', + nodeName: node.name, + workflowName: workflow.name ?? 'Unnamed workflow', + nodeType: node.type, + workflowId: workflow.id ?? 'unsaved-workflow', + msg, + }); + }, }; })(workflow, runExecutionData, connectionInputData, inputData, node, itemIndex); } diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 1926406151b3b..29013eca0af5c 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -787,6 +787,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & { getInputSourceData(inputIndex?: number, inputName?: string): ISourceData; getExecutionCancelSignal(): AbortSignal | undefined; onExecutionCancellation(handler: () => unknown): void; + logAiEvent(eventName: string, msg?: string | undefined): Promise; }; // TODO: Create later own type only for Config-Nodes From f41b2ed9c1095ab20d175a3fde7cba66b3822061 Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Wed, 7 Feb 2024 15:30:02 +0100 Subject: [PATCH 6/9] feat: Log AI related events --- .../shared/createVectorStoreNode.ts | 5 +- .../@n8n/nodes-langchain/utils/logWrapper.ts | 63 +++++++++++++++---- .../cli/src/WorkflowExecuteAdditionalData.ts | 5 +- packages/workflow/src/Interfaces.ts | 19 +++++- 4 files changed, 76 insertions(+), 16 deletions(-) diff --git a/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts b/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts index 277254204dc41..b061afa348158 100644 --- a/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts +++ b/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts @@ -1,7 +1,7 @@ /* eslint-disable n8n-nodes-base/node-filename-against-convention */ /* eslint-disable n8n-nodes-base/node-dirname-against-convention */ import type { VectorStore } from 'langchain/vectorstores/base'; -import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow'; import type { INodeCredentialDescription, INodeProperties, @@ -237,6 +237,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => }); resultData.push(...serializedDocs); + void this.logAiEvent('n8n.ai.vector.store.searched', jsonStringify({ query: prompt })); } return await this.prepareOutputData(resultData); @@ -262,6 +263,8 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => try { await args.populateVectorStore(this, embeddings, processedDocuments, itemIndex); + + void this.logAiEvent('n8n.ai.vector.store.populated'); } catch (error) { throw error; } diff --git a/packages/@n8n/nodes-langchain/utils/logWrapper.ts b/packages/@n8n/nodes-langchain/utils/logWrapper.ts index 52a24ef6ac327..5ab8b46467a80 100644 --- a/packages/@n8n/nodes-langchain/utils/logWrapper.ts +++ b/packages/@n8n/nodes-langchain/utils/logWrapper.ts @@ -4,6 +4,7 @@ import { type IExecuteFunctions, type INodeExecutionData, NodeConnectionType, + jsonStringify, } from 'n8n-workflow'; import { Tool } from 'langchain/tools'; @@ -197,17 +198,20 @@ export function logWrapper( arguments: [], })) as BaseMessage[]; - executeFunctions.addOutputData(connectionType, index, [ - [{ json: { action: 'getMessages', response } }], - ]); + const payload = { action: 'getMessages', response }; + executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]); + + void executeFunctions.logAiEvent( + 'n8n.ai.memory.get.messages', + jsonStringify({ response }), + ); return response; }; } else if (prop === 'addMessage' && 'addMessage' in target) { return async (message: BaseMessage): Promise => { connectionType = NodeConnectionType.AiMemory; - const { index } = executeFunctions.addInputData(connectionType, [ - [{ json: { action: 'addMessage', message } }], - ]); + const payload = { action: 'addMessage', message }; + const { index } = executeFunctions.addInputData(connectionType, [[{ json: payload }]]); await callMethodAsync.call(target, { executeFunctions, @@ -217,9 +221,11 @@ export function logWrapper( arguments: [message], }); - executeFunctions.addOutputData(connectionType, index, [ - [{ json: { action: 'addMessage' } }], - ]); + void executeFunctions.logAiEvent( + 'n8n.ai.memory.added.message', + jsonStringify({ message }), + ); + executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]); }; } } @@ -236,7 +242,6 @@ export function logWrapper( const { index } = executeFunctions.addInputData(connectionType, [ [{ json: { messages, options } }], ]); - try { const response = (await callMethodAsync.call(target, { executeFunctions, @@ -249,6 +254,18 @@ export function logWrapper( runManager, ], })) as ChatResult; + + void executeFunctions.logAiEvent( + 'n8n.ai.llm.generated', + jsonStringify({ + messages: + typeof messages === 'string' + ? messages + : messages.map((message) => message.toJSON()), + options, + response, + }), + ); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; } catch (error) { @@ -281,6 +298,10 @@ export function logWrapper( executeFunctions.addOutputData(connectionType, index, [ [{ json: { action: 'getFormatInstructions', response } }], ]); + void executeFunctions.logAiEvent( + 'n8n.ai.output.parser.get.instructions', + jsonStringify({ response }), + ); return response; }; } else if (prop === 'parse' && 'parse' in target) { @@ -299,6 +320,10 @@ export function logWrapper( arguments: [stringifiedText], })) as object; + void executeFunctions.logAiEvent( + 'n8n.ai.output.parser.parsed', + jsonStringify({ text, response }), + ); executeFunctions.addOutputData(connectionType, index, [ [{ json: { action: 'parse', response } }], ]); @@ -327,6 +352,10 @@ export function logWrapper( arguments: [query, config], })) as Array>>; + void executeFunctions.logAiEvent( + 'n8n.ai.retriever.get.relevant.documents', + jsonStringify({ query }), + ); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -351,6 +380,7 @@ export function logWrapper( arguments: [documents], })) as number[][]; + void executeFunctions.logAiEvent('n8n.ai.embeddings.embedded.document'); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -370,7 +400,7 @@ export function logWrapper( method: target[prop], arguments: [query], })) as number[]; - + void executeFunctions.logAiEvent('n8n.ai.embeddings.embedded.query'); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -400,6 +430,7 @@ export function logWrapper( return response; }; } + // Process Each if (prop === 'processItem' && 'processItem' in target) { return async (item: INodeExecutionData, itemIndex: number): Promise => { @@ -414,6 +445,7 @@ export function logWrapper( arguments: [item, itemIndex], })) as number[]; + void executeFunctions.logAiEvent('n8n.ai.document.processed'); executeFunctions.addOutputData(connectionType, index, [ [{ json: { response }, pairedItem: { item: itemIndex } }], ]); @@ -439,6 +471,7 @@ export function logWrapper( arguments: [text], })) as string[]; + void executeFunctions.logAiEvent('n8n.ai.text.splitter.split'); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -462,6 +495,10 @@ export function logWrapper( arguments: [query], })) as string; + void executeFunctions.logAiEvent( + 'n8n.ai.tool.called', + jsonStringify({ query, response }), + ); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -491,6 +528,10 @@ export function logWrapper( arguments: [query, k, filter, _callbacks], })) as Array>>; + void executeFunctions.logAiEvent( + 'n8n.ai.vector.store.searched', + jsonStringify({ query }), + ); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 983f1214a6554..ab55fdcb216fe 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -23,6 +23,7 @@ import type { WorkflowExecuteMode, ExecutionStatus, ExecutionError, + EventNamesAiNodesType, } from 'n8n-workflow'; import { ApplicationError, @@ -988,7 +989,7 @@ export async function getBase( variables, secretsHelpers: Container.get(SecretsHelper), logAiEvent: async ( - eventName: string, + eventName: EventNamesAiNodesType, payload: { msg?: string | undefined; executionId: string; @@ -1000,7 +1001,7 @@ export async function getBase( ) => { return await Container.get(MessageEventBus).sendAiNodeEvent({ eventName, - ...payload, + payload, }); }, }; diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 29013eca0af5c..51755b32a0f89 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -787,7 +787,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & { getInputSourceData(inputIndex?: number, inputName?: string): ISourceData; getExecutionCancelSignal(): AbortSignal | undefined; onExecutionCancellation(handler: () => unknown): void; - logAiEvent(eventName: string, msg?: string | undefined): Promise; + logAiEvent(eventName: EventNamesAiNodesType, msg?: string | undefined): Promise; }; // TODO: Create later own type only for Config-Nodes @@ -1940,7 +1940,22 @@ export interface IWorkflowExecuteHooks { sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise>; } -export const eventNamesAiNodes = ['n8n.ai.node.supplied.data', 'n8n.ai.vector.store.populated']; +export const eventNamesAiNodes = [ + 'n8n.ai.memory.get.messages', + 'n8n.ai.memory.added.message', + 'n8n.ai.output.parser.get.instructions', + 'n8n.ai.output.parser.parsed', + 'n8n.ai.retriever.get.relevant.documents', + 'n8n.ai.embeddings.embedded.document', + 'n8n.ai.embeddings.embedded.query', + 'n8n.ai.document.processed', + 'n8n.ai.text.splitter.split', + 'n8n.ai.tool.called', + 'n8n.ai.vector.store.searched', + 'n8n.ai.llm.generated', + 'n8n.ai.vector.store.populated', +] as const; + export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number]; export interface IWorkflowExecuteAdditionalData { From aa4ad813379cc70994f9691587d53685a622009a Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Thu, 8 Feb 2024 18:08:12 +0000 Subject: [PATCH 7/9] test: Add tests for new ai event method --- .../unit/ExecutionMetadataService.test.ts | 34 +++++++++++ .../WorkflowExecuteAdditionalData.test.ts | 57 +++++++++++-------- 2 files changed, 66 insertions(+), 25 deletions(-) create mode 100644 packages/cli/test/unit/ExecutionMetadataService.test.ts diff --git a/packages/cli/test/unit/ExecutionMetadataService.test.ts b/packages/cli/test/unit/ExecutionMetadataService.test.ts new file mode 100644 index 0000000000000..48b754f0ff78a --- /dev/null +++ b/packages/cli/test/unit/ExecutionMetadataService.test.ts @@ -0,0 +1,34 @@ +import { Container } from 'typedi'; +import { ExecutionMetadataRepository } from '@db/repositories/executionMetadata.repository'; +import { ExecutionMetadataService } from '@/services/executionMetadata.service'; +import { mockInstance } from '../shared/mocking'; + +describe('ExecutionMetadataService', () => { + const repository = mockInstance(ExecutionMetadataRepository); + + test('Execution metadata is saved in a batch', async () => { + const toSave = { + test1: 'value1', + test2: 'value2', + }; + const executionId = '1234'; + + await Container.get(ExecutionMetadataService).save(executionId, toSave); + + expect(repository.save).toHaveBeenCalledTimes(1); + expect(repository.save.mock.calls[0]).toEqual([ + [ + { + execution: { id: executionId }, + key: 'test1', + value: 'value1', + }, + { + execution: { id: executionId }, + key: 'test2', + value: 'value2', + }, + ], + ]); + }); +}); diff --git a/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts b/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts index c36c43455e129..8c257bcac7506 100644 --- a/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts +++ b/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts @@ -1,34 +1,41 @@ -import { Container } from 'typedi'; -import { ExecutionMetadataRepository } from '@db/repositories/executionMetadata.repository'; -import { ExecutionMetadataService } from '@/services/executionMetadata.service'; +import { VariablesService } from '@/environments/variables/variables.service.ee'; import { mockInstance } from '../shared/mocking'; +import { MessageEventBus } from '@/eventbus'; +import { getBase } from '@/WorkflowExecuteAdditionalData'; +import Container from 'typedi'; +import { CredentialsHelper } from '@/CredentialsHelper'; +import { SecretsHelper } from '@/SecretsHelpers'; describe('WorkflowExecuteAdditionalData', () => { - const repository = mockInstance(ExecutionMetadataRepository); + const messageEventBus = mockInstance(MessageEventBus); + const variablesService = mockInstance(VariablesService); + variablesService.getAllCached.mockResolvedValue([]); + const credentialsHelper = mockInstance(CredentialsHelper); + const secretsHelper = mockInstance(SecretsHelper); + Container.set(MessageEventBus, messageEventBus); + Container.set(VariablesService, variablesService); + Container.set(CredentialsHelper, credentialsHelper); + Container.set(SecretsHelper, secretsHelper); - test('Execution metadata is saved in a batch', async () => { - const toSave = { - test1: 'value1', - test2: 'value2', + test('logAiEvent should call MessageEventBus', async () => { + const additionalData = getBase('user-id'); + + const eventName = 'n8n.ai.memory.get.messages'; + const payload = { + msg: 'test message', + executionId: '123', + nodeName: 'n8n-memory', + workflowId: 'workflow-id', + workflowName: 'workflow-name', + nodeType: 'n8n-memory', }; - const executionId = '1234'; - await Container.get(ExecutionMetadataService).save(executionId, toSave); + (await additionalData).logAiEvent(eventName, payload); - expect(repository.save).toHaveBeenCalledTimes(1); - expect(repository.save.mock.calls[0]).toEqual([ - [ - { - execution: { id: executionId }, - key: 'test1', - value: 'value1', - }, - { - execution: { id: executionId }, - key: 'test2', - value: 'value2', - }, - ], - ]); + expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledTimes(1); + expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName, + payload, + }); }); }); From cf17bf240e968f471909734b9c3080d8aa8a4c09 Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Fri, 9 Feb 2024 08:34:53 +0000 Subject: [PATCH 8/9] fix: Adjust import --- packages/cli/src/WorkflowExecuteAdditionalData.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index ddbb420ec9998..58bab9f70bbe5 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -69,7 +69,7 @@ import { WorkflowStaticDataService } from './workflows/workflowStaticData.servic import { WorkflowRepository } from './databases/repositories/workflow.repository'; import { UrlService } from './services/url.service'; import { WorkflowExecutionService } from './workflows/workflowExecution.service'; -import { MessageEventBus } from './eventbus'; +import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); From 31bc98b71c1e639c8a6a0ae909b3127c78b2af49 Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Fri, 9 Feb 2024 09:36:05 +0100 Subject: [PATCH 9/9] Fix linting and import issues --- .../cli/src/eventbus/MessageEventBus/MessageEventBus.ts | 2 +- .../cli/test/unit/WorkflowExecuteAdditionalData.test.ts | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index cd1f76b10b134..1b2fe14026e1e 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -39,7 +39,7 @@ import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; import { ExecutionDataRecoveryService } from '../executionDataRecovery.service'; import { EventMessageAiNode, - EventMessageAiNodeOptions, + type EventMessageAiNodeOptions, } from '../EventMessageClasses/EventMessageAiNode'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; diff --git a/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts b/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts index 8c257bcac7506..2984220637aae 100644 --- a/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts +++ b/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts @@ -1,6 +1,6 @@ import { VariablesService } from '@/environments/variables/variables.service.ee'; import { mockInstance } from '../shared/mocking'; -import { MessageEventBus } from '@/eventbus'; +import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import { getBase } from '@/WorkflowExecuteAdditionalData'; import Container from 'typedi'; import { CredentialsHelper } from '@/CredentialsHelper'; @@ -18,7 +18,7 @@ describe('WorkflowExecuteAdditionalData', () => { Container.set(SecretsHelper, secretsHelper); test('logAiEvent should call MessageEventBus', async () => { - const additionalData = getBase('user-id'); + const additionalData = await getBase('user-id'); const eventName = 'n8n.ai.memory.get.messages'; const payload = { @@ -30,7 +30,7 @@ describe('WorkflowExecuteAdditionalData', () => { nodeType: 'n8n-memory', }; - (await additionalData).logAiEvent(eventName, payload); + await additionalData.logAiEvent(eventName, payload); expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledTimes(1); expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledWith({