Skip to content

Commit

Permalink
Improve interaction with streams and Node's Readable.from() method (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoblee93 authored Dec 6, 2023
1 parent 1e2bfb7 commit 0a642ae
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 83 deletions.
2 changes: 1 addition & 1 deletion langchain-core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@langchain/core",
"version": "0.0.8",
"version": "0.0.9",
"description": "Core LangChain.js abstractions and schemas",
"type": "module",
"engines": {
Expand Down
9 changes: 6 additions & 3 deletions langchain-core/src/utils/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ export class IterableReadableStream<T> extends ReadableStream<T> {

async return() {
this.ensureReader();
const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
this.reader.releaseLock(); // release lock first
await cancelPromise; // now await it
// If wrapped in a Node stream, cancel is already called.
if (this.locked) {
const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
this.reader.releaseLock(); // release lock first
await cancelPromise; // now await it
}
return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway
}

Expand Down
80 changes: 1 addition & 79 deletions langchain/src/util/stream.ts
Original file line number Diff line number Diff line change
@@ -1,79 +1 @@
/*
* Support async iterator syntax for ReadableStreams in all environments.
* Source: https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
*/
export class IterableReadableStream<T> extends ReadableStream<T> {
public reader: ReadableStreamDefaultReader<T>;

ensureReader() {
if (!this.reader) {
this.reader = this.getReader();
}
}

async next() {
this.ensureReader();
try {
const result = await this.reader.read();
if (result.done) this.reader.releaseLock(); // release lock when stream becomes closed
return {
done: result.done,
value: result.value as T, // Cloudflare Workers typing fix
};
} catch (e) {
this.reader.releaseLock(); // release lock when stream becomes errored
throw e;
}
}

async return() {
this.ensureReader();
const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
this.reader.releaseLock(); // release lock first
await cancelPromise; // now await it
return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway
}

[Symbol.asyncIterator]() {
return this;
}

static fromReadableStream<T>(stream: ReadableStream<T>) {
// From https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#reading_the_stream
const reader = stream.getReader();
return new IterableReadableStream<T>({
start(controller) {
return pump();
function pump(): Promise<T | undefined> {
return reader.read().then(({ done, value }) => {
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
return;
}
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();
});
}
},
cancel() {
reader.releaseLock();
},
});
}

static fromAsyncGenerator<T>(generator: AsyncGenerator<T>) {
return new IterableReadableStream<T>({
async pull(controller) {
const { value, done } = await generator.next();
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
}
// Fix: `else if (value)` will hang the streaming when nullish value (e.g. empty string) is pulled
controller.enqueue(value);
},
});
}
}
export * from "@langchain/core/utils/stream";

2 comments on commit 0a642ae

@vercel
Copy link

@vercel vercel bot commented on 0a642ae Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vercel
Copy link

@vercel vercel bot commented on 0a642ae Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

langchainjs-docs – ./docs/core_docs/

langchainjs-docs-langchain.vercel.app
langchainjs-docs-git-main-langchain.vercel.app
langchainjs-docs-ruddy.vercel.app
js.langchain.com

Please sign in to comment.