From 0a642ae39ae624ab93fb1e1d96a71762c711d3de Mon Sep 17 00:00:00 2001 From: Jacob Lee <jacoblee93@gmail.com> Date: Tue, 5 Dec 2023 16:44:27 -0800 Subject: [PATCH] Improve interaction with streams and Node's Readable.from() method (#3556) --- langchain-core/package.json | 2 +- langchain-core/src/utils/stream.ts | 9 ++-- langchain/src/util/stream.ts | 80 +----------------------------- 3 files changed, 8 insertions(+), 83 deletions(-) diff --git a/langchain-core/package.json b/langchain-core/package.json index 00c5fa943c83..b5e5b2cddf0d 100644 --- a/langchain-core/package.json +++ b/langchain-core/package.json @@ -1,6 +1,6 @@ { "name": "@langchain/core", - "version": "0.0.8", + "version": "0.0.9", "description": "Core LangChain.js abstractions and schemas", "type": "module", "engines": { diff --git a/langchain-core/src/utils/stream.ts b/langchain-core/src/utils/stream.ts index a5a5d88f391e..ce8c6289496c 100644 --- a/langchain-core/src/utils/stream.ts +++ b/langchain-core/src/utils/stream.ts @@ -28,9 +28,12 @@ export class IterableReadableStream<T> extends ReadableStream<T> { async return() { this.ensureReader(); - const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet - this.reader.releaseLock(); // release lock first - await cancelPromise; // now await it + // If wrapped in a Node stream, cancel is already called. + if (this.locked) { + const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet + this.reader.releaseLock(); // release lock first + await cancelPromise; // now await it + } return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway } diff --git a/langchain/src/util/stream.ts b/langchain/src/util/stream.ts index a5a5d88f391e..5a78f523e038 100644 --- a/langchain/src/util/stream.ts +++ b/langchain/src/util/stream.ts @@ -1,79 +1 @@ -/* - * Support async iterator syntax for ReadableStreams in all environments. - * Source: https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490 - */ -export class IterableReadableStream<T> extends ReadableStream<T> { - public reader: ReadableStreamDefaultReader<T>; - - ensureReader() { - if (!this.reader) { - this.reader = this.getReader(); - } - } - - async next() { - this.ensureReader(); - try { - const result = await this.reader.read(); - if (result.done) this.reader.releaseLock(); // release lock when stream becomes closed - return { - done: result.done, - value: result.value as T, // Cloudflare Workers typing fix - }; - } catch (e) { - this.reader.releaseLock(); // release lock when stream becomes errored - throw e; - } - } - - async return() { - this.ensureReader(); - const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet - this.reader.releaseLock(); // release lock first - await cancelPromise; // now await it - return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway - } - - [Symbol.asyncIterator]() { - return this; - } - - static fromReadableStream<T>(stream: ReadableStream<T>) { - // From https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#reading_the_stream - const reader = stream.getReader(); - return new IterableReadableStream<T>({ - start(controller) { - return pump(); - function pump(): Promise<T | undefined> { - return reader.read().then(({ done, value }) => { - // When no more data needs to be consumed, close the stream - if (done) { - controller.close(); - return; - } - // Enqueue the next data chunk into our target stream - controller.enqueue(value); - return pump(); - }); - } - }, - cancel() { - reader.releaseLock(); - }, - }); - } - - static fromAsyncGenerator<T>(generator: AsyncGenerator<T>) { - return new IterableReadableStream<T>({ - async pull(controller) { - const { value, done } = await generator.next(); - // When no more data needs to be consumed, close the stream - if (done) { - controller.close(); - } - // Fix: `else if (value)` will hang the streaming when nullish value (e.g. empty string) is pulled - controller.enqueue(value); - }, - }); - } -} +export * from "@langchain/core/utils/stream";