Skip to content

Commit

Permalink
feat: Log AI related events
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegIvaniv committed Feb 7, 2024
1 parent 3d1ccc6 commit f41b2ed
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
/* eslint-disable n8n-nodes-base/node-dirname-against-convention */
import type { VectorStore } from 'langchain/vectorstores/base';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow';
import type {
INodeCredentialDescription,
INodeProperties,
Expand Down Expand Up @@ -237,6 +237,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
});

resultData.push(...serializedDocs);
void this.logAiEvent('n8n.ai.vector.store.searched', jsonStringify({ query: prompt }));
}

return await this.prepareOutputData(resultData);
Expand All @@ -262,6 +263,8 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>

try {
await args.populateVectorStore(this, embeddings, processedDocuments, itemIndex);

void this.logAiEvent('n8n.ai.vector.store.populated');
} catch (error) {
throw error;
}
Expand Down
63 changes: 52 additions & 11 deletions packages/@n8n/nodes-langchain/utils/logWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
type IExecuteFunctions,
type INodeExecutionData,
NodeConnectionType,
jsonStringify,
} from 'n8n-workflow';

import { Tool } from 'langchain/tools';
Expand Down Expand Up @@ -197,17 +198,20 @@ export function logWrapper(
arguments: [],
})) as BaseMessage[];

executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'getMessages', response } }],
]);
const payload = { action: 'getMessages', response };
executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);

void executeFunctions.logAiEvent(
'n8n.ai.memory.get.messages',
jsonStringify({ response }),
);
return response;
};
} else if (prop === 'addMessage' && 'addMessage' in target) {
return async (message: BaseMessage): Promise<void> => {
connectionType = NodeConnectionType.AiMemory;
const { index } = executeFunctions.addInputData(connectionType, [
[{ json: { action: 'addMessage', message } }],
]);
const payload = { action: 'addMessage', message };
const { index } = executeFunctions.addInputData(connectionType, [[{ json: payload }]]);

await callMethodAsync.call(target, {
executeFunctions,
Expand All @@ -217,9 +221,11 @@ export function logWrapper(
arguments: [message],
});

executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'addMessage' } }],
]);
void executeFunctions.logAiEvent(
'n8n.ai.memory.added.message',
jsonStringify({ message }),
);
executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);
};
}
}
Expand All @@ -236,7 +242,6 @@ export function logWrapper(
const { index } = executeFunctions.addInputData(connectionType, [
[{ json: { messages, options } }],
]);

try {
const response = (await callMethodAsync.call(target, {
executeFunctions,
Expand All @@ -249,6 +254,18 @@ export function logWrapper(
runManager,
],
})) as ChatResult;

void executeFunctions.logAiEvent(
'n8n.ai.llm.generated',
jsonStringify({
messages:
typeof messages === 'string'
? messages
: messages.map((message) => message.toJSON()),
options,
response,
}),
);
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
} catch (error) {
Expand Down Expand Up @@ -281,6 +298,10 @@ export function logWrapper(
executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'getFormatInstructions', response } }],
]);
void executeFunctions.logAiEvent(
'n8n.ai.output.parser.get.instructions',
jsonStringify({ response }),
);
return response;
};
} else if (prop === 'parse' && 'parse' in target) {
Expand All @@ -299,6 +320,10 @@ export function logWrapper(
arguments: [stringifiedText],
})) as object;

void executeFunctions.logAiEvent(
'n8n.ai.output.parser.parsed',
jsonStringify({ text, response }),
);
executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'parse', response } }],
]);
Expand Down Expand Up @@ -327,6 +352,10 @@ export function logWrapper(
arguments: [query, config],
})) as Array<Document<Record<string, any>>>;

void executeFunctions.logAiEvent(
'n8n.ai.retriever.get.relevant.documents',
jsonStringify({ query }),
);
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
Expand All @@ -351,6 +380,7 @@ export function logWrapper(
arguments: [documents],
})) as number[][];

void executeFunctions.logAiEvent('n8n.ai.embeddings.embedded.document');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
Expand All @@ -370,7 +400,7 @@ export function logWrapper(
method: target[prop],
arguments: [query],
})) as number[];

void executeFunctions.logAiEvent('n8n.ai.embeddings.embedded.query');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
Expand Down Expand Up @@ -400,6 +430,7 @@ export function logWrapper(
return response;
};
}

// Process Each
if (prop === 'processItem' && 'processItem' in target) {
return async (item: INodeExecutionData, itemIndex: number): Promise<number[]> => {
Expand All @@ -414,6 +445,7 @@ export function logWrapper(
arguments: [item, itemIndex],
})) as number[];

void executeFunctions.logAiEvent('n8n.ai.document.processed');
executeFunctions.addOutputData(connectionType, index, [
[{ json: { response }, pairedItem: { item: itemIndex } }],
]);
Expand All @@ -439,6 +471,7 @@ export function logWrapper(
arguments: [text],
})) as string[];

void executeFunctions.logAiEvent('n8n.ai.text.splitter.split');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
Expand All @@ -462,6 +495,10 @@ export function logWrapper(
arguments: [query],
})) as string;

void executeFunctions.logAiEvent(
'n8n.ai.tool.called',
jsonStringify({ query, response }),
);
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
Expand Down Expand Up @@ -491,6 +528,10 @@ export function logWrapper(
arguments: [query, k, filter, _callbacks],
})) as Array<Document<Record<string, any>>>;

void executeFunctions.logAiEvent(
'n8n.ai.vector.store.searched',
jsonStringify({ query }),
);
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);

return response;
Expand Down
5 changes: 3 additions & 2 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import type {
WorkflowExecuteMode,
ExecutionStatus,
ExecutionError,
EventNamesAiNodesType,
} from 'n8n-workflow';
import {
ApplicationError,
Expand Down Expand Up @@ -988,7 +989,7 @@ export async function getBase(
variables,
secretsHelpers: Container.get(SecretsHelper),
logAiEvent: async (
eventName: string,
eventName: EventNamesAiNodesType,
payload: {
msg?: string | undefined;
executionId: string;
Expand All @@ -1000,7 +1001,7 @@ export async function getBase(
) => {
return await Container.get(MessageEventBus).sendAiNodeEvent({
eventName,
...payload,
payload,
});
},
};
Expand Down
19 changes: 17 additions & 2 deletions packages/workflow/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
getInputSourceData(inputIndex?: number, inputName?: string): ISourceData;
getExecutionCancelSignal(): AbortSignal | undefined;
onExecutionCancellation(handler: () => unknown): void;
logAiEvent(eventName: string, msg?: string | undefined): Promise<void>;
logAiEvent(eventName: EventNamesAiNodesType, msg?: string | undefined): Promise<void>;
};

// TODO: Create later own type only for Config-Nodes
Expand Down Expand Up @@ -1940,7 +1940,22 @@ export interface IWorkflowExecuteHooks {
sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise<void>>;
}

export const eventNamesAiNodes = ['n8n.ai.node.supplied.data', 'n8n.ai.vector.store.populated'];
export const eventNamesAiNodes = [
'n8n.ai.memory.get.messages',
'n8n.ai.memory.added.message',
'n8n.ai.output.parser.get.instructions',
'n8n.ai.output.parser.parsed',
'n8n.ai.retriever.get.relevant.documents',
'n8n.ai.embeddings.embedded.document',
'n8n.ai.embeddings.embedded.query',
'n8n.ai.document.processed',
'n8n.ai.text.splitter.split',
'n8n.ai.tool.called',
'n8n.ai.vector.store.searched',
'n8n.ai.llm.generated',
'n8n.ai.vector.store.populated',
] as const;

export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number];

export interface IWorkflowExecuteAdditionalData {
Expand Down

0 comments on commit f41b2ed

Please sign in to comment.