Skip to content

Commit

Permalink
Use node-fetch on javascript SDK
Browse files Browse the repository at this point in the history
We realized that the sdk is run on client's server and we can't control
the nodejs version or if the have `fetch` polyfill. Also one user saw
our sdk failing with TLSSocket.onSocketEnd TypeError terminated
other.side closed. The stack trace we saw was pointing to undinci
nodejs/undici#583
Which looks is the nodejs fetch implementatin. So we try to use
node-fetch to see if this mitigates the error
  • Loading branch information
andresgutgon committed Oct 16, 2024
1 parent 9ff7f37 commit a74cd5f
Show file tree
Hide file tree
Showing 5 changed files with 569 additions and 472 deletions.
3 changes: 3 additions & 0 deletions packages/sdks/typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"dependencies": {
"@t3-oss/env-core": "^0.10.1",
"eventsource-parser": "^2.0.1",
"node-fetch": "3.3.2",
"zod": "^3.23.8"
},
"devDependencies": {
Expand All @@ -38,6 +39,8 @@
"@rollup/plugin-replace": "^6.0.1",
"@rollup/plugin-typescript": "^11.1.6",
"@types/eventsource": "^1.1.15",
"@types/node": "^22.7.5",
"@types/node-fetch": "^2.6.11",
"msw": "^2.3.5",
"rollup": "^4.21.1",
"rollup-plugin-dts": "^6.1.1",
Expand Down
2 changes: 2 additions & 0 deletions packages/sdks/typescript/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ const aliasEntries = {
const EXTERNALS = [
'@t3-oss/env-core',
'zod',
// This fix some circular dependencies from core. Not really needed in prod
'flydrive/types',
'node-fetch',
'stream',
'eventsource-parser/stream',
]
Expand Down
38 changes: 32 additions & 6 deletions packages/sdks/typescript/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Readable } from 'stream'

import type { Config, Message } from '@latitude-data/compiler'
import type {
ChainCallResponseDto,
Expand All @@ -7,9 +9,11 @@ import type {
} from '@latitude-data/core/browser'
import env from '$sdk/env'
import { GatewayApiConfig, RouteResolver } from '$sdk/utils'
import { nodeFetchResponseToReadableStream } from '$sdk/utils/nodeFetchResponseToReadableStream'
import { BodyParams, HandlerType, UrlParams } from '$sdk/utils/types'
import { ParsedEvent, ReconnectInterval } from 'eventsource-parser'
import { EventSourceParserStream } from 'eventsource-parser/stream'
import nodeFetch from 'node-fetch'

export type StreamChainResponse = {
conversation: Message[]
Expand Down Expand Up @@ -153,7 +157,7 @@ export class Latitude {
versionUuid = versionUuid ?? this.versionUuid

try {
const response = await this.request({
const response = await this.nodeFetchRequest({
method: 'POST',
handler: HandlerType.RunDocument,
params: { projectId, versionUuid },
Expand All @@ -166,7 +170,7 @@ export class Latitude {
}

return this.handleStream({
stream: response.body!,
body: response.body! as Readable,
onEvent,
onFinished,
onError,
Expand All @@ -182,33 +186,34 @@ export class Latitude {
messages: Message[],
{ onEvent, onFinished, onError }: StreamResponseCallbacks = {},
) {
const response = await this.request({
const response = await this.nodeFetchRequest({
method: 'POST',
handler: HandlerType.Chat,
params: { conversationUuid: uuid },
body: { messages },
})
return this.handleStream({
stream: response.body!,
body: response.body! as Readable,
onEvent,
onFinished,
onError,
})
}

private async handleStream({
stream,
body,
onEvent,
onFinished,
onError,
}: StreamResponseCallbacks & {
stream: ReadableStream
body: Readable
}) {
let conversation: Message[] = []
let uuid: string | undefined
let chainResponse: ChainCallResponseDto | undefined

const parser = new EventSourceParserStream()
const stream = nodeFetchResponseToReadableStream(body)
const eventStream = stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(parser)
Expand Down Expand Up @@ -286,6 +291,27 @@ export class Latitude {
})
}

private async nodeFetchRequest<H extends HandlerType>({
method,
handler,
params,
body,
}: {
handler: H
params?: UrlParams<H>
method: 'POST' | 'GET' | 'PUT' | 'DELETE'
body?: BodyParams<H>
}) {
return nodeFetch(this.routeResolver.resolve({ handler, params }), {
method,
headers: this.authHeader,
body:
method === 'POST'
? this.bodyToString({ ...body, __internal: { source: this.source } })
: undefined,
})
}

private parseJSON(line: string) {
try {
return JSON.parse(line) as ChainEventDto
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Readable } from 'stream'

export function nodeFetchResponseToReadableStream(nodeStream: Readable) {
return new ReadableStream({
start(controller) {
nodeStream.on('data', (chunk: Buffer) => {
controller.enqueue(chunk)
})

nodeStream.on('end', () => {
controller.close()
})

nodeStream.on('error', (err) => {
controller.error(err)
})
},
})
}
Loading

0 comments on commit a74cd5f

Please sign in to comment.