Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core[patch]: Use an interface for runnables to allow more compatibility between core versions #3684

Merged
merged 4 commits into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import {
RunLogPatch,
} from "../tracers/log_stream.js";
import { Serializable } from "../load/serializable.js";
import { IterableReadableStream } from "../utils/stream.js";
import {
IterableReadableStream,
type IterableReadableStreamInterface
} from "../utils/stream.js";
import {
RunnableConfig,
getCallbackMangerForConfig,
Expand All @@ -21,6 +24,48 @@ import { AsyncCaller } from "../utils/async_caller.js";
import { Run } from "../tracers/base.js";
import { RootListenersTracer } from "../tracers/root_listener.js";

/**
* Base interface implemented by all runnables.
* Used for cross-compatibility between different versions of LangChain core.
*
* Should not change on patch releases.
*/
export interface RunnableInterface<RunInput, RunOutput, CallOptions extends RunnableConfig = RunnableConfig> {
invoke(
input: RunInput,
options?: Partial<CallOptions>
): Promise<RunOutput>;

batch(
inputs: RunInput[],
options?: Partial<CallOptions> | Partial<CallOptions>[],
batchOptions?: RunnableBatchOptions & { returnExceptions?: false }
): Promise<RunOutput[]>;

batch(
inputs: RunInput[],
options?: Partial<CallOptions> | Partial<CallOptions>[],
batchOptions?: RunnableBatchOptions & { returnExceptions: true }
): Promise<(RunOutput | Error)[]>;

batch(
inputs: RunInput[],
options?: Partial<CallOptions> | Partial<CallOptions>[],
batchOptions?: RunnableBatchOptions
): Promise<(RunOutput | Error)[]>;

batch(
inputs: RunInput[],
options?: Partial<CallOptions> | Partial<CallOptions>[],
batchOptions?: RunnableBatchOptions
): Promise<(RunOutput | Error)[]>;

stream(
input: RunInput,
options?: Partial<CallOptions>
): Promise<IterableReadableStreamInterface<RunOutput>>;
}

export type RunnableFunc<RunInput, RunOutput> = (
input: RunInput,
options?:
Expand All @@ -37,7 +82,8 @@ export type RunnableMapLike<RunInput, RunOutput> = {

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type RunnableLike<RunInput = any, RunOutput = any> =
| Runnable<RunInput, RunOutput>
// eslint-disable-next-line @typescript-eslint/no-explicit-any
| RunnableInterface<RunInput, RunOutput>
| RunnableFunc<RunInput, RunOutput>
| RunnableMapLike<RunInput, RunOutput>;

Expand Down Expand Up @@ -66,7 +112,7 @@ export abstract class Runnable<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
RunOutput = any,
CallOptions extends RunnableConfig = RunnableConfig
> extends Serializable {
> extends Serializable implements RunnableInterface<RunInput, RunOutput, CallOptions> {
protected lc_runnable = true;

abstract invoke(
Expand Down
206 changes: 206 additions & 0 deletions langchain-core/src/runnables/tests/runnable_interface.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/* eslint-disable no-promise-executor-return */
/* eslint-disable @typescript-eslint/no-explicit-any */

import { StringOutputParser } from "../../output_parsers/string.js";
import { PromptTemplate } from "../../prompts/prompt.js";
import { RunnableSequence } from "../base.js";

type RunnableBatchOptionsV0 = {
maxConcurrency?: number;
returnExceptions?: boolean;
};

interface RunnableInterfaceV0<RunInput, RunOutput, CallOptions = any> {
invoke(
input: RunInput,
options?: Partial<CallOptions>
): Promise<RunOutput>;

batch(
inputs: RunInput[],
options?: Partial<CallOptions> | Partial<CallOptions>[],
batchOptions?: RunnableBatchOptionsV0 & { returnExceptions?: false }
): Promise<RunOutput[]>;

batch(
inputs: RunInput[],
options?: Partial<CallOptions> | Partial<CallOptions>[],
batchOptions?: RunnableBatchOptionsV0 & { returnExceptions: true }
): Promise<(RunOutput | Error)[]>;

batch(
inputs: RunInput[],
options?: Partial<CallOptions> | Partial<CallOptions>[],
batchOptions?: RunnableBatchOptionsV0
): Promise<(RunOutput | Error)[]>;

batch(
inputs: RunInput[],
options?: Partial<CallOptions> | Partial<CallOptions>[],
batchOptions?: RunnableBatchOptionsV0
): Promise<(RunOutput | Error)[]>;

stream(
input: RunInput,
options?: Partial<CallOptions>
): Promise<IterableReadableStreamV0<RunOutput>>;
}

export class IterableReadableStreamV0<T> extends ReadableStream<T> {
jacoblee93 marked this conversation as resolved.
Show resolved Hide resolved
public reader: ReadableStreamDefaultReader<T>;

ensureReader() {
if (!this.reader) {
this.reader = this.getReader();
}
}

async next() {
this.ensureReader();
try {
const result = await this.reader.read();
if (result.done) this.reader.releaseLock(); // release lock when stream becomes closed
return {
done: result.done,
value: result.value as T, // Cloudflare Workers typing fix
};
} catch (e) {
this.reader.releaseLock(); // release lock when stream becomes errored
throw e;
}
}

async return() {
this.ensureReader();
// If wrapped in a Node stream, cancel is already called.
if (this.locked) {
const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
this.reader.releaseLock(); // release lock first
await cancelPromise; // now await it
}
return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway
}

async throw(e: any): Promise<IteratorResult<T>> {
throw e;
}

[Symbol.asyncIterator]() {
return this;
}

static fromReadableStream<T>(stream: ReadableStream<T>) {
// From https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#reading_the_stream
const reader = stream.getReader();
return new IterableReadableStreamV0<T>({
start(controller) {
return pump();
function pump(): Promise<T | undefined> {
return reader.read().then(({ done, value }) => {
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
return;
}
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();
});
}
},
cancel() {
reader.releaseLock();
},
});
}

static fromAsyncGenerator<T>(generator: AsyncGenerator<T>) {
return new IterableReadableStreamV0<T>({
async pull(controller) {
const { value, done } = await generator.next();
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
}
// Fix: `else if (value)` will hang the streaming when nullish value (e.g. empty string) is pulled
controller.enqueue(value);
},
});
}
}

/**
* Base class for all types of messages in a conversation. It includes
* properties like `content`, `name`, and `additional_kwargs`. It also
* includes methods like `toDict()` and `_getType()`.
*/
export class AIMessageV0 {
lc_namespace = ["langchain_core", "messages"];

lc_serializable = true;

/** The content of the message. */
content: string;

/** The name of the message sender in a multi-user chat. */
name?: string;

/** The type of the message. */
_getType() {
return "ai";
};

constructor(
content: string
) {
this.content = content;
}
}

class RunnableV0 implements RunnableInterfaceV0<any, AIMessageV0> {
protected lc_runnable = true;

async invoke(input: any, _options?: Partial<any> | undefined): Promise<AIMessageV0> {
return new AIMessageV0(input.toString());
}

async batch(inputs: any[], options?: Partial<any> | Partial<any>[] | undefined, batchOptions?: (RunnableBatchOptionsV0 & { returnExceptions?: false | undefined; }) | undefined): Promise<any[]>;
async batch(inputs: any[], options?: Partial<any> | Partial<any>[] | undefined, batchOptions?: (RunnableBatchOptionsV0 & { returnExceptions: true; }) | undefined): Promise<any[]>;
async batch(inputs: any[], options?: Partial<any> | Partial<any>[] | undefined, batchOptions?: RunnableBatchOptionsV0 | undefined): Promise<any[]>;
async batch(inputs: any[], options?: Partial<any> | Partial<any>[] | undefined, batchOptions?: RunnableBatchOptionsV0 | undefined): Promise<any[]>;
async batch(inputs: unknown, _options?: unknown, _batchOptions?: unknown): Promise<any[]> {
return [inputs];
}

async stream(input: any, _options?: Partial<any> | undefined): Promise<IterableReadableStreamV0<any>> {
return input;
}

async *transform(generator: AsyncGenerator<any, any, unknown>, _options: Partial<any>): AsyncGenerator<any, any, unknown> {
yield *generator;
}
}

test("Pipe with a class that implements a runnable interface", async () => {
const promptTemplate = PromptTemplate.fromTemplate("{input}");
const llm = new RunnableV0();
const outputParser = new StringOutputParser();
const runnable = promptTemplate.pipe(llm).pipe(outputParser);
const result = await runnable.invoke({ input: "Hello world!!" });
console.log(result);
expect(result).toBe("Hello world!!");
});

test("Runnable sequence with a class that implements a runnable interface", async () => {
const promptTemplate = PromptTemplate.fromTemplate("{input}");
const llm = new RunnableV0();
const outputParser = new StringOutputParser();
const runnable = RunnableSequence.from([
promptTemplate,
llm,
outputParser,
])
const result = await runnable.invoke({ input: "Hello sequence!!" });
console.log(result);
expect(result).toBe("Hello sequence!!");
});
15 changes: 14 additions & 1 deletion langchain-core/src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
export interface IterableReadableStreamInterface<T> extends ReadableStream<T>, AsyncGenerator<T> {}

/*
* Support async iterator syntax for ReadableStreams in all environments.
* Source: https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
*/
export class IterableReadableStream<T> extends ReadableStream<T> {
export class IterableReadableStream<T> extends ReadableStream<T> implements IterableReadableStreamInterface<T> {
public reader: ReadableStreamDefaultReader<T>;

ensureReader() {
Expand Down Expand Up @@ -36,6 +38,17 @@ export class IterableReadableStream<T> extends ReadableStream<T> {
}
return { done: true, value: undefined as T }; // This cast fixes TS typing, and convention is to ignore final chunk value anyway
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
async throw(e: any): Promise<IteratorResult<T>> {
this.ensureReader();
if (this.locked) {
const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
this.reader.releaseLock(); // release lock first
await cancelPromise; // now await it
}
throw e;
}

[Symbol.asyncIterator]() {
return this;
Expand Down
Loading