From 45216514f66b0493cc0fa4d86deb163e51b39508 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 15 Dec 2023 17:22:52 -0800 Subject: [PATCH 1/4] Use an interface for runnables to allow more compatibility between core versions --- langchain-core/src/runnables/base.ts | 52 ++++- .../tests/runnable_interface.test.ts | 206 ++++++++++++++++++ langchain-core/src/utils/stream.ts | 15 +- 3 files changed, 269 insertions(+), 4 deletions(-) create mode 100644 langchain-core/src/runnables/tests/runnable_interface.test.ts diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index 8504b24fdedd..a3d51e30a232 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -11,7 +11,10 @@ import { RunLogPatch, } from "../tracers/log_stream.js"; import { Serializable } from "../load/serializable.js"; -import { IterableReadableStream } from "../utils/stream.js"; +import { + IterableReadableStream, + type IterableReadableStreamInterface +} from "../utils/stream.js"; import { RunnableConfig, getCallbackMangerForConfig, @@ -21,6 +24,48 @@ import { AsyncCaller } from "../utils/async_caller.js"; import { Run } from "../tracers/base.js"; import { RootListenersTracer } from "../tracers/root_listener.js"; +/** + * Base interface implemented by all runnables. + * Used for cross-compatibility between different versions of LangChain core. + * + * Should not change on patch releases. + */ +export interface RunnableInterface { + invoke( + input: RunInput, + options?: Partial + ): Promise; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptions & { returnExceptions?: false } + ): Promise; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptions & { returnExceptions: true } + ): Promise<(RunOutput | Error)[]>; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptions + ): Promise<(RunOutput | Error)[]>; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptions + ): Promise<(RunOutput | Error)[]>; + + stream( + input: RunInput, + options?: Partial + ): Promise>; +} + export type RunnableFunc = ( input: RunInput, options?: @@ -37,7 +82,8 @@ export type RunnableMapLike = { // eslint-disable-next-line @typescript-eslint/no-explicit-any export type RunnableLike = - | Runnable + // eslint-disable-next-line @typescript-eslint/no-explicit-any + | RunnableInterface | RunnableFunc | RunnableMapLike; @@ -66,7 +112,7 @@ export abstract class Runnable< // eslint-disable-next-line @typescript-eslint/no-explicit-any RunOutput = any, CallOptions extends RunnableConfig = RunnableConfig -> extends Serializable { +> extends Serializable implements RunnableInterface { protected lc_runnable = true; abstract invoke( diff --git a/langchain-core/src/runnables/tests/runnable_interface.test.ts b/langchain-core/src/runnables/tests/runnable_interface.test.ts new file mode 100644 index 000000000000..3efc7a88ee9d --- /dev/null +++ b/langchain-core/src/runnables/tests/runnable_interface.test.ts @@ -0,0 +1,206 @@ +/* eslint-disable no-promise-executor-return */ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { StringOutputParser } from "../../output_parsers/string.js"; +import { PromptTemplate } from "../../prompts/prompt.js"; +import { RunnableSequence } from "../base.js"; + +type RunnableBatchOptionsV0 = { + maxConcurrency?: number; + returnExceptions?: boolean; +}; + +interface RunnableInterfaceV0 { + invoke( + input: RunInput, + options?: Partial + ): Promise; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptionsV0 & { returnExceptions?: false } + ): Promise; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptionsV0 & { returnExceptions: true } + ): Promise<(RunOutput | Error)[]>; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptionsV0 + ): Promise<(RunOutput | Error)[]>; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptionsV0 + ): Promise<(RunOutput | Error)[]>; + + stream( + input: RunInput, + options?: Partial + ): Promise>; +} + +export class IterableReadableStreamV0 extends ReadableStream { + public reader: ReadableStreamDefaultReader; + + ensureReader() { + if (!this.reader) { + this.reader = this.getReader(); + } + } + + async next() { + this.ensureReader(); + try { + const result = await this.reader.read(); + if (result.done) this.reader.releaseLock(); // release lock when stream becomes closed + return { + done: result.done, + value: result.value as T, // Cloudflare Workers typing fix + }; + } catch (e) { + this.reader.releaseLock(); // release lock when stream becomes errored + throw e; + } + } + + async return() { + this.ensureReader(); + // If wrapped in a Node stream, cancel is already called. + if (this.locked) { + const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet + this.reader.releaseLock(); // release lock first + await cancelPromise; // now await it + } + return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway + } + + async throw(e: any): Promise> { + throw e; + } + + [Symbol.asyncIterator]() { + return this; + } + + static fromReadableStream(stream: ReadableStream) { + // From https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#reading_the_stream + const reader = stream.getReader(); + return new IterableReadableStreamV0({ + start(controller) { + return pump(); + function pump(): Promise { + return reader.read().then(({ done, value }) => { + // When no more data needs to be consumed, close the stream + if (done) { + controller.close(); + return; + } + // Enqueue the next data chunk into our target stream + controller.enqueue(value); + return pump(); + }); + } + }, + cancel() { + reader.releaseLock(); + }, + }); + } + + static fromAsyncGenerator(generator: AsyncGenerator) { + return new IterableReadableStreamV0({ + async pull(controller) { + const { value, done } = await generator.next(); + // When no more data needs to be consumed, close the stream + if (done) { + controller.close(); + } + // Fix: `else if (value)` will hang the streaming when nullish value (e.g. empty string) is pulled + controller.enqueue(value); + }, + }); + } +} + +/** + * Base class for all types of messages in a conversation. It includes + * properties like `content`, `name`, and `additional_kwargs`. It also + * includes methods like `toDict()` and `_getType()`. + */ +export class AIMessageV0 { + lc_namespace = ["langchain_core", "messages"]; + + lc_serializable = true; + + /** The content of the message. */ + content: string; + + /** The name of the message sender in a multi-user chat. */ + name?: string; + + /** The type of the message. */ + _getType() { + return "ai"; + }; + + constructor( + content: string + ) { + this.content = content; + } +} + +class RunnableV0 implements RunnableInterfaceV0 { + protected lc_runnable = true; + + async invoke(input: any, _options?: Partial | undefined): Promise { + return new AIMessageV0(input.toString()); + } + + async batch(inputs: any[], options?: Partial | Partial[] | undefined, batchOptions?: (RunnableBatchOptionsV0 & { returnExceptions?: false | undefined; }) | undefined): Promise; + async batch(inputs: any[], options?: Partial | Partial[] | undefined, batchOptions?: (RunnableBatchOptionsV0 & { returnExceptions: true; }) | undefined): Promise; + async batch(inputs: any[], options?: Partial | Partial[] | undefined, batchOptions?: RunnableBatchOptionsV0 | undefined): Promise; + async batch(inputs: any[], options?: Partial | Partial[] | undefined, batchOptions?: RunnableBatchOptionsV0 | undefined): Promise; + async batch(inputs: unknown, _options?: unknown, _batchOptions?: unknown): Promise { + return [inputs]; + } + + async stream(input: any, _options?: Partial | undefined): Promise> { + return input; + } + + async *transform(generator: AsyncGenerator, _options: Partial): AsyncGenerator { + yield *generator; + } +} + +test("Pipe with a class that implements a runnable interface", async () => { + const promptTemplate = PromptTemplate.fromTemplate("{input}"); + const llm = new RunnableV0(); + const outputParser = new StringOutputParser(); + const runnable = promptTemplate.pipe(llm).pipe(outputParser); + const result = await runnable.invoke({ input: "Hello world!!" }); + console.log(result); + expect(result).toBe("Hello world!!"); +}); + +test("Runnable sequence with a class that implements a runnable interface", async () => { + const promptTemplate = PromptTemplate.fromTemplate("{input}"); + const llm = new RunnableV0(); + const outputParser = new StringOutputParser(); + const runnable = RunnableSequence.from([ + promptTemplate, + llm, + outputParser, + ]) + const result = await runnable.invoke({ input: "Hello sequence!!" }); + console.log(result); + expect(result).toBe("Hello sequence!!"); +}); diff --git a/langchain-core/src/utils/stream.ts b/langchain-core/src/utils/stream.ts index ce8c6289496c..324f84945c3f 100644 --- a/langchain-core/src/utils/stream.ts +++ b/langchain-core/src/utils/stream.ts @@ -1,8 +1,10 @@ +export interface IterableReadableStreamInterface extends ReadableStream, AsyncGenerator {} + /* * Support async iterator syntax for ReadableStreams in all environments. * Source: https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490 */ -export class IterableReadableStream extends ReadableStream { +export class IterableReadableStream extends ReadableStream implements IterableReadableStreamInterface { public reader: ReadableStreamDefaultReader; ensureReader() { @@ -36,6 +38,17 @@ export class IterableReadableStream extends ReadableStream { } return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + async throw(e: any): Promise> { + this.ensureReader(); + if (this.locked) { + const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet + this.reader.releaseLock(); // release lock first + await cancelPromise; // now await it + } + throw e; + } [Symbol.asyncIterator]() { return this; From 1f072f036111ec3f27b0620a5f1c3d790e257237 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 15 Dec 2023 17:27:39 -0800 Subject: [PATCH 2/4] Lint + format --- langchain-core/src/runnables/base.ts | 38 +++---- .../tests/runnable_interface.test.ts | 99 +++++++++++++------ langchain-core/src/utils/stream.ts | 11 ++- 3 files changed, 98 insertions(+), 50 deletions(-) diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index a3d51e30a232..8a9b73ba133f 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -11,9 +11,9 @@ import { RunLogPatch, } from "../tracers/log_stream.js"; import { Serializable } from "../load/serializable.js"; -import { - IterableReadableStream, - type IterableReadableStreamInterface +import { + IterableReadableStream, + type IterableReadableStreamInterface, } from "../utils/stream.js"; import { RunnableConfig, @@ -27,15 +27,16 @@ import { RootListenersTracer } from "../tracers/root_listener.js"; /** * Base interface implemented by all runnables. * Used for cross-compatibility between different versions of LangChain core. - * + * * Should not change on patch releases. */ -export interface RunnableInterface { - invoke( - input: RunInput, - options?: Partial - ): Promise; - +export interface RunnableInterface< + RunInput, + RunOutput, + CallOptions extends RunnableConfig = RunnableConfig +> { + invoke(input: RunInput, options?: Partial): Promise; + batch( inputs: RunInput[], options?: Partial | Partial[], @@ -59,7 +60,7 @@ export interface RunnableInterface | Partial[], batchOptions?: RunnableBatchOptions ): Promise<(RunOutput | Error)[]>; - + stream( input: RunInput, options?: Partial @@ -107,12 +108,15 @@ function _coerceToDict(value: any, defaultKey: string) { * transformed. */ export abstract class Runnable< - // eslint-disable-next-line @typescript-eslint/no-explicit-any - RunInput = any, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - RunOutput = any, - CallOptions extends RunnableConfig = RunnableConfig -> extends Serializable implements RunnableInterface { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + RunInput = any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + RunOutput = any, + CallOptions extends RunnableConfig = RunnableConfig + > + extends Serializable + implements RunnableInterface +{ protected lc_runnable = true; abstract invoke( diff --git a/langchain-core/src/runnables/tests/runnable_interface.test.ts b/langchain-core/src/runnables/tests/runnable_interface.test.ts index 3efc7a88ee9d..9352cf31a3ca 100644 --- a/langchain-core/src/runnables/tests/runnable_interface.test.ts +++ b/langchain-core/src/runnables/tests/runnable_interface.test.ts @@ -11,10 +11,7 @@ type RunnableBatchOptionsV0 = { }; interface RunnableInterfaceV0 { - invoke( - input: RunInput, - options?: Partial - ): Promise; + invoke(input: RunInput, options?: Partial): Promise; batch( inputs: RunInput[], @@ -80,7 +77,7 @@ export class IterableReadableStreamV0 extends ReadableStream { } return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway } - + async throw(e: any): Promise> { throw e; } @@ -148,36 +145,82 @@ export class AIMessageV0 { /** The type of the message. */ _getType() { return "ai"; - }; + } - constructor( - content: string - ) { + constructor(content: string) { this.content = content; } } -class RunnableV0 implements RunnableInterfaceV0 { +export class StringPromptValueV0 { + lc_namespace = ["langchain_core", "prompt_values"]; + + lc_serializable = true; + + value: string; + + constructor(value: string) { + this.value = value; + } + + toString() { + return this.value; + } +} + +class RunnableV0 + implements RunnableInterfaceV0 +{ protected lc_runnable = true; - - async invoke(input: any, _options?: Partial | undefined): Promise { + + async invoke( + input: StringPromptValueV0, + _options?: Partial | undefined + ): Promise { return new AIMessageV0(input.toString()); } - - async batch(inputs: any[], options?: Partial | Partial[] | undefined, batchOptions?: (RunnableBatchOptionsV0 & { returnExceptions?: false | undefined; }) | undefined): Promise; - async batch(inputs: any[], options?: Partial | Partial[] | undefined, batchOptions?: (RunnableBatchOptionsV0 & { returnExceptions: true; }) | undefined): Promise; - async batch(inputs: any[], options?: Partial | Partial[] | undefined, batchOptions?: RunnableBatchOptionsV0 | undefined): Promise; - async batch(inputs: any[], options?: Partial | Partial[] | undefined, batchOptions?: RunnableBatchOptionsV0 | undefined): Promise; - async batch(inputs: unknown, _options?: unknown, _batchOptions?: unknown): Promise { - return [inputs]; - } - - async stream(input: any, _options?: Partial | undefined): Promise> { - return input; + + async batch( + inputs: StringPromptValueV0[], + options?: Partial | Partial[] | undefined, + batchOptions?: + | (RunnableBatchOptionsV0 & { returnExceptions?: false | undefined }) + | undefined + ): Promise; + + async batch( + inputs: StringPromptValueV0[], + options?: Partial | Partial[] | undefined, + batchOptions?: + | (RunnableBatchOptionsV0 & { returnExceptions: true }) + | undefined + ): Promise; + + async batch( + inputs: StringPromptValueV0[], + options?: Partial | Partial[] | undefined, + batchOptions?: RunnableBatchOptionsV0 | undefined + ): Promise; + + async batch( + inputs: StringPromptValueV0[], + options?: Partial | Partial[] | undefined, + batchOptions?: RunnableBatchOptionsV0 | undefined + ): Promise; + + async batch( + _inputs: unknown, + _options?: unknown, + _batchOptions?: unknown + ): Promise { + return []; } - async *transform(generator: AsyncGenerator, _options: Partial): AsyncGenerator { - yield *generator; + async stream( + _input: StringPromptValueV0, + _options?: Partial | undefined + ): Promise> { + throw new Error("Not implemented"); } } @@ -195,11 +238,7 @@ test("Runnable sequence with a class that implements a runnable interface", asyn const promptTemplate = PromptTemplate.fromTemplate("{input}"); const llm = new RunnableV0(); const outputParser = new StringOutputParser(); - const runnable = RunnableSequence.from([ - promptTemplate, - llm, - outputParser, - ]) + const runnable = RunnableSequence.from([promptTemplate, llm, outputParser]); const result = await runnable.invoke({ input: "Hello sequence!!" }); console.log(result); expect(result).toBe("Hello sequence!!"); diff --git a/langchain-core/src/utils/stream.ts b/langchain-core/src/utils/stream.ts index 324f84945c3f..73bc24901181 100644 --- a/langchain-core/src/utils/stream.ts +++ b/langchain-core/src/utils/stream.ts @@ -1,10 +1,15 @@ -export interface IterableReadableStreamInterface extends ReadableStream, AsyncGenerator {} +export interface IterableReadableStreamInterface + extends ReadableStream, + AsyncGenerator {} /* * Support async iterator syntax for ReadableStreams in all environments. * Source: https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490 */ -export class IterableReadableStream extends ReadableStream implements IterableReadableStreamInterface { +export class IterableReadableStream + extends ReadableStream + implements IterableReadableStreamInterface +{ public reader: ReadableStreamDefaultReader; ensureReader() { @@ -38,7 +43,7 @@ export class IterableReadableStream extends ReadableStream implements Iter } return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway } - + // eslint-disable-next-line @typescript-eslint/no-explicit-any async throw(e: any): Promise> { this.ensureReader(); From 1876e315d64f0f308309de7610d2b72e063661f7 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 15 Dec 2023 17:29:45 -0800 Subject: [PATCH 3/4] Remove unnecessary exports --- .../src/runnables/tests/runnable_interface.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/langchain-core/src/runnables/tests/runnable_interface.test.ts b/langchain-core/src/runnables/tests/runnable_interface.test.ts index 9352cf31a3ca..0e99a4c19b82 100644 --- a/langchain-core/src/runnables/tests/runnable_interface.test.ts +++ b/langchain-core/src/runnables/tests/runnable_interface.test.ts @@ -43,7 +43,7 @@ interface RunnableInterfaceV0 { ): Promise>; } -export class IterableReadableStreamV0 extends ReadableStream { +class IterableReadableStreamV0 extends ReadableStream { public reader: ReadableStreamDefaultReader; ensureReader() { @@ -131,7 +131,7 @@ export class IterableReadableStreamV0 extends ReadableStream { * properties like `content`, `name`, and `additional_kwargs`. It also * includes methods like `toDict()` and `_getType()`. */ -export class AIMessageV0 { +class AIMessageV0 { lc_namespace = ["langchain_core", "messages"]; lc_serializable = true; @@ -152,7 +152,7 @@ export class AIMessageV0 { } } -export class StringPromptValueV0 { +class StringPromptValueV0 { lc_namespace = ["langchain_core", "prompt_values"]; lc_serializable = true; From 1c6b5b2f05ea148d2da2a7aae247445a62909df7 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Sat, 16 Dec 2023 13:41:54 -0800 Subject: [PATCH 4/4] Add transform to required methods --- langchain-core/src/runnables/base.ts | 5 +++++ .../src/runnables/tests/runnable_interface.test.ts | 13 +++++++++++++ 2 files changed, 18 insertions(+) diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index 8a9b73ba133f..e1a1c4c0d815 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -65,6 +65,11 @@ export interface RunnableInterface< input: RunInput, options?: Partial ): Promise>; + + transform( + generator: AsyncGenerator, + options: Partial + ): AsyncGenerator; } export type RunnableFunc = ( diff --git a/langchain-core/src/runnables/tests/runnable_interface.test.ts b/langchain-core/src/runnables/tests/runnable_interface.test.ts index 0e99a4c19b82..d8c20e3bbe71 100644 --- a/langchain-core/src/runnables/tests/runnable_interface.test.ts +++ b/langchain-core/src/runnables/tests/runnable_interface.test.ts @@ -41,6 +41,11 @@ interface RunnableInterfaceV0 { input: RunInput, options?: Partial ): Promise>; + + transform( + generator: AsyncGenerator, + options: Partial + ): AsyncGenerator; } class IterableReadableStreamV0 extends ReadableStream { @@ -222,6 +227,14 @@ class RunnableV0 ): Promise> { throw new Error("Not implemented"); } + + // eslint-disable-next-line require-yield + async *transform( + _generator: AsyncGenerator, + _options: Partial + ): AsyncGenerator { + throw new Error("Not implemented"); + } } test("Pipe with a class that implements a runnable interface", async () => {