diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index 8504b24fdedd..e1a1c4c0d815 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -11,7 +11,10 @@ import { RunLogPatch, } from "../tracers/log_stream.js"; import { Serializable } from "../load/serializable.js"; -import { IterableReadableStream } from "../utils/stream.js"; +import { + IterableReadableStream, + type IterableReadableStreamInterface, +} from "../utils/stream.js"; import { RunnableConfig, getCallbackMangerForConfig, @@ -21,6 +24,54 @@ import { AsyncCaller } from "../utils/async_caller.js"; import { Run } from "../tracers/base.js"; import { RootListenersTracer } from "../tracers/root_listener.js"; +/** + * Base interface implemented by all runnables. + * Used for cross-compatibility between different versions of LangChain core. + * + * Should not change on patch releases. + */ +export interface RunnableInterface< + RunInput, + RunOutput, + CallOptions extends RunnableConfig = RunnableConfig +> { + invoke(input: RunInput, options?: Partial): Promise; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptions & { returnExceptions?: false } + ): Promise; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptions & { returnExceptions: true } + ): Promise<(RunOutput | Error)[]>; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptions + ): Promise<(RunOutput | Error)[]>; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptions + ): Promise<(RunOutput | Error)[]>; + + stream( + input: RunInput, + options?: Partial + ): Promise>; + + transform( + generator: AsyncGenerator, + options: Partial + ): AsyncGenerator; +} + export type RunnableFunc = ( input: RunInput, options?: @@ -37,7 +88,8 @@ export type RunnableMapLike = { // eslint-disable-next-line @typescript-eslint/no-explicit-any export type RunnableLike = - | Runnable + // eslint-disable-next-line @typescript-eslint/no-explicit-any + | RunnableInterface | RunnableFunc | RunnableMapLike; @@ -61,12 +113,15 @@ function _coerceToDict(value: any, defaultKey: string) { * transformed. */ export abstract class Runnable< - // eslint-disable-next-line @typescript-eslint/no-explicit-any - RunInput = any, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - RunOutput = any, - CallOptions extends RunnableConfig = RunnableConfig -> extends Serializable { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + RunInput = any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + RunOutput = any, + CallOptions extends RunnableConfig = RunnableConfig + > + extends Serializable + implements RunnableInterface +{ protected lc_runnable = true; abstract invoke( diff --git a/langchain-core/src/runnables/tests/runnable_interface.test.ts b/langchain-core/src/runnables/tests/runnable_interface.test.ts new file mode 100644 index 000000000000..d8c20e3bbe71 --- /dev/null +++ b/langchain-core/src/runnables/tests/runnable_interface.test.ts @@ -0,0 +1,258 @@ +/* eslint-disable no-promise-executor-return */ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { StringOutputParser } from "../../output_parsers/string.js"; +import { PromptTemplate } from "../../prompts/prompt.js"; +import { RunnableSequence } from "../base.js"; + +type RunnableBatchOptionsV0 = { + maxConcurrency?: number; + returnExceptions?: boolean; +}; + +interface RunnableInterfaceV0 { + invoke(input: RunInput, options?: Partial): Promise; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptionsV0 & { returnExceptions?: false } + ): Promise; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptionsV0 & { returnExceptions: true } + ): Promise<(RunOutput | Error)[]>; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptionsV0 + ): Promise<(RunOutput | Error)[]>; + + batch( + inputs: RunInput[], + options?: Partial | Partial[], + batchOptions?: RunnableBatchOptionsV0 + ): Promise<(RunOutput | Error)[]>; + + stream( + input: RunInput, + options?: Partial + ): Promise>; + + transform( + generator: AsyncGenerator, + options: Partial + ): AsyncGenerator; +} + +class IterableReadableStreamV0 extends ReadableStream { + public reader: ReadableStreamDefaultReader; + + ensureReader() { + if (!this.reader) { + this.reader = this.getReader(); + } + } + + async next() { + this.ensureReader(); + try { + const result = await this.reader.read(); + if (result.done) this.reader.releaseLock(); // release lock when stream becomes closed + return { + done: result.done, + value: result.value as T, // Cloudflare Workers typing fix + }; + } catch (e) { + this.reader.releaseLock(); // release lock when stream becomes errored + throw e; + } + } + + async return() { + this.ensureReader(); + // If wrapped in a Node stream, cancel is already called. + if (this.locked) { + const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet + this.reader.releaseLock(); // release lock first + await cancelPromise; // now await it + } + return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway + } + + async throw(e: any): Promise> { + throw e; + } + + [Symbol.asyncIterator]() { + return this; + } + + static fromReadableStream(stream: ReadableStream) { + // From https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#reading_the_stream + const reader = stream.getReader(); + return new IterableReadableStreamV0({ + start(controller) { + return pump(); + function pump(): Promise { + return reader.read().then(({ done, value }) => { + // When no more data needs to be consumed, close the stream + if (done) { + controller.close(); + return; + } + // Enqueue the next data chunk into our target stream + controller.enqueue(value); + return pump(); + }); + } + }, + cancel() { + reader.releaseLock(); + }, + }); + } + + static fromAsyncGenerator(generator: AsyncGenerator) { + return new IterableReadableStreamV0({ + async pull(controller) { + const { value, done } = await generator.next(); + // When no more data needs to be consumed, close the stream + if (done) { + controller.close(); + } + // Fix: `else if (value)` will hang the streaming when nullish value (e.g. empty string) is pulled + controller.enqueue(value); + }, + }); + } +} + +/** + * Base class for all types of messages in a conversation. It includes + * properties like `content`, `name`, and `additional_kwargs`. It also + * includes methods like `toDict()` and `_getType()`. + */ +class AIMessageV0 { + lc_namespace = ["langchain_core", "messages"]; + + lc_serializable = true; + + /** The content of the message. */ + content: string; + + /** The name of the message sender in a multi-user chat. */ + name?: string; + + /** The type of the message. */ + _getType() { + return "ai"; + } + + constructor(content: string) { + this.content = content; + } +} + +class StringPromptValueV0 { + lc_namespace = ["langchain_core", "prompt_values"]; + + lc_serializable = true; + + value: string; + + constructor(value: string) { + this.value = value; + } + + toString() { + return this.value; + } +} + +class RunnableV0 + implements RunnableInterfaceV0 +{ + protected lc_runnable = true; + + async invoke( + input: StringPromptValueV0, + _options?: Partial | undefined + ): Promise { + return new AIMessageV0(input.toString()); + } + + async batch( + inputs: StringPromptValueV0[], + options?: Partial | Partial[] | undefined, + batchOptions?: + | (RunnableBatchOptionsV0 & { returnExceptions?: false | undefined }) + | undefined + ): Promise; + + async batch( + inputs: StringPromptValueV0[], + options?: Partial | Partial[] | undefined, + batchOptions?: + | (RunnableBatchOptionsV0 & { returnExceptions: true }) + | undefined + ): Promise; + + async batch( + inputs: StringPromptValueV0[], + options?: Partial | Partial[] | undefined, + batchOptions?: RunnableBatchOptionsV0 | undefined + ): Promise; + + async batch( + inputs: StringPromptValueV0[], + options?: Partial | Partial[] | undefined, + batchOptions?: RunnableBatchOptionsV0 | undefined + ): Promise; + + async batch( + _inputs: unknown, + _options?: unknown, + _batchOptions?: unknown + ): Promise { + return []; + } + + async stream( + _input: StringPromptValueV0, + _options?: Partial | undefined + ): Promise> { + throw new Error("Not implemented"); + } + + // eslint-disable-next-line require-yield + async *transform( + _generator: AsyncGenerator, + _options: Partial + ): AsyncGenerator { + throw new Error("Not implemented"); + } +} + +test("Pipe with a class that implements a runnable interface", async () => { + const promptTemplate = PromptTemplate.fromTemplate("{input}"); + const llm = new RunnableV0(); + const outputParser = new StringOutputParser(); + const runnable = promptTemplate.pipe(llm).pipe(outputParser); + const result = await runnable.invoke({ input: "Hello world!!" }); + console.log(result); + expect(result).toBe("Hello world!!"); +}); + +test("Runnable sequence with a class that implements a runnable interface", async () => { + const promptTemplate = PromptTemplate.fromTemplate("{input}"); + const llm = new RunnableV0(); + const outputParser = new StringOutputParser(); + const runnable = RunnableSequence.from([promptTemplate, llm, outputParser]); + const result = await runnable.invoke({ input: "Hello sequence!!" }); + console.log(result); + expect(result).toBe("Hello sequence!!"); +}); diff --git a/langchain-core/src/utils/stream.ts b/langchain-core/src/utils/stream.ts index ce8c6289496c..73bc24901181 100644 --- a/langchain-core/src/utils/stream.ts +++ b/langchain-core/src/utils/stream.ts @@ -1,8 +1,15 @@ +export interface IterableReadableStreamInterface + extends ReadableStream, + AsyncGenerator {} + /* * Support async iterator syntax for ReadableStreams in all environments. * Source: https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490 */ -export class IterableReadableStream extends ReadableStream { +export class IterableReadableStream + extends ReadableStream + implements IterableReadableStreamInterface +{ public reader: ReadableStreamDefaultReader; ensureReader() { @@ -37,6 +44,17 @@ export class IterableReadableStream extends ReadableStream { return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + async throw(e: any): Promise> { + this.ensureReader(); + if (this.locked) { + const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet + this.reader.releaseLock(); // release lock first + await cancelPromise; // now await it + } + throw e; + } + [Symbol.asyncIterator]() { return this; }