From 4291aab979bfc8810199376400747c685dc25023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 11 Sep 2024 09:27:26 +0200 Subject: [PATCH] refactor(core): Include AI events in log streaming relay This PR brings the AI events in line with all other log streaming events in the relay, breaks a dependency cycle in `WorkflowExecuteAdditionalData`, stops awaiting on log streaming during the AI nodes execution, and adds tests for all AI log streaming events. Notes: - Removed `n8n.ai.output.parser.get.instructions` as it was unused. - `cli` does not import `workflow` so I had to duplicate two small types. Once we have a common types package this should no longer be an issue. Follow-up to: #8526 --- .../nodes/llms/N8nLlmTracing.ts | 4 +- .../shared/createVectorStoreNode.ts | 6 +- .../@n8n/nodes-langchain/utils/helpers.ts | 9 +- .../@n8n/nodes-langchain/utils/logWrapper.ts | 22 +- .../workflow-execute-additional-data.test.ts | 16 +- .../event-message-ai-node.ts | 3 +- .../eventbus/event-message-classes/index.ts | 20 +- .../log-streaming-event-relay.test.ts | 241 ++++++++++++++++++ packages/cli/src/events/ai-event-map.ts | 38 +++ packages/cli/src/events/event.service.ts | 3 +- .../src/events/log-streaming-event-relay.ts | 116 +++++++++ packages/cli/src/events/relay-event-map.ts | 3 +- .../src/workflow-execute-additional-data.ts | 23 +- packages/core/src/NodeExecuteFunctions.ts | 10 +- packages/workflow/src/Interfaces.ts | 59 ++--- 15 files changed, 481 insertions(+), 92 deletions(-) create mode 100644 packages/cli/src/events/ai-event-map.ts diff --git a/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts b/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts index d217c53e7b870..0502f096b903e 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts @@ -132,7 +132,7 @@ export class N8nLlmTracing extends BaseCallbackHandler { this.executionFunctions.addOutputData(this.connectionType, this.lastInput.index, [ [{ json: { ...response } }], ]); - void logAiEvent(this.executionFunctions, 'n8n.ai.llm.generated', { + void logAiEvent(this.executionFunctions, 'ai-llm-generated-output', { messages: parsedMessages, options: this.lastInput.options, response, @@ -184,7 +184,7 @@ export class N8nLlmTracing extends BaseCallbackHandler { }); } - void logAiEvent(this.executionFunctions, 'n8n.ai.llm.error', { + void logAiEvent(this.executionFunctions, 'ai-llm-errored', { error: Object.keys(error).length === 0 ? error.toString() : error, runId, parentRunId, diff --git a/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts b/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts index 489c17c976ac4..10ea879bddf52 100644 --- a/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts +++ b/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts @@ -280,7 +280,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => }); resultData.push(...serializedDocs); - void logAiEvent(this, 'n8n.ai.vector.store.searched', { query: prompt }); + void logAiEvent(this, 'ai-vector-store-searched', { query: prompt }); } return [resultData]; @@ -307,7 +307,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => try { await args.populateVectorStore(this, embeddings, processedDocuments, itemIndex); - void logAiEvent(this, 'n8n.ai.vector.store.populated'); + void logAiEvent(this, 'ai-vector-store-populated'); } catch (error) { throw error; } @@ -361,7 +361,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => ids: [documentId], }); - void logAiEvent(this, 'n8n.ai.vector.store.updated'); + void logAiEvent(this, 'ai-vector-store-updated'); } catch (error) { throw error; } diff --git a/packages/@n8n/nodes-langchain/utils/helpers.ts b/packages/@n8n/nodes-langchain/utils/helpers.ts index bdac2048b2d9c..c70c8a899103d 100644 --- a/packages/@n8n/nodes-langchain/utils/helpers.ts +++ b/packages/@n8n/nodes-langchain/utils/helpers.ts @@ -1,10 +1,5 @@ import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow'; -import type { - EventNamesAiNodesType, - IDataObject, - IExecuteFunctions, - IWebhookFunctions, -} from 'n8n-workflow'; +import type { AiEvent, IDataObject, IExecuteFunctions, IWebhookFunctions } from 'n8n-workflow'; import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import type { BaseOutputParser } from '@langchain/core/output_parsers'; import type { BaseMessage } from '@langchain/core/messages'; @@ -155,7 +150,7 @@ export function getSessionId( export async function logAiEvent( executeFunctions: IExecuteFunctions, - event: EventNamesAiNodesType, + event: AiEvent, data?: IDataObject, ) { try { diff --git a/packages/@n8n/nodes-langchain/utils/logWrapper.ts b/packages/@n8n/nodes-langchain/utils/logWrapper.ts index 252d6b9890857..8707726183ad2 100644 --- a/packages/@n8n/nodes-langchain/utils/logWrapper.ts +++ b/packages/@n8n/nodes-langchain/utils/logWrapper.ts @@ -196,7 +196,7 @@ export function logWrapper( const payload = { action: 'getMessages', response }; executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]); - void logAiEvent(executeFunctions, 'n8n.ai.memory.get.messages', { response }); + void logAiEvent(executeFunctions, 'ai-messages-retrieved-from-memory', { response }); return response; }; } else if (prop === 'addMessage' && 'addMessage' in target) { @@ -213,7 +213,7 @@ export function logWrapper( arguments: [message], }); - void logAiEvent(executeFunctions, 'n8n.ai.memory.added.message', { message }); + void logAiEvent(executeFunctions, 'ai-message-added-to-memory', { message }); executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]); }; } @@ -238,13 +238,13 @@ export function logWrapper( arguments: [stringifiedText], })) as object; - void logAiEvent(executeFunctions, 'n8n.ai.output.parser.parsed', { text, response }); + void logAiEvent(executeFunctions, 'ai-output-parsed', { text, response }); executeFunctions.addOutputData(connectionType, index, [ [{ json: { action: 'parse', response } }], ]); return response; } catch (error) { - void logAiEvent(executeFunctions, 'n8n.ai.output.parser.parsed', { + void logAiEvent(executeFunctions, 'ai-output-parsed', { text, response: error.message ?? error, }); @@ -277,7 +277,7 @@ export function logWrapper( arguments: [query, config], })) as Array>>; - void logAiEvent(executeFunctions, 'n8n.ai.retriever.get.relevant.documents', { query }); + void logAiEvent(executeFunctions, 'ai-documents-retrieved', { query }); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -302,7 +302,7 @@ export function logWrapper( arguments: [documents], })) as number[][]; - void logAiEvent(executeFunctions, 'n8n.ai.embeddings.embedded.document'); + void logAiEvent(executeFunctions, 'ai-document-embedded'); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -322,7 +322,7 @@ export function logWrapper( method: target[prop], arguments: [query], })) as number[]; - void logAiEvent(executeFunctions, 'n8n.ai.embeddings.embedded.query'); + void logAiEvent(executeFunctions, 'ai-query-embedded'); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -367,7 +367,7 @@ export function logWrapper( arguments: [item, itemIndex], })) as number[]; - void logAiEvent(executeFunctions, 'n8n.ai.document.processed'); + void logAiEvent(executeFunctions, 'ai-document-processed'); executeFunctions.addOutputData(connectionType, index, [ [{ json: { response }, pairedItem: { item: itemIndex } }], ]); @@ -393,7 +393,7 @@ export function logWrapper( arguments: [text], })) as string[]; - void logAiEvent(executeFunctions, 'n8n.ai.text.splitter.split'); + void logAiEvent(executeFunctions, 'ai-text-split'); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -417,7 +417,7 @@ export function logWrapper( arguments: [query], })) as string; - void logAiEvent(executeFunctions, 'n8n.ai.tool.called', { query, response }); + void logAiEvent(executeFunctions, 'ai-tool-called', { query, response }); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -447,7 +447,7 @@ export function logWrapper( arguments: [query, k, filter, _callbacks], })) as Array>>; - void logAiEvent(executeFunctions, 'n8n.ai.vector.store.searched', { query }); + void logAiEvent(executeFunctions, 'ai-vector-store-searched', { query }); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; diff --git a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts index 5bd84bf88ff84..6fa8d8c0c0a88 100644 --- a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts +++ b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts @@ -1,18 +1,17 @@ import { VariablesService } from '@/environments/variables/variables.service.ee'; import { mockInstance } from '@test/mocking'; -import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { getBase } from '@/workflow-execute-additional-data'; import Container from 'typedi'; import { CredentialsHelper } from '@/credentials-helper'; import { SecretsHelper } from '@/secrets-helpers'; +import { EventService } from '@/events/event.service'; describe('WorkflowExecuteAdditionalData', () => { - const messageEventBus = mockInstance(MessageEventBus); const variablesService = mockInstance(VariablesService); variablesService.getAllCached.mockResolvedValue([]); const credentialsHelper = mockInstance(CredentialsHelper); const secretsHelper = mockInstance(SecretsHelper); - Container.set(MessageEventBus, messageEventBus); + const eventService = mockInstance(EventService); Container.set(VariablesService, variablesService); Container.set(CredentialsHelper, credentialsHelper); Container.set(SecretsHelper, secretsHelper); @@ -20,7 +19,7 @@ describe('WorkflowExecuteAdditionalData', () => { test('logAiEvent should call MessageEventBus', async () => { const additionalData = await getBase('user-id'); - const eventName = 'n8n.ai.memory.get.messages'; + const eventName = 'ai-messages-retrieved-from-memory'; const payload = { msg: 'test message', executionId: '123', @@ -30,12 +29,9 @@ describe('WorkflowExecuteAdditionalData', () => { nodeType: 'n8n-memory', }; - await additionalData.logAiEvent(eventName, payload); + additionalData.logAiEvent(eventName, payload); - expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledTimes(1); - expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledWith({ - eventName, - payload, - }); + expect(eventService.emit).toHaveBeenCalledTimes(1); + expect(eventService.emit).toHaveBeenCalledWith(eventName, payload); }); }); diff --git a/packages/cli/src/eventbus/event-message-classes/event-message-ai-node.ts b/packages/cli/src/eventbus/event-message-classes/event-message-ai-node.ts index b1ee96099373c..a59220fba9976 100644 --- a/packages/cli/src/eventbus/event-message-classes/event-message-ai-node.ts +++ b/packages/cli/src/eventbus/event-message-classes/event-message-ai-node.ts @@ -1,5 +1,6 @@ import { AbstractEventMessage, isEventMessageOptionsWithType } from './abstract-event-message'; -import type { EventNamesAiNodesType, JsonObject } from 'n8n-workflow'; +import type { JsonObject } from 'n8n-workflow'; +import type { EventNamesAiNodesType } from '.'; import { EventMessageTypeNames } from 'n8n-workflow'; import type { AbstractEventMessageOptions } from './abstract-event-message-options'; import type { AbstractEventPayload } from './abstract-event-payload'; diff --git a/packages/cli/src/eventbus/event-message-classes/index.ts b/packages/cli/src/eventbus/event-message-classes/index.ts index 8e0f367571819..d7444cf3ab1ac 100644 --- a/packages/cli/src/eventbus/event-message-classes/index.ts +++ b/packages/cli/src/eventbus/event-message-classes/index.ts @@ -4,7 +4,25 @@ import type { EventMessageExecution } from './event-message-execution'; import type { EventMessageGeneric } from './event-message-generic'; import type { EventMessageNode } from './event-message-node'; import type { EventMessageWorkflow } from './event-message-workflow'; -import { eventNamesAiNodes, type EventNamesAiNodesType } from 'n8n-workflow'; + +export const eventNamesAiNodes = [ + 'n8n.ai.memory.get.messages', + 'n8n.ai.memory.added.message', + 'n8n.ai.output.parser.parsed', + 'n8n.ai.retriever.get.relevant.documents', + 'n8n.ai.embeddings.embedded.document', + 'n8n.ai.embeddings.embedded.query', + 'n8n.ai.document.processed', + 'n8n.ai.text.splitter.split', + 'n8n.ai.tool.called', + 'n8n.ai.vector.store.searched', + 'n8n.ai.llm.generated', + 'n8n.ai.llm.error', + 'n8n.ai.vector.store.populated', + 'n8n.ai.vector.store.updated', +] as const; + +export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number]; export const eventNamesWorkflow = [ 'n8n.workflow.started', diff --git a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts index 99995f6d21d97..fab52149609c9 100644 --- a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts +++ b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts @@ -945,4 +945,245 @@ describe('LogStreamingEventRelay', () => { }); }); }); + + describe('AI events', () => { + it('should log on `ai-messages-retrieved-from-memory` event', () => { + const payload: RelayEventMap['ai-messages-retrieved-from-memory'] = { + msg: 'Hello, world!', + executionId: 'exec789', + nodeName: 'Memory', + workflowId: 'wf123', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.memory', + }; + + eventService.emit('ai-messages-retrieved-from-memory', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.memory.get.messages', + payload, + }); + }); + + it('should log on `ai-message-added-to-memory` event', () => { + const payload: RelayEventMap['ai-message-added-to-memory'] = { + executionId: 'exec456', + nodeName: 'Memory', + workflowId: 'wf789', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.memory', + }; + + eventService.emit('ai-message-added-to-memory', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.memory.added.message', + payload, + }); + }); + + it('should log on `ai-output-parsed` event', () => { + const payload: RelayEventMap['ai-output-parsed'] = { + executionId: 'exec123', + nodeName: 'Output Parser', + workflowId: 'wf456', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.outputParser', + }; + + eventService.emit('ai-output-parsed', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.output.parser.parsed', + payload, + }); + }); + + it('should log on `ai-documents-retrieved` event', () => { + const payload: RelayEventMap['ai-documents-retrieved'] = { + executionId: 'exec789', + nodeName: 'Retriever', + workflowId: 'wf123', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.retriever', + }; + + eventService.emit('ai-documents-retrieved', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.retriever.get.relevant.documents', + payload, + }); + }); + + it('should log on `ai-document-embedded` event', () => { + const payload: RelayEventMap['ai-document-embedded'] = { + executionId: 'exec456', + nodeName: 'Embeddings', + workflowId: 'wf789', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.embeddings', + }; + + eventService.emit('ai-document-embedded', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.embeddings.embedded.document', + payload, + }); + }); + + it('should log on `ai-query-embedded` event', () => { + const payload: RelayEventMap['ai-query-embedded'] = { + executionId: 'exec123', + nodeName: 'Embeddings', + workflowId: 'wf456', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.embeddings', + }; + + eventService.emit('ai-query-embedded', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.embeddings.embedded.query', + payload, + }); + }); + + it('should log on `ai-document-processed` event', () => { + const payload: RelayEventMap['ai-document-processed'] = { + executionId: 'exec789', + nodeName: 'Embeddings', + workflowId: 'wf789', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.embeddings', + }; + + eventService.emit('ai-document-processed', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.document.processed', + payload, + }); + }); + + it('should log on `ai-text-split` event', () => { + const payload: RelayEventMap['ai-text-split'] = { + executionId: 'exec456', + nodeName: 'Text Splitter', + workflowId: 'wf789', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.textSplitter', + }; + + eventService.emit('ai-text-split', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.text.splitter.split', + payload, + }); + }); + + it('should log on `ai-tool-called` event', () => { + const payload: RelayEventMap['ai-tool-called'] = { + executionId: 'exec123', + nodeName: 'Tool', + workflowId: 'wf456', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.tool', + }; + + eventService.emit('ai-tool-called', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.tool.called', + payload, + }); + }); + + it('should log on `ai-vector-store-searched` event', () => { + const payload: RelayEventMap['ai-vector-store-searched'] = { + executionId: 'exec789', + nodeName: 'Vector Store', + workflowId: 'wf123', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.vectorStore', + }; + + eventService.emit('ai-vector-store-searched', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.vector.store.searched', + payload, + }); + }); + + it('should log on `ai-llm-generated-output` event', () => { + const payload: RelayEventMap['ai-llm-generated-output'] = { + executionId: 'exec456', + nodeName: 'OpenAI', + workflowId: 'wf789', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.openai', + }; + + eventService.emit('ai-llm-generated-output', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.llm.generated', + payload, + }); + }); + + it('should log on `ai-llm-errored` event', () => { + const payload: RelayEventMap['ai-llm-errored'] = { + executionId: 'exec789', + nodeName: 'OpenAI', + workflowId: 'wf123', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.openai', + }; + + eventService.emit('ai-llm-errored', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.llm.error', + payload, + }); + }); + + it('should log on `ai-vector-store-populated` event', () => { + const payload: RelayEventMap['ai-vector-store-populated'] = { + executionId: 'exec456', + nodeName: 'Vector Store', + workflowId: 'wf789', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.vectorStore', + }; + + eventService.emit('ai-vector-store-populated', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.vector.store.populated', + payload, + }); + }); + + it('should log on `ai-vector-store-updated` event', () => { + const payload: RelayEventMap['ai-vector-store-updated'] = { + executionId: 'exec789', + nodeName: 'Vector Store', + workflowId: 'wf123', + workflowName: 'My Workflow', + nodeType: 'n8n-nodes-base.vectorStore', + }; + + eventService.emit('ai-vector-store-updated', payload); + + expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName: 'n8n.ai.vector.store.updated', + payload, + }); + }); + }); }); diff --git a/packages/cli/src/events/ai-event-map.ts b/packages/cli/src/events/ai-event-map.ts new file mode 100644 index 0000000000000..85170a0cd3e65 --- /dev/null +++ b/packages/cli/src/events/ai-event-map.ts @@ -0,0 +1,38 @@ +export type AiEventPayload = { + msg: string; + workflowName: string; + executionId: string; + nodeName: string; + workflowId?: string; + nodeType?: string; +}; + +export type AiEventMap = { + 'ai-messages-retrieved-from-memory': AiEventPayload; + + 'ai-message-added-to-memory': AiEventPayload; + + 'ai-output-parsed': AiEventPayload; + + 'ai-documents-retrieved': AiEventPayload; + + 'ai-document-embedded': AiEventPayload; + + 'ai-query-embedded': AiEventPayload; + + 'ai-document-processed': AiEventPayload; + + 'ai-text-split': AiEventPayload; + + 'ai-tool-called': AiEventPayload; + + 'ai-vector-store-searched': AiEventPayload; + + 'ai-llm-generated-output': AiEventPayload; + + 'ai-llm-errored': AiEventPayload; + + 'ai-vector-store-populated': AiEventPayload; + + 'ai-vector-store-updated': AiEventPayload; +}; diff --git a/packages/cli/src/events/event.service.ts b/packages/cli/src/events/event.service.ts index ad2ecdf12fe55..f709f7519a8fa 100644 --- a/packages/cli/src/events/event.service.ts +++ b/packages/cli/src/events/event.service.ts @@ -2,8 +2,9 @@ import { Service } from 'typedi'; import { TypedEmitter } from '@/typed-emitter'; import type { RelayEventMap } from './relay-event-map'; import type { QueueMetricsEventMap } from './queue-metrics-event-map'; +import type { AiEventMap } from './ai-event-map'; -type EventMap = RelayEventMap & QueueMetricsEventMap; +type EventMap = RelayEventMap & QueueMetricsEventMap & AiEventMap; @Service() export class EventService extends TypedEmitter {} diff --git a/packages/cli/src/events/log-streaming-event-relay.ts b/packages/cli/src/events/log-streaming-event-relay.ts index db704128eace2..d0f0df5f6c21f 100644 --- a/packages/cli/src/events/log-streaming-event-relay.ts +++ b/packages/cli/src/events/log-streaming-event-relay.ts @@ -46,6 +46,20 @@ export class LogStreamingEventRelay extends EventRelay { 'community-package-deleted': (event) => this.communityPackageDeleted(event), 'execution-throttled': (event) => this.executionThrottled(event), 'execution-started-during-bootup': (event) => this.executionStartedDuringBootup(event), + 'ai-messages-retrieved-from-memory': (event) => this.aiMessagesRetrievedFromMemory(event), + 'ai-message-added-to-memory': (event) => this.aiMessageAddedToMemory(event), + 'ai-output-parsed': (event) => this.aiOutputParsed(event), + 'ai-documents-retrieved': (event) => this.aiDocumentsRetrieved(event), + 'ai-document-embedded': (event) => this.aiDocumentEmbedded(event), + 'ai-query-embedded': (event) => this.aiQueryEmbedded(event), + 'ai-document-processed': (event) => this.aiDocumentProcessed(event), + 'ai-text-split': (event) => this.aiTextSplitIntoChunks(event), + 'ai-tool-called': (event) => this.aiToolCalled(event), + 'ai-vector-store-searched': (event) => this.aiVectorStoreSearched(event), + 'ai-llm-generated-output': (event) => this.aiLlmGeneratedOutput(event), + 'ai-llm-errored': (event) => this.aiLlmErrored(event), + 'ai-vector-store-populated': (event) => this.aiVectorStorePopulated(event), + 'ai-vector-store-updated': (event) => this.aiVectorStoreUpdated(event), }); } @@ -387,4 +401,106 @@ export class LogStreamingEventRelay extends EventRelay { } // #endregion + + // #region AI + + private aiMessagesRetrievedFromMemory( + payload: RelayEventMap['ai-messages-retrieved-from-memory'], + ) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.memory.get.messages', + payload, + }); + } + + private aiMessageAddedToMemory(payload: RelayEventMap['ai-message-added-to-memory']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.memory.added.message', + payload, + }); + } + + private aiOutputParsed(payload: RelayEventMap['ai-output-parsed']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.output.parser.parsed', + payload, + }); + } + + private aiDocumentsRetrieved(payload: RelayEventMap['ai-documents-retrieved']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.retriever.get.relevant.documents', + payload, + }); + } + + private aiDocumentEmbedded(payload: RelayEventMap['ai-document-embedded']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.embeddings.embedded.document', + payload, + }); + } + + private aiQueryEmbedded(payload: RelayEventMap['ai-query-embedded']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.embeddings.embedded.query', + payload, + }); + } + + private aiDocumentProcessed(payload: RelayEventMap['ai-document-processed']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.document.processed', + payload, + }); + } + + private aiTextSplitIntoChunks(payload: RelayEventMap['ai-text-split']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.text.splitter.split', + payload, + }); + } + + private aiToolCalled(payload: RelayEventMap['ai-tool-called']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.tool.called', + payload, + }); + } + + private aiVectorStoreSearched(payload: RelayEventMap['ai-vector-store-searched']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.vector.store.searched', + payload, + }); + } + + private aiLlmGeneratedOutput(payload: RelayEventMap['ai-llm-generated-output']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.llm.generated', + payload, + }); + } + + private aiLlmErrored(payload: RelayEventMap['ai-llm-errored']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.llm.error', + payload, + }); + } + + private aiVectorStorePopulated(payload: RelayEventMap['ai-vector-store-populated']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.vector.store.populated', + payload, + }); + } + + private aiVectorStoreUpdated(payload: RelayEventMap['ai-vector-store-updated']) { + void this.eventBus.sendAiNodeEvent({ + eventName: 'n8n.ai.vector.store.updated', + payload, + }); + } } diff --git a/packages/cli/src/events/relay-event-map.ts b/packages/cli/src/events/relay-event-map.ts index 4e4f79c5a9934..b0a4cb5176c2b 100644 --- a/packages/cli/src/events/relay-event-map.ts +++ b/packages/cli/src/events/relay-event-map.ts @@ -8,6 +8,7 @@ import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/interfaces'; import type { ProjectRole } from '@/databases/entities/project-relation'; import type { GlobalRole } from '@/databases/entities/user'; import type { AuthProviderType } from '@/databases/entities/auth-identity'; +import type { AiEventMap } from './ai-event-map'; export type UserLike = { id: string; @@ -469,4 +470,4 @@ export type RelayEventMap = { }; // #endregion -}; +} & AiEventMap; diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index dfa079ffbed76..b70536a1d23c9 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -23,7 +23,6 @@ import type { WorkflowExecuteMode, ExecutionStatus, ExecutionError, - EventNamesAiNodesType, CallbackManager, } from 'n8n-workflow'; import { @@ -69,7 +68,7 @@ import { WorkflowStaticDataService } from './workflows/workflow-static-data.serv import { WorkflowRepository } from './databases/repositories/workflow.repository'; import { UrlService } from './services/url.service'; import { WorkflowExecutionService } from './workflows/workflow-execution.service'; -import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; +import type { AiEventMap, AiEventPayload } from './events/ai-event-map'; import { EventService } from './events/event.service'; import { GlobalConfig } from '@n8n/config'; import { SubworkflowPolicyChecker } from './subworkflows/subworkflow-policy-checker.service'; @@ -994,6 +993,8 @@ export async function getBase( const variables = await WorkflowHelpers.getVariables(); + const eventService = Container.get(EventService); + return { credentialsHelper: Container.get(CredentialsHelper), executeWorkflow, @@ -1009,22 +1010,8 @@ export async function getBase( setExecutionStatus, variables, secretsHelpers: Container.get(SecretsHelper), - logAiEvent: async ( - eventName: EventNamesAiNodesType, - payload: { - msg?: string | undefined; - executionId: string; - nodeName: string; - workflowId?: string | undefined; - workflowName: string; - nodeType?: string | undefined; - }, - ) => { - return await Container.get(MessageEventBus).sendAiNodeEvent({ - eventName, - payload, - }); - }, + logAiEvent: (eventName: keyof AiEventMap, payload: AiEventPayload) => + eventService.emit(eventName, payload), }; } diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 3f7be2a7050df..32262cf359c5e 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -39,7 +39,6 @@ import type { BinaryHelperFunctions, CloseFunction, ContextType, - EventNamesAiNodesType, FieldType, FileSystemHelperFunctions, FunctionsBase, @@ -102,6 +101,7 @@ import type { EnsureTypeOptions, SSHTunnelFunctions, SchedulingFunctions, + AiEvent, } from 'n8n-workflow'; import { NodeConnectionType, @@ -3888,8 +3888,8 @@ export function getExecuteFunctions( constructExecutionMetaData, }, nodeHelpers: getNodeHelperFunctions(additionalData, workflow.id), - logAiEvent: async (eventName: EventNamesAiNodesType, msg: string) => { - return await additionalData.logAiEvent(eventName, { + logAiEvent: async (eventName: AiEvent, msg: string) => { + return additionalData.logAiEvent(eventName, { executionId: additionalData.executionId ?? 'unsaved-execution', nodeName: node.name, workflowName: workflow.name ?? 'Unnamed workflow', @@ -4039,8 +4039,8 @@ export function getExecuteSingleFunctions( getBinaryDataBuffer: async (propertyName, inputIndex = 0) => await getBinaryDataBuffer(inputData, itemIndex, propertyName, inputIndex), }, - logAiEvent: async (eventName: EventNamesAiNodesType, msg: string) => { - return await additionalData.logAiEvent(eventName, { + logAiEvent: async (eventName: AiEvent, msg: string) => { + return additionalData.logAiEvent(eventName, { executionId: additionalData.executionId ?? 'unsaved-execution', nodeName: node.name, workflowName: workflow.name ?? 'Unnamed workflow', diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index c8e3f307c9666..f36f11a639839 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -898,7 +898,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & { getInputSourceData(inputIndex?: number, inputName?: string): ISourceData; getExecutionCancelSignal(): AbortSignal | undefined; onExecutionCancellation(handler: () => unknown): void; - logAiEvent(eventName: EventNamesAiNodesType, msg?: string | undefined): Promise; + logAiEvent(eventName: AiEvent, msg?: string | undefined): Promise; }; // TODO: Create later own type only for Config-Nodes @@ -2147,26 +2147,6 @@ export interface IWorkflowExecuteHooks { sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise>; } -export const eventNamesAiNodes = [ - 'n8n.ai.memory.get.messages', - 'n8n.ai.memory.added.message', - 'n8n.ai.output.parser.get.instructions', - 'n8n.ai.output.parser.parsed', - 'n8n.ai.retriever.get.relevant.documents', - 'n8n.ai.embeddings.embedded.document', - 'n8n.ai.embeddings.embedded.query', - 'n8n.ai.document.processed', - 'n8n.ai.text.splitter.split', - 'n8n.ai.tool.called', - 'n8n.ai.vector.store.searched', - 'n8n.ai.llm.generated', - 'n8n.ai.llm.error', - 'n8n.ai.vector.store.populated', - 'n8n.ai.vector.store.updated', -] as const; - -export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number]; - export interface ExecuteWorkflowOptions { node?: INode; parentWorkflowId: string; @@ -2178,6 +2158,31 @@ export interface ExecuteWorkflowOptions { parentCallbackManager?: CallbackManager; } +export type AiEvent = + | 'ai-messages-retrieved-from-memory' + | 'ai-message-added-to-memory' + | 'ai-output-parsed' + | 'ai-documents-retrieved' + | 'ai-document-embedded' + | 'ai-query-embedded' + | 'ai-document-processed' + | 'ai-text-split' + | 'ai-tool-called' + | 'ai-vector-store-searched' + | 'ai-llm-generated-output' + | 'ai-llm-errored' + | 'ai-vector-store-populated' + | 'ai-vector-store-updated'; + +type AiEventPayload = { + msg: string; + workflowName: string; + executionId: string; + nodeName: string; + workflowId?: string; + nodeType?: string; +}; + export interface IWorkflowExecuteAdditionalData { credentialsHelper: ICredentialsHelper; executeWorkflow: ( @@ -2203,17 +2208,7 @@ export interface IWorkflowExecuteAdditionalData { userId?: string; variables: IDataObject; secretsHelpers: SecretsHelpersBase; - logAiEvent: ( - eventName: EventNamesAiNodesType, - payload: { - msg?: string; - executionId: string; - nodeName: string; - workflowId?: string; - workflowName: string; - nodeType?: string; - }, - ) => Promise; + logAiEvent: (eventName: AiEvent, payload: AiEventPayload) => void; parentCallbackManager?: CallbackManager; }