fix(editor): Buffer json chunks in stream response (n8n-io#10439)
mutdmour authored Aug 16, 2024
1 parent 1466ff7 commit 37797f3
Showing 4 changed files with 147 additions and 15 deletions.
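The core change is in streamRequest (apiUtils.ts): decoded text is now accumulated in a buffer, split on the stream separator, and any trailing piece that does not yet parse as JSON is kept in the buffer and completed by the next read, instead of failing with an "Invalid json chunk" error. Below is a minimal standalone sketch of that buffering approach using only web-standard APIs (parseStream is an illustrative name, not part of the n8n codebase; the separator value mirrors the STREAM_SEPERATOR constant added in this commit):

// Illustrative sketch of the buffering strategy this commit adds to streamRequest.
// parseStream is a hypothetical helper, not part of the n8n codebase.
const SEPARATOR = '⧉⇋⇋➽⌑⧉§§\n';

async function parseStream<T>(
  body: ReadableStream<Uint8Array>,
  onChunk: (chunk: T) => void,
): Promise<void> {
  const reader = body.getReader();
  const decoder = new TextDecoder('utf-8');
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Append the newly decoded bytes to whatever was left over from the previous read.
    buffer += decoder.decode(value);

    const parts = buffer.split(SEPARATOR);
    buffer = '';
    for (const part of parts) {
      if (!part) continue;
      try {
        onChunk(JSON.parse(part) as T);
      } catch {
        // Incomplete JSON: keep it in the buffer so the next read can complete it.
        buffer += part;
      }
    }
  }
}

Only the trailing piece of a split can be incomplete, since every earlier piece was terminated by a separator, so buffering the unparsed remainder is enough to stitch chunks back together across reads.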
9 changes: 8 additions & 1 deletion packages/editor-ui/src/api/assistant.ts
@@ -9,7 +9,14 @@ export function chatWithAssistant(
onDone: () => void,
onError: (e: Error) => void,
): void {
void streamRequest(ctx, '/ai-assistant/chat', payload, onMessageUpdated, onDone, onError);
void streamRequest<ChatRequest.ResponsePayload>(
ctx,
'/ai-assistant/chat',
payload,
onMessageUpdated,
onDone,
onError,
);
}

export async function replaceCode(
4 changes: 2 additions & 2 deletions packages/editor-ui/src/plugins/i18n/locales/en.json
@@ -131,15 +131,15 @@
"auth.signup.setupYourAccount": "Set up your account",
"auth.signup.setupYourAccountError": "Problem setting up your account",
"auth.signup.tokenValidationError": "Issue validating invite token",
"aiAssistant.name": "Ava",
"aiAssistant.name": "Assistant",
"aiAssistant.assistant": "AI Assistant",
"aiAssistant.newSessionModal.title.part1": "Start new",
"aiAssistant.newSessionModal.title.part2": "session",
"aiAssistant.newSessionModal.message": "You already have an active AI Assistant session. Starting a new session will clear your current conversation history.",
"aiAssistant.newSessionModal.question": "Are you sure you want to start a new session?",
"aiAssistant.newSessionModal.confirm": "Start new session",
"aiAssistant.serviceError.message": "Unable to connect to n8n's AI service",
"aiAssistant.codeUpdated.message.title": "Ava modified workflow",
"aiAssistant.codeUpdated.message.title": "Assistant modified workflow",
"aiAssistant.codeUpdated.message.body": "Open the <a data-action='openNodeDetail' data-action-parameter-node='{nodeName}'>{nodeName}</a> node to see the changes",
"banners.confirmEmail.message.1": "To secure your account and prevent future access issues, please confirm your",
"banners.confirmEmail.message.2": "email address.",
112 changes: 112 additions & 0 deletions packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts
@@ -0,0 +1,112 @@
import { STREAM_SEPERATOR, streamRequest } from '../apiUtils';

describe('streamRequest', () => {
it('should stream data from the API endpoint', async () => {
const encoder = new TextEncoder();
const mockResponse = new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 1 })}${STREAM_SEPERATOR}`));
controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 2 })}${STREAM_SEPERATOR}`));
controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 3 })}${STREAM_SEPERATOR}`));
controller.close();
},
});

const mockFetch = vi.fn().mockResolvedValue({
ok: true,
body: mockResponse,
});

global.fetch = mockFetch;

const onChunkMock = vi.fn();
const onDoneMock = vi.fn();
const onErrorMock = vi.fn();

await streamRequest(
{
baseUrl: 'https://api.example.com',
pushRef: '',
},
'/data',
{ key: 'value' },
onChunkMock,
onDoneMock,
onErrorMock,
);

expect(mockFetch).toHaveBeenCalledWith('https://api.example.com/data', {
method: 'POST',
body: JSON.stringify({ key: 'value' }),
credentials: 'include',
headers: {
'Content-Type': 'application/json',
'browser-id': expect.stringContaining('-'),
},
});

expect(onChunkMock).toHaveBeenCalledTimes(3);
expect(onChunkMock).toHaveBeenNthCalledWith(1, { chunk: 1 });
expect(onChunkMock).toHaveBeenNthCalledWith(2, { chunk: 2 });
expect(onChunkMock).toHaveBeenNthCalledWith(3, { chunk: 3 });

expect(onDoneMock).toHaveBeenCalledTimes(1);
expect(onErrorMock).not.toHaveBeenCalled();
});

it('should handle broken stream data', async () => {
const encoder = new TextEncoder();
const mockResponse = new ReadableStream({
start(controller) {
controller.enqueue(
encoder.encode(`${JSON.stringify({ chunk: 1 })}${STREAM_SEPERATOR}{"chunk": `),
);
controller.enqueue(encoder.encode(`2}${STREAM_SEPERATOR}{"ch`));
controller.enqueue(encoder.encode('unk":'));
controller.enqueue(encoder.encode(`3}${STREAM_SEPERATOR}`));
controller.close();
},
});

const mockFetch = vi.fn().mockResolvedValue({
ok: true,
body: mockResponse,
});

global.fetch = mockFetch;

const onChunkMock = vi.fn();
const onDoneMock = vi.fn();
const onErrorMock = vi.fn();

await streamRequest(
{
baseUrl: 'https://api.example.com',
pushRef: '',
},
'/data',
{ key: 'value' },
onChunkMock,
onDoneMock,
onErrorMock,
);

expect(mockFetch).toHaveBeenCalledWith('https://api.example.com/data', {
method: 'POST',
body: JSON.stringify({ key: 'value' }),
credentials: 'include',
headers: {
'Content-Type': 'application/json',
'browser-id': expect.stringContaining('-'),
},
});

expect(onChunkMock).toHaveBeenCalledTimes(3);
expect(onChunkMock).toHaveBeenNthCalledWith(1, { chunk: 1 });
expect(onChunkMock).toHaveBeenNthCalledWith(2, { chunk: 2 });
expect(onChunkMock).toHaveBeenNthCalledWith(3, { chunk: 3 });

expect(onDoneMock).toHaveBeenCalledTimes(1);
expect(onErrorMock).not.toHaveBeenCalled();
});
});
37 changes: 25 additions & 12 deletions packages/editor-ui/src/utils/apiUtils.ts
@@ -3,7 +3,6 @@ import axios from 'axios';
import { ApplicationError, jsonParse, type GenericValue, type IDataObject } from 'n8n-workflow';
import type { IExecutionFlattedResponse, IExecutionResponse, IRestApiContext } from '@/Interface';
import { parse } from 'flatted';
import type { ChatRequest } from '@/types/assistant.types';
import { assert } from '@/utils/assert';

const BROWSER_ID_STORAGE_KEY = 'n8n-browserId';
@@ -14,6 +13,7 @@ if (!browserId && 'randomUUID' in crypto) {
}

export const NO_NETWORK_ERROR_CODE = 999;
export const STREAM_SEPERATOR = '⧉⇋⇋➽⌑⧉§§\n';

export class ResponseError extends ApplicationError {
// The HTTP status code of response
@@ -194,15 +194,15 @@ export function unflattenExecutionData(fullExecutionData: IExecutionFlattedRespo
return returnData;
}

export const streamRequest = async (
export async function streamRequest<T>(
context: IRestApiContext,
apiEndpoint: string,
payload: ChatRequest.RequestPayload,
onChunk?: (chunk: ChatRequest.ResponsePayload) => void,
payload: object,
onChunk?: (chunk: T) => void,
onDone?: () => void,
onError?: (e: Error) => void,
separator = '⧉⇋⇋➽⌑⧉§§\n',
): Promise<void> => {
separator = STREAM_SEPERATOR,
): Promise<void> {
const headers: Record<string, string> = {
'Content-Type': 'application/json',
};
@@ -223,23 +223,36 @@ const reader = response.body.getReader();
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');

let buffer = '';

async function readStream() {
const { done, value } = await reader.read();
if (done) {
onDone?.();
return;
}

const chunk = decoder.decode(value);
const splitChunks = chunk.split(separator);
buffer += chunk;

const splitChunks = buffer.split(separator);

buffer = '';
for (const splitChunk of splitChunks) {
if (splitChunk && onChunk) {
if (splitChunk) {
let data: T;
try {
data = jsonParse<T>(splitChunk, { errorMessage: 'Invalid json' });
} catch (e) {
// incomplete json. append to buffer to complete
buffer += splitChunk;

continue;
}

try {
onChunk(jsonParse(splitChunk, { errorMessage: 'Invalid json chunk in stream' }));
onChunk?.(data);
} catch (e: unknown) {
if (e instanceof Error) {
console.log(`${e.message}: ${splitChunk}`);
onError?.(e);
}
}
@@ -257,4 +270,4 @@ export const streamRequest = async (
assert(e instanceof Error);
onError?.(e);
}
};
}
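
For reference, streamRequest is now generic over the chunk payload, as the assistant.ts change above shows with ChatRequest.ResponsePayload. A hedged usage sketch with made-up names (LogChunk, watchLogs, and the /hypothetical/logs endpoint are illustrative only; the import paths assume the editor-ui '@' alias):

import { streamRequest } from '@/utils/apiUtils';
import type { IRestApiContext } from '@/Interface';

// Hypothetical chunk shape, for illustration only.
interface LogChunk {
  level: 'info' | 'error';
  text: string;
}

function watchLogs(ctx: IRestApiContext): void {
  // Each separator-delimited JSON chunk is parsed and handed to onChunk as a LogChunk.
  void streamRequest<LogChunk>(
    ctx,
    '/hypothetical/logs', // example endpoint, not part of this commit
    { follow: true },
    (chunk) => console.log(`[${chunk.level}] ${chunk.text}`),
    () => console.log('stream finished'),
    (error) => console.error('stream failed', error),
  );
}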
