Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(editor): Buffer json chunks in stream response #10439

Merged
merged 5 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion packages/editor-ui/src/api/assistant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ export function chatWithAssistant(
onDone: () => void,
onError: (e: Error) => void,
): void {
void streamRequest(ctx, '/ai-assistant/chat', payload, onMessageUpdated, onDone, onError);
void streamRequest<ChatRequest.ResponsePayload>(
ctx,
'/ai-assistant/chat',
payload,
onMessageUpdated,
onDone,
onError,
);
}

export async function replaceCode(
Expand Down
4 changes: 2 additions & 2 deletions packages/editor-ui/src/plugins/i18n/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,15 @@
"auth.signup.setupYourAccount": "Set up your account",
"auth.signup.setupYourAccountError": "Problem setting up your account",
"auth.signup.tokenValidationError": "Issue validating invite token",
"aiAssistant.name": "Ava",
"aiAssistant.name": "Assistant",
"aiAssistant.assistant": "AI Assistant",
"aiAssistant.newSessionModal.title.part1": "Start new",
"aiAssistant.newSessionModal.title.part2": "session",
"aiAssistant.newSessionModal.message": "You already have an active AI Assistant session. Starting a new session will clear your current conversation history.",
"aiAssistant.newSessionModal.question": "Are you sure you want to start a new session?",
"aiAssistant.newSessionModal.confirm": "Start new session",
"aiAssistant.serviceError.message": "Unable to connect to n8n's AI service",
"aiAssistant.codeUpdated.message.title": "Ava modified workflow",
"aiAssistant.codeUpdated.message.title": "Assistant modified workflow",
"aiAssistant.codeUpdated.message.body": "Open the <a data-action='openNodeDetail' data-action-parameter-node='{nodeName}'>{nodeName}</a> node to see the changes",
"banners.confirmEmail.message.1": "To secure your account and prevent future access issues, please confirm your",
"banners.confirmEmail.message.2": "email address.",
Expand Down
112 changes: 112 additions & 0 deletions packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { STREAM_SEPERATOR, streamRequest } from '../apiUtils';

describe('streamRequest', () => {
it('should stream data from the API endpoint', async () => {
const encoder = new TextEncoder();
const mockResponse = new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 1 })}${STREAM_SEPERATOR}`));
controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 2 })}${STREAM_SEPERATOR}`));
controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 3 })}${STREAM_SEPERATOR}`));
controller.close();
},
});

const mockFetch = vi.fn().mockResolvedValue({
ok: true,
body: mockResponse,
});

global.fetch = mockFetch;

const onChunkMock = vi.fn();
const onDoneMock = vi.fn();
const onErrorMock = vi.fn();

await streamRequest(
{
baseUrl: 'https://api.example.com',
pushRef: '',
},
'/data',
{ key: 'value' },
onChunkMock,
onDoneMock,
onErrorMock,
);

expect(mockFetch).toHaveBeenCalledWith('https://api.example.com/data', {
method: 'POST',
body: JSON.stringify({ key: 'value' }),
credentials: 'include',
headers: {
'Content-Type': 'application/json',
'browser-id': expect.stringContaining('-'),
},
});

expect(onChunkMock).toHaveBeenCalledTimes(3);
expect(onChunkMock).toHaveBeenNthCalledWith(1, { chunk: 1 });
expect(onChunkMock).toHaveBeenNthCalledWith(2, { chunk: 2 });
expect(onChunkMock).toHaveBeenNthCalledWith(3, { chunk: 3 });

expect(onDoneMock).toHaveBeenCalledTimes(1);
expect(onErrorMock).not.toHaveBeenCalled();
});

it('should handle broken stream data', async () => {
const encoder = new TextEncoder();
const mockResponse = new ReadableStream({
start(controller) {
controller.enqueue(
encoder.encode(`${JSON.stringify({ chunk: 1 })}${STREAM_SEPERATOR}{"chunk": `),
);
controller.enqueue(encoder.encode(`2}${STREAM_SEPERATOR}{"ch`));
controller.enqueue(encoder.encode('unk":'));
controller.enqueue(encoder.encode(`3}${STREAM_SEPERATOR}`));
controller.close();
},
});

const mockFetch = vi.fn().mockResolvedValue({
ok: true,
body: mockResponse,
});

global.fetch = mockFetch;

const onChunkMock = vi.fn();
const onDoneMock = vi.fn();
const onErrorMock = vi.fn();

await streamRequest(
{
baseUrl: 'https://api.example.com',
pushRef: '',
},
'/data',
{ key: 'value' },
onChunkMock,
onDoneMock,
onErrorMock,
);

expect(mockFetch).toHaveBeenCalledWith('https://api.example.com/data', {
method: 'POST',
body: JSON.stringify({ key: 'value' }),
credentials: 'include',
headers: {
'Content-Type': 'application/json',
'browser-id': expect.stringContaining('-'),
},
});

expect(onChunkMock).toHaveBeenCalledTimes(3);
expect(onChunkMock).toHaveBeenNthCalledWith(1, { chunk: 1 });
expect(onChunkMock).toHaveBeenNthCalledWith(2, { chunk: 2 });
expect(onChunkMock).toHaveBeenNthCalledWith(3, { chunk: 3 });

expect(onDoneMock).toHaveBeenCalledTimes(1);
expect(onErrorMock).not.toHaveBeenCalled();
});
});
37 changes: 25 additions & 12 deletions packages/editor-ui/src/utils/apiUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import axios from 'axios';
import { ApplicationError, jsonParse, type GenericValue, type IDataObject } from 'n8n-workflow';
import type { IExecutionFlattedResponse, IExecutionResponse, IRestApiContext } from '@/Interface';
import { parse } from 'flatted';
import type { ChatRequest } from '@/types/assistant.types';
import { assert } from '@/utils/assert';

const BROWSER_ID_STORAGE_KEY = 'n8n-browserId';
Expand All @@ -14,6 +13,7 @@ if (!browserId && 'randomUUID' in crypto) {
}

export const NO_NETWORK_ERROR_CODE = 999;
export const STREAM_SEPERATOR = '⧉⇋⇋➽⌑⧉§§\n';

export class ResponseError extends ApplicationError {
// The HTTP status code of response
Expand Down Expand Up @@ -194,15 +194,15 @@ export function unflattenExecutionData(fullExecutionData: IExecutionFlattedRespo
return returnData;
}

export const streamRequest = async (
export async function streamRequest<T>(
context: IRestApiContext,
apiEndpoint: string,
payload: ChatRequest.RequestPayload,
onChunk?: (chunk: ChatRequest.ResponsePayload) => void,
payload: object,
onChunk?: (chunk: T) => void,
onDone?: () => void,
onError?: (e: Error) => void,
separator = '⧉⇋⇋➽⌑⧉§§\n',
): Promise<void> => {
separator = STREAM_SEPERATOR,
): Promise<void> {
const headers: Record<string, string> = {
'Content-Type': 'application/json',
};
Expand All @@ -223,23 +223,36 @@ export const streamRequest = async (
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');

let buffer = '';

async function readStream() {
const { done, value } = await reader.read();
if (done) {
onDone?.();
return;
}

const chunk = decoder.decode(value);
const splitChunks = chunk.split(separator);
buffer += chunk;

const splitChunks = buffer.split(separator);

buffer = '';
for (const splitChunk of splitChunks) {
if (splitChunk && onChunk) {
if (splitChunk) {
let data: T;
try {
data = jsonParse<T>(splitChunk, { errorMessage: 'Invalid json' });
} catch (e) {
// incomplete json. append to buffer to complete
buffer += splitChunk;

continue;
}

try {
onChunk(jsonParse(splitChunk, { errorMessage: 'Invalid json chunk in stream' }));
onChunk?.(data);
} catch (e: unknown) {
if (e instanceof Error) {
console.log(`${e.message}: ${splitChunk}`);
mutdmour marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get rid of the console.log?

onError?.(e);
}
}
Expand All @@ -257,4 +270,4 @@ export const streamRequest = async (
assert(e instanceof Error);
onError?.(e);
}
};
}
Loading