Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for AI log streaming #8526

Merged
merged 10 commits into from
Feb 9, 2024
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
/* eslint-disable n8n-nodes-base/node-dirname-against-convention */
import type { VectorStore } from 'langchain/vectorstores/base';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow';
import type {
INodeCredentialDescription,
INodeProperties,
Expand Down Expand Up @@ -237,6 +237,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
});

resultData.push(...serializedDocs);
void this.logAiEvent('n8n.ai.vector.store.searched', jsonStringify({ query: prompt }));
}

return await this.prepareOutputData(resultData);
Expand All @@ -262,6 +263,8 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>

try {
await args.populateVectorStore(this, embeddings, processedDocuments, itemIndex);

void this.logAiEvent('n8n.ai.vector.store.populated');
} catch (error) {
throw error;
}
Expand Down
63 changes: 52 additions & 11 deletions packages/@n8n/nodes-langchain/utils/logWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
type IExecuteFunctions,
type INodeExecutionData,
NodeConnectionType,
jsonStringify,
} from 'n8n-workflow';

import { Tool } from 'langchain/tools';
Expand Down Expand Up @@ -198,17 +199,20 @@ export function logWrapper(
arguments: [],
})) as BaseMessage[];

executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'getMessages', response } }],
]);
const payload = { action: 'getMessages', response };
executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);

void executeFunctions.logAiEvent(
'n8n.ai.memory.get.messages',
jsonStringify({ response }),
);
return response;
};
} else if (prop === 'addMessage' && 'addMessage' in target) {
return async (message: BaseMessage): Promise<void> => {
connectionType = NodeConnectionType.AiMemory;
const { index } = executeFunctions.addInputData(connectionType, [
[{ json: { action: 'addMessage', message } }],
]);
const payload = { action: 'addMessage', message };
const { index } = executeFunctions.addInputData(connectionType, [[{ json: payload }]]);

await callMethodAsync.call(target, {
executeFunctions,
Expand All @@ -218,9 +222,11 @@ export function logWrapper(
arguments: [message],
});

executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'addMessage' } }],
]);
void executeFunctions.logAiEvent(
'n8n.ai.memory.added.message',
jsonStringify({ message }),
);
executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);
};
}
}
Expand All @@ -237,7 +243,6 @@ export function logWrapper(
const { index } = executeFunctions.addInputData(connectionType, [
[{ json: { messages, options } }],
]);

try {
const response = (await callMethodAsync.call(target, {
executeFunctions,
Expand All @@ -250,6 +255,18 @@ export function logWrapper(
runManager,
],
})) as ChatResult;

void executeFunctions.logAiEvent(
'n8n.ai.llm.generated',
jsonStringify({
messages:
typeof messages === 'string'
? messages
: messages.map((message) => message.toJSON()),
options,
response,
}),
);
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
} catch (error) {
Expand Down Expand Up @@ -282,6 +299,10 @@ export function logWrapper(
executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'getFormatInstructions', response } }],
]);
void executeFunctions.logAiEvent(
'n8n.ai.output.parser.get.instructions',
jsonStringify({ response }),
);
return response;
};
} else if (prop === 'parse' && 'parse' in target) {
Expand All @@ -300,6 +321,10 @@ export function logWrapper(
arguments: [stringifiedText],
})) as object;

void executeFunctions.logAiEvent(
'n8n.ai.output.parser.parsed',
jsonStringify({ text, response }),
);
executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'parse', response } }],
]);
Expand Down Expand Up @@ -328,6 +353,10 @@ export function logWrapper(
arguments: [query, config],
})) as Array<Document<Record<string, any>>>;

void executeFunctions.logAiEvent(
'n8n.ai.retriever.get.relevant.documents',
jsonStringify({ query }),
);
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
Expand All @@ -352,6 +381,7 @@ export function logWrapper(
arguments: [documents],
})) as number[][];

void executeFunctions.logAiEvent('n8n.ai.embeddings.embedded.document');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
Expand All @@ -371,7 +401,7 @@ export function logWrapper(
method: target[prop],
arguments: [query],
})) as number[];

void executeFunctions.logAiEvent('n8n.ai.embeddings.embedded.query');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
Expand Down Expand Up @@ -401,6 +431,7 @@ export function logWrapper(
return response;
};
}

// Process Each
if (prop === 'processItem' && 'processItem' in target) {
return async (item: INodeExecutionData, itemIndex: number): Promise<number[]> => {
Expand All @@ -415,6 +446,7 @@ export function logWrapper(
arguments: [item, itemIndex],
})) as number[];

void executeFunctions.logAiEvent('n8n.ai.document.processed');
executeFunctions.addOutputData(connectionType, index, [
[{ json: { response }, pairedItem: { item: itemIndex } }],
]);
Expand All @@ -440,6 +472,7 @@ export function logWrapper(
arguments: [text],
})) as string[];

void executeFunctions.logAiEvent('n8n.ai.text.splitter.split');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
Expand All @@ -463,6 +496,10 @@ export function logWrapper(
arguments: [query],
})) as string;

void executeFunctions.logAiEvent(
'n8n.ai.tool.called',
jsonStringify({ query, response }),
);
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
Expand Down Expand Up @@ -492,6 +529,10 @@ export function logWrapper(
arguments: [query, k, filter, _callbacks],
})) as Array<Document<Record<string, any>>>;

void executeFunctions.logAiEvent(
'n8n.ai.vector.store.searched',
jsonStringify({ query }),
);
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);

return response;
Expand Down
18 changes: 18 additions & 0 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import type {
WorkflowExecuteMode,
ExecutionStatus,
ExecutionError,
EventNamesAiNodesType,
} from 'n8n-workflow';
import {
ApplicationError,
Expand Down Expand Up @@ -68,6 +69,7 @@ import { WorkflowStaticDataService } from './workflows/workflowStaticData.servic
import { WorkflowRepository } from './databases/repositories/workflow.repository';
import { UrlService } from './services/url.service';
import { WorkflowExecutionService } from './workflows/workflowExecution.service';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';

const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');

Expand Down Expand Up @@ -982,6 +984,22 @@ export async function getBase(
setExecutionStatus,
variables,
secretsHelpers: Container.get(SecretsHelper),
logAiEvent: async (
eventName: EventNamesAiNodesType,
payload: {
msg?: string | undefined;
executionId: string;
nodeName: string;
workflowId?: string | undefined;
workflowName: string;
nodeType?: string | undefined;
},
) => {
return await Container.get(MessageEventBus).sendAiNodeEvent({
eventName,
payload,
});
},
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { AbstractEventMessage, isEventMessageOptionsWithType } from './AbstractEventMessage';
import type { EventNamesAiNodesType, JsonObject } from 'n8n-workflow';
import { EventMessageTypeNames } from 'n8n-workflow';
import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions';
import type { AbstractEventPayload } from './AbstractEventPayload';

// --------------------------------------
// EventMessage class for AI node events
// --------------------------------------
/**
 * Payload attached to an AI-node event message.
 *
 * Identifies where the event originated (execution, node, workflow) plus an
 * optional free-form message body (`msg`) — typically a JSON string produced
 * by the emitting node.
 */
export interface EventPayloadAiNode extends AbstractEventPayload {
	// Optional serialized event details (e.g. query/response data).
	msg?: string;
	// ID of the execution during which the event was emitted.
	executionId: string;
	// Display name of the node that emitted the event.
	nodeName: string;
	// Workflow ID; optional — presumably absent for unsaved workflows. TODO confirm.
	workflowId?: string;
	workflowName: string;
	// Node type identifier (e.g. package-qualified type name) — optional.
	nodeType?: string;
}

/**
 * Constructor options for {@link EventMessageAiNode}: the common event-message
 * options narrowed to an AI-node event name, plus an optional AI-node payload.
 */
export interface EventMessageAiNodeOptions extends AbstractEventMessageOptions {
	eventName: EventNamesAiNodesType;

	payload?: EventPayloadAiNode | undefined;
}

/**
 * Event-bus message emitted for AI-node activity (LLM calls, memory access,
 * vector-store operations, etc.).
 *
 * Carries an {@link EventNamesAiNodesType} event name and an
 * {@link EventPayloadAiNode} payload; serialization/anonymization behavior is
 * inherited from `AbstractEventMessage`.
 */
export class EventMessageAiNode extends AbstractEventMessage {
	// Discriminator used by deserialize() to match incoming JSON to this class.
	readonly __type = EventMessageTypeNames.aiNode;

	eventName: EventNamesAiNodesType;

	payload: EventPayloadAiNode;

	constructor(options: EventMessageAiNodeOptions) {
		super(options);
		// Payload is optional in the options; only set when provided.
		if (options.payload) this.setPayload(options.payload);
		if (options.anonymize) {
			// anonymize() is defined on the base class — scrubs payload fields.
			// NOTE(review): exact scrubbing behavior lives in AbstractEventMessage.
			this.anonymize();
		}
	}

	/** Replaces the payload and returns `this` for chaining. */
	setPayload(payload: EventPayloadAiNode): this {
		this.payload = payload;
		return this;
	}

	/**
	 * Restores message state from a plain JSON object (e.g. read back from the
	 * event log). Only applies `data` if its `__type` matches this class.
	 */
	deserialize(data: JsonObject): this {
		if (isEventMessageOptionsWithType(data, this.__type)) {
			this.setOptionsOrDefault(data);
			// Unchecked cast: payload shape is assumed, not validated, on replay.
			if (data.payload) this.setPayload(data.payload as EventPayloadAiNode);
		}
		return this;
	}
}
7 changes: 6 additions & 1 deletion packages/cli/src/eventbus/EventMessageClasses/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type { EventMessageAiNode } from './EventMessageAiNode';
import type { EventMessageAudit } from './EventMessageAudit';
import type { EventMessageGeneric } from './EventMessageGeneric';
import type { EventMessageNode } from './EventMessageNode';
import type { EventMessageWorkflow } from './EventMessageWorkflow';
import { eventNamesAiNodes, type EventNamesAiNodesType } from 'n8n-workflow';

export const eventNamesWorkflow = [
'n8n.workflow.started',
Expand Down Expand Up @@ -45,20 +47,23 @@ export type EventNamesTypes =
| EventNamesWorkflowType
| EventNamesNodeType
| EventNamesGenericType
| EventNamesAiNodesType
| 'n8n.destination.test';

export const eventNamesAll = [
...eventNamesAudit,
...eventNamesWorkflow,
...eventNamesNode,
...eventNamesGeneric,
...eventNamesAiNodes,
];

export type EventMessageTypes =
| EventMessageGeneric
| EventMessageWorkflow
| EventMessageAudit
| EventMessageNode;
| EventMessageNode
| EventMessageAiNode;

export interface FailedEventSummary {
lastNodeExecuted: string;
Expand Down
8 changes: 8 additions & 0 deletions packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { ExecutionDataRecoveryService } from '../executionDataRecovery.service';
import {
EventMessageAiNode,
type EventMessageAiNodeOptions,
} from '../EventMessageClasses/EventMessageAiNode';

export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';

Expand Down Expand Up @@ -457,4 +461,8 @@ export class MessageEventBus extends EventEmitter {
async sendNodeEvent(options: EventMessageNodeOptions) {
await this.send(new EventMessageNode(options));
}

async sendAiNodeEvent(options: EventMessageAiNodeOptions) {
await this.send(new EventMessageAiNode(options));
}
}
34 changes: 34 additions & 0 deletions packages/cli/test/unit/ExecutionMetadataService.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Container } from 'typedi';
import { ExecutionMetadataRepository } from '@db/repositories/executionMetadata.repository';
import { ExecutionMetadataService } from '@/services/executionMetadata.service';
import { mockInstance } from '../shared/mocking';

describe('ExecutionMetadataService', () => {
	const repository = mockInstance(ExecutionMetadataRepository);

	test('Execution metadata is saved in a batch', async () => {
		// Metadata entries to persist and the execution they belong to.
		const executionId = '1234';
		const metadata = {
			test1: 'value1',
			test2: 'value2',
		};

		await Container.get(ExecutionMetadataService).save(executionId, metadata);

		// Derive the rows the repository should receive: one row per entry,
		// each tagged with the owning execution.
		const expectedRows = Object.entries(metadata).map(([key, value]) => ({
			execution: { id: executionId },
			key,
			value,
		}));

		// All entries must be written in a single batched repository call.
		expect(repository.save).toHaveBeenCalledTimes(1);
		expect(repository.save.mock.calls[0]).toEqual([expectedRows]);
	});
});
Loading
Loading