From fa9a3e14d7c1e2993793ea99b6aa77302dfbf0ca Mon Sep 17 00:00:00 2001 From: Mutasem Aldmour Date: Thu, 15 Aug 2024 14:19:28 +0200 Subject: [PATCH 1/5] fix: Buffer json chunks in stream response --- packages/editor-ui/src/api/assistant.ts | 9 ++++++- packages/editor-ui/src/utils/apiUtils.ts | 31 +++++++++++++++++------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/packages/editor-ui/src/api/assistant.ts b/packages/editor-ui/src/api/assistant.ts index d9e9ec0cded21..4b1d127ef15c6 100644 --- a/packages/editor-ui/src/api/assistant.ts +++ b/packages/editor-ui/src/api/assistant.ts @@ -9,7 +9,14 @@ export function chatWithAssistant( onDone: () => void, onError: (e: Error) => void, ): void { - void streamRequest(ctx, '/ai-assistant/chat', payload, onMessageUpdated, onDone, onError); + void streamRequest( + ctx, + '/ai-assistant/chat', + payload, + onMessageUpdated, + onDone, + onError, + ); } export async function replaceCode( diff --git a/packages/editor-ui/src/utils/apiUtils.ts b/packages/editor-ui/src/utils/apiUtils.ts index e9201b7c331ea..b70b4b0bcaada 100644 --- a/packages/editor-ui/src/utils/apiUtils.ts +++ b/packages/editor-ui/src/utils/apiUtils.ts @@ -194,15 +194,15 @@ export function unflattenExecutionData(fullExecutionData: IExecutionFlattedRespo return returnData; } -export const streamRequest = async ( +export async function streamRequest( context: IRestApiContext, apiEndpoint: string, - payload: ChatRequest.RequestPayload, - onChunk?: (chunk: ChatRequest.ResponsePayload) => void, + payload: object, + onChunk?: (chunk: T) => void, onDone?: () => void, onError?: (e: Error) => void, separator = '⧉⇋⇋➽⌑⧉§§\n', -): Promise => { +): Promise { const headers: Record = { 'Content-Type': 'application/json', }; @@ -223,23 +223,36 @@ export const streamRequest = async ( const reader = response.body.getReader(); const decoder = new TextDecoder('utf-8'); + let buffer = ''; + async function readStream() { const { done, value } = await reader.read(); if (done) { onDone?.(); return; } - const chunk = decoder.decode(value); - const splitChunks = chunk.split(separator); + buffer += chunk; + const splitChunks = buffer.split(separator); + + buffer = ''; for (const splitChunk of splitChunks) { - if (splitChunk && onChunk) { + if (splitChunk) { + let data: T; + try { + data = jsonParse(splitChunk, { errorMessage: 'Invalid json' }); + } catch (e) { + // incomplete json. append to buffer to complete + buffer += splitChunk; + + continue; + } + try { - onChunk(jsonParse(splitChunk, { errorMessage: 'Invalid json chunk in stream' })); + onChunk?.(data); } catch (e: unknown) { if (e instanceof Error) { - console.log(`${e.message}: ${splitChunk}`); onError?.(e); } } From fd84d1dd9db220808400567e8308fe3aac51f707 Mon Sep 17 00:00:00 2001 From: Mutasem Aldmour Date: Thu, 15 Aug 2024 14:21:22 +0200 Subject: [PATCH 2/5] fix: lint --- packages/editor-ui/src/utils/apiUtils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/editor-ui/src/utils/apiUtils.ts b/packages/editor-ui/src/utils/apiUtils.ts index b70b4b0bcaada..41dd9c11ecb88 100644 --- a/packages/editor-ui/src/utils/apiUtils.ts +++ b/packages/editor-ui/src/utils/apiUtils.ts @@ -270,4 +270,4 @@ export async function streamRequest( assert(e instanceof Error); onError?.(e); } -}; +} From d79e2fa11f4e52f58b6a3849ccb1e1d449672a4f Mon Sep 17 00:00:00 2001 From: Mutasem Aldmour Date: Thu, 15 Aug 2024 14:29:33 +0200 Subject: [PATCH 3/5] fix: lint --- packages/editor-ui/src/utils/apiUtils.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/editor-ui/src/utils/apiUtils.ts b/packages/editor-ui/src/utils/apiUtils.ts index 41dd9c11ecb88..adfff50be67b3 100644 --- a/packages/editor-ui/src/utils/apiUtils.ts +++ b/packages/editor-ui/src/utils/apiUtils.ts @@ -3,7 +3,6 @@ import axios from 'axios'; import { ApplicationError, jsonParse, type GenericValue, type IDataObject } from 'n8n-workflow'; import type { IExecutionFlattedResponse, IExecutionResponse, IRestApiContext } from '@/Interface'; import { parse } from 'flatted'; -import type { ChatRequest } from '@/types/assistant.types'; import { assert } from '@/utils/assert'; const BROWSER_ID_STORAGE_KEY = 'n8n-browserId'; From 5fc20e17f8b1a95c8882674b71ebfd1eaa9635e6 Mon Sep 17 00:00:00 2001 From: Mutasem Aldmour Date: Thu, 15 Aug 2024 14:30:42 +0200 Subject: [PATCH 4/5] feat: rename assistant --- packages/editor-ui/src/plugins/i18n/locales/en.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/editor-ui/src/plugins/i18n/locales/en.json b/packages/editor-ui/src/plugins/i18n/locales/en.json index b082675d2d077..49b331e4c1787 100644 --- a/packages/editor-ui/src/plugins/i18n/locales/en.json +++ b/packages/editor-ui/src/plugins/i18n/locales/en.json @@ -131,7 +131,7 @@ "auth.signup.setupYourAccount": "Set up your account", "auth.signup.setupYourAccountError": "Problem setting up your account", "auth.signup.tokenValidationError": "Issue validating invite token", - "aiAssistant.name": "Ava", + "aiAssistant.name": "Assistant", "aiAssistant.assistant": "AI Assistant", "aiAssistant.newSessionModal.title.part1": "Start new", "aiAssistant.newSessionModal.title.part2": "session", @@ -139,7 +139,7 @@ "aiAssistant.newSessionModal.question": "Are you sure you want to start a new session?", "aiAssistant.newSessionModal.confirm": "Start new session", "aiAssistant.serviceError.message": "Unable to connect to n8n's AI service", - "aiAssistant.codeUpdated.message.title": "Ava modified workflow", + "aiAssistant.codeUpdated.message.title": "Assistant modified workflow", "aiAssistant.codeUpdated.message.body": "Open the {nodeName} node to see the changes", "banners.confirmEmail.message.1": "To secure your account and prevent future access issues, please confirm your", "banners.confirmEmail.message.2": "email address.", From f0d98b9ae924a94e9ae4180581efb5a4cde43fbb Mon Sep 17 00:00:00 2001 From: Mutasem Aldmour Date: Thu, 15 Aug 2024 15:01:27 +0200 Subject: [PATCH 5/5] feat: add tests --- .../src/utils/__tests__/apiUtils.spec.ts | 112 ++++++++++++++++++ packages/editor-ui/src/utils/apiUtils.ts | 3 +- 2 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts diff --git a/packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts b/packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts new file mode 100644 index 0000000000000..cefb6f3389bb7 --- /dev/null +++ b/packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts @@ -0,0 +1,112 @@ +import { STREAM_SEPERATOR, streamRequest } from '../apiUtils'; + +describe('streamRequest', () => { + it('should stream data from the API endpoint', async () => { + const encoder = new TextEncoder(); + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 1 })}${STREAM_SEPERATOR}`)); + controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 2 })}${STREAM_SEPERATOR}`)); + controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 3 })}${STREAM_SEPERATOR}`)); + controller.close(); + }, + }); + + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + body: mockResponse, + }); + + global.fetch = mockFetch; + + const onChunkMock = vi.fn(); + const onDoneMock = vi.fn(); + const onErrorMock = vi.fn(); + + await streamRequest( + { + baseUrl: 'https://api.example.com', + pushRef: '', + }, + '/data', + { key: 'value' }, + onChunkMock, + onDoneMock, + onErrorMock, + ); + + expect(mockFetch).toHaveBeenCalledWith('https://api.example.com/data', { + method: 'POST', + body: JSON.stringify({ key: 'value' }), + credentials: 'include', + headers: { + 'Content-Type': 'application/json', + 'browser-id': expect.stringContaining('-'), + }, + }); + + expect(onChunkMock).toHaveBeenCalledTimes(3); + expect(onChunkMock).toHaveBeenNthCalledWith(1, { chunk: 1 }); + expect(onChunkMock).toHaveBeenNthCalledWith(2, { chunk: 2 }); + expect(onChunkMock).toHaveBeenNthCalledWith(3, { chunk: 3 }); + + expect(onDoneMock).toHaveBeenCalledTimes(1); + expect(onErrorMock).not.toHaveBeenCalled(); + }); + + it('should handle broken stream data', async () => { + const encoder = new TextEncoder(); + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode(`${JSON.stringify({ chunk: 1 })}${STREAM_SEPERATOR}{"chunk": `), + ); + controller.enqueue(encoder.encode(`2}${STREAM_SEPERATOR}{"ch`)); + controller.enqueue(encoder.encode('unk":')); + controller.enqueue(encoder.encode(`3}${STREAM_SEPERATOR}`)); + controller.close(); + }, + }); + + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + body: mockResponse, + }); + + global.fetch = mockFetch; + + const onChunkMock = vi.fn(); + const onDoneMock = vi.fn(); + const onErrorMock = vi.fn(); + + await streamRequest( + { + baseUrl: 'https://api.example.com', + pushRef: '', + }, + '/data', + { key: 'value' }, + onChunkMock, + onDoneMock, + onErrorMock, + ); + + expect(mockFetch).toHaveBeenCalledWith('https://api.example.com/data', { + method: 'POST', + body: JSON.stringify({ key: 'value' }), + credentials: 'include', + headers: { + 'Content-Type': 'application/json', + 'browser-id': expect.stringContaining('-'), + }, + }); + + expect(onChunkMock).toHaveBeenCalledTimes(3); + expect(onChunkMock).toHaveBeenNthCalledWith(1, { chunk: 1 }); + expect(onChunkMock).toHaveBeenNthCalledWith(2, { chunk: 2 }); + expect(onChunkMock).toHaveBeenNthCalledWith(3, { chunk: 3 }); + + expect(onDoneMock).toHaveBeenCalledTimes(1); + expect(onErrorMock).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/editor-ui/src/utils/apiUtils.ts b/packages/editor-ui/src/utils/apiUtils.ts index adfff50be67b3..b07f8910b90c8 100644 --- a/packages/editor-ui/src/utils/apiUtils.ts +++ b/packages/editor-ui/src/utils/apiUtils.ts @@ -13,6 +13,7 @@ if (!browserId && 'randomUUID' in crypto) { } export const NO_NETWORK_ERROR_CODE = 999; +export const STREAM_SEPERATOR = '⧉⇋⇋➽⌑⧉§§\n'; export class ResponseError extends ApplicationError { // The HTTP status code of response @@ -200,7 +201,7 @@ export async function streamRequest( onChunk?: (chunk: T) => void, onDone?: () => void, onError?: (e: Error) => void, - separator = '⧉⇋⇋➽⌑⧉§§\n', + separator = STREAM_SEPERATOR, ): Promise { const headers: Record = { 'Content-Type': 'application/json',