refactor(core): Include AI events in log streaming relay (#10768)
ivov authored Sep 12, 2024
1 parent 8240b2a commit c133a6e
Showing 15 changed files with 496 additions and 92 deletions.
4 changes: 2 additions & 2 deletions packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts
@@ -138,7 +138,7 @@ export class N8nLlmTracing extends BaseCallbackHandler {
this.executionFunctions.addOutputData(this.connectionType, runDetails.index, [
[{ json: { ...response } }],
]);
- void logAiEvent(this.executionFunctions, 'n8n.ai.llm.generated', {
+ void logAiEvent(this.executionFunctions, 'ai-llm-generated-output', {
messages: parsedMessages,
options: runDetails.options,
response,
@@ -186,7 +186,7 @@
});
}

- void logAiEvent(this.executionFunctions, 'n8n.ai.llm.error', {
+ void logAiEvent(this.executionFunctions, 'ai-llm-errored', {
error: Object.keys(error).length === 0 ? error.toString() : error,
runId,
parentRunId,
@@ -280,7 +280,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
});

resultData.push(...serializedDocs);
- void logAiEvent(this, 'n8n.ai.vector.store.searched', { query: prompt });
+ void logAiEvent(this, 'ai-vector-store-searched', { query: prompt });
}

return [resultData];
@@ -307,7 +307,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
try {
await args.populateVectorStore(this, embeddings, processedDocuments, itemIndex);

- void logAiEvent(this, 'n8n.ai.vector.store.populated');
+ void logAiEvent(this, 'ai-vector-store-populated');
} catch (error) {
throw error;
}
@@ -361,7 +361,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
ids: [documentId],
});

- void logAiEvent(this, 'n8n.ai.vector.store.updated');
+ void logAiEvent(this, 'ai-vector-store-updated');
} catch (error) {
throw error;
}
9 changes: 2 additions & 7 deletions packages/@n8n/nodes-langchain/utils/helpers.ts
@@ -1,10 +1,5 @@
import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow';
- import type {
- EventNamesAiNodesType,
- IDataObject,
- IExecuteFunctions,
- IWebhookFunctions,
- } from 'n8n-workflow';
+ import type { AiEvent, IDataObject, IExecuteFunctions, IWebhookFunctions } from 'n8n-workflow';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { BaseOutputParser } from '@langchain/core/output_parsers';
import type { BaseMessage } from '@langchain/core/messages';
@@ -155,7 +150,7 @@ export function getSessionId(

export async function logAiEvent(
executeFunctions: IExecuteFunctions,
- event: EventNamesAiNodesType,
+ event: AiEvent,
data?: IDataObject,
) {
try {
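
The AiEvent union that replaces EventNamesAiNodesType in this signature is declared in n8n-workflow and is not part of this diff. As a rough sketch derived only from the event names that appear in this commit (the real declaration in n8n-workflow may differ or contain additional members), it would look roughly like:

// Assumed sketch of the AiEvent union imported from 'n8n-workflow'; not the actual declaration.
// The members listed are exactly the new event names used elsewhere in this commit.
type AiEvent =
	| 'ai-messages-retrieved-from-memory'
	| 'ai-message-added-to-memory'
	| 'ai-output-parsed'
	| 'ai-documents-retrieved'
	| 'ai-document-embedded'
	| 'ai-query-embedded'
	| 'ai-document-processed'
	| 'ai-text-split'
	| 'ai-tool-called'
	| 'ai-vector-store-searched'
	| 'ai-llm-generated-output'
	| 'ai-llm-errored'
	| 'ai-vector-store-populated'
	| 'ai-vector-store-updated';
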
22 changes: 11 additions & 11 deletions packages/@n8n/nodes-langchain/utils/logWrapper.ts
@@ -196,7 +196,7 @@ export function logWrapper(
const payload = { action: 'getMessages', response };
executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);

- void logAiEvent(executeFunctions, 'n8n.ai.memory.get.messages', { response });
+ void logAiEvent(executeFunctions, 'ai-messages-retrieved-from-memory', { response });
return response;
};
} else if (prop === 'addMessage' && 'addMessage' in target) {
@@ -213,7 +213,7 @@
arguments: [message],
});

- void logAiEvent(executeFunctions, 'n8n.ai.memory.added.message', { message });
+ void logAiEvent(executeFunctions, 'ai-message-added-to-memory', { message });
executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);
};
}
@@ -238,13 +238,13 @@
arguments: [stringifiedText],
})) as object;

- void logAiEvent(executeFunctions, 'n8n.ai.output.parser.parsed', { text, response });
+ void logAiEvent(executeFunctions, 'ai-output-parsed', { text, response });
executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'parse', response } }],
]);
return response;
} catch (error) {
- void logAiEvent(executeFunctions, 'n8n.ai.output.parser.parsed', {
+ void logAiEvent(executeFunctions, 'ai-output-parsed', {
text,
response: error.message ?? error,
});
@@ -277,7 +277,7 @@
arguments: [query, config],
})) as Array<Document<Record<string, any>>>;

- void logAiEvent(executeFunctions, 'n8n.ai.retriever.get.relevant.documents', { query });
+ void logAiEvent(executeFunctions, 'ai-documents-retrieved', { query });
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@@ -302,7 +302,7 @@
arguments: [documents],
})) as number[][];

- void logAiEvent(executeFunctions, 'n8n.ai.embeddings.embedded.document');
+ void logAiEvent(executeFunctions, 'ai-document-embedded');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@@ -322,7 +322,7 @@
method: target[prop],
arguments: [query],
})) as number[];
- void logAiEvent(executeFunctions, 'n8n.ai.embeddings.embedded.query');
+ void logAiEvent(executeFunctions, 'ai-query-embedded');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@@ -367,7 +367,7 @@
arguments: [item, itemIndex],
})) as number[];

- void logAiEvent(executeFunctions, 'n8n.ai.document.processed');
+ void logAiEvent(executeFunctions, 'ai-document-processed');
executeFunctions.addOutputData(connectionType, index, [
[{ json: { response }, pairedItem: { item: itemIndex } }],
]);
@@ -393,7 +393,7 @@
arguments: [text],
})) as string[];

- void logAiEvent(executeFunctions, 'n8n.ai.text.splitter.split');
+ void logAiEvent(executeFunctions, 'ai-text-split');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@@ -417,7 +417,7 @@
arguments: [query],
})) as string;

- void logAiEvent(executeFunctions, 'n8n.ai.tool.called', { query, response });
+ void logAiEvent(executeFunctions, 'ai-tool-called', { query, response });
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@@ -447,7 +447,7 @@
arguments: [query, k, filter, _callbacks],
})) as Array<Document<Record<string, any>>>;

- void logAiEvent(executeFunctions, 'n8n.ai.vector.store.searched', { query });
+ void logAiEvent(executeFunctions, 'ai-vector-store-searched', { query });
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);

return response;
@@ -1,26 +1,25 @@
import { VariablesService } from '@/environments/variables/variables.service.ee';
import { mockInstance } from '@test/mocking';
- import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { getBase } from '@/workflow-execute-additional-data';
import Container from 'typedi';
import { CredentialsHelper } from '@/credentials-helper';
import { SecretsHelper } from '@/secrets-helpers';
+ import { EventService } from '@/events/event.service';

describe('WorkflowExecuteAdditionalData', () => {
- const messageEventBus = mockInstance(MessageEventBus);
const variablesService = mockInstance(VariablesService);
variablesService.getAllCached.mockResolvedValue([]);
const credentialsHelper = mockInstance(CredentialsHelper);
const secretsHelper = mockInstance(SecretsHelper);
- Container.set(MessageEventBus, messageEventBus);
+ const eventService = mockInstance(EventService);
Container.set(VariablesService, variablesService);
Container.set(CredentialsHelper, credentialsHelper);
Container.set(SecretsHelper, secretsHelper);

test('logAiEvent should call MessageEventBus', async () => {
const additionalData = await getBase('user-id');

- const eventName = 'n8n.ai.memory.get.messages';
+ const eventName = 'ai-messages-retrieved-from-memory';
const payload = {
msg: 'test message',
executionId: '123',
@@ -30,12 +29,9 @@ describe('WorkflowExecuteAdditionalData', () => {
nodeType: 'n8n-memory',
};

- await additionalData.logAiEvent(eventName, payload);
+ additionalData.logAiEvent(eventName, payload);

- expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledTimes(1);
- expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledWith({
- eventName,
- payload,
- });
+ expect(eventService.emit).toHaveBeenCalledTimes(1);
+ expect(eventService.emit).toHaveBeenCalledWith(eventName, payload);
});
});
@@ -1,5 +1,6 @@
import { AbstractEventMessage, isEventMessageOptionsWithType } from './abstract-event-message';
- import type { EventNamesAiNodesType, JsonObject } from 'n8n-workflow';
+ import type { JsonObject } from 'n8n-workflow';
+ import type { EventNamesAiNodesType } from '.';
import { EventMessageTypeNames } from 'n8n-workflow';
import type { AbstractEventMessageOptions } from './abstract-event-message-options';
import type { AbstractEventPayload } from './abstract-event-payload';
20 changes: 19 additions & 1 deletion packages/cli/src/eventbus/event-message-classes/index.ts
@@ -4,7 +4,25 @@ import type { EventMessageExecution } from './event-message-execution';
import type { EventMessageGeneric } from './event-message-generic';
import type { EventMessageNode } from './event-message-node';
import type { EventMessageWorkflow } from './event-message-workflow';
- import { eventNamesAiNodes, type EventNamesAiNodesType } from 'n8n-workflow';

+ export const eventNamesAiNodes = [
+ 'n8n.ai.memory.get.messages',
+ 'n8n.ai.memory.added.message',
+ 'n8n.ai.output.parser.parsed',
+ 'n8n.ai.retriever.get.relevant.documents',
+ 'n8n.ai.embeddings.embedded.document',
+ 'n8n.ai.embeddings.embedded.query',
+ 'n8n.ai.document.processed',
+ 'n8n.ai.text.splitter.split',
+ 'n8n.ai.tool.called',
+ 'n8n.ai.vector.store.searched',
+ 'n8n.ai.llm.generated',
+ 'n8n.ai.llm.error',
+ 'n8n.ai.vector.store.populated',
+ 'n8n.ai.vector.store.updated',
+ ] as const;
+
+ export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number];

export const eventNamesWorkflow = [
'n8n.workflow.started',
Expand Down
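
Per the commit title, these AI events are now also forwarded to the log streaming relay. The relay code itself is not included in the portion of the diff shown here; the lookup below is only an illustrative sketch of how the new AiEvent names could be translated back to the legacy eventNamesAiNodes identifiers retained above. Every pair is taken from a rename in this commit, while the constant name and the import path are assumptions for the sake of the example, not code from the n8n codebase:

// Illustrative sketch only; not code from this commit. Maps each new AiEvent name
// emitted by the nodes to the legacy event-bus identifier kept in eventNamesAiNodes above.
// The import path is assumed so the example is self-contained.
import type { EventNamesAiNodesType } from '@/eventbus/event-message-classes';

const legacyAiEventName: Record<string, EventNamesAiNodesType> = {
	'ai-messages-retrieved-from-memory': 'n8n.ai.memory.get.messages',
	'ai-message-added-to-memory': 'n8n.ai.memory.added.message',
	'ai-output-parsed': 'n8n.ai.output.parser.parsed',
	'ai-documents-retrieved': 'n8n.ai.retriever.get.relevant.documents',
	'ai-document-embedded': 'n8n.ai.embeddings.embedded.document',
	'ai-query-embedded': 'n8n.ai.embeddings.embedded.query',
	'ai-document-processed': 'n8n.ai.document.processed',
	'ai-text-split': 'n8n.ai.text.splitter.split',
	'ai-tool-called': 'n8n.ai.tool.called',
	'ai-vector-store-searched': 'n8n.ai.vector.store.searched',
	'ai-llm-generated-output': 'n8n.ai.llm.generated',
	'ai-llm-errored': 'n8n.ai.llm.error',
	'ai-vector-store-populated': 'n8n.ai.vector.store.populated',
	'ai-vector-store-updated': 'n8n.ai.vector.store.updated',
};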