Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core[minor]: Runnable with message history #3437

Merged
merged 39 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
df9a9f1
Runnable with message history
bracesproul Nov 29, 2023
a22855e
cr
bracesproul Nov 29, 2023
66ed823
Merge branch 'main' into brace/runnable-message-history
bracesproul Nov 29, 2023
add083f
cr
bracesproul Nov 29, 2023
05b71ea
adds withListeners method to runnables/callbacks
bracesproul Nov 29, 2023
2cf91d5
added entrypoint for root listener file
bracesproul Nov 29, 2023
21ce187
cr
bracesproul Nov 29, 2023
a8b2dbd
cr
bracesproul Nov 29, 2023
48e1f7e
cr
bracesproul Nov 29, 2023
a56cd19
cr
bracesproul Nov 29, 2023
89d689d
cr
bracesproul Nov 29, 2023
9fd9eaf
Merge branch 'brace/with-listeners' of https://github.com/langchain-a…
bracesproul Nov 29, 2023
be79ae6
support async listeners
bracesproul Nov 29, 2023
ad795af
Merge branch 'brace/with-listeners' of https://github.com/langchain-a…
bracesproul Nov 29, 2023
b71e30d
allow for run or run and config as args to listener funcs
bracesproul Nov 29, 2023
b7575a4
Merge branch 'brace/with-listeners' of https://github.com/langchain-a…
bracesproul Nov 29, 2023
e8432ac
cr
bracesproul Nov 30, 2023
00ab504
chore: lint files
bracesproul Nov 30, 2023
08698aa
cr
bracesproul Nov 30, 2023
b055337
cr
bracesproul Nov 30, 2023
8e23d5d
eslint disbale any
bracesproul Nov 30, 2023
a5576a8
Merge branch 'brace/with-listeners' of https://github.com/langchain-a…
bracesproul Nov 30, 2023
485b915
Merge branch 'main' into brace/with-listeners
bracesproul Nov 30, 2023
4394aa0
Merge branch 'main' into brace/with-listeners
bracesproul Dec 1, 2023
670f4f1
update types
bracesproul Dec 1, 2023
0a09cda
Merge branch 'main' into brace/with-listeners
bracesproul Dec 1, 2023
3b76fc4
Merge branch 'main' into brace/runnable-message-history
bracesproul Dec 1, 2023
bc8cdbf
Merge branch 'main' into brace/with-listeners
bracesproul Dec 1, 2023
7eb2ef9
Merge branch 'brace/with-listeners' of https://github.com/langchain-a…
bracesproul Dec 1, 2023
cec10ac
cr
bracesproul Dec 1, 2023
c9d3402
cr
bracesproul Dec 1, 2023
12b2d9b
merge main
bracesproul Dec 1, 2023
b465628
cr
bracesproul Dec 2, 2023
6df6e5d
Merge branch 'main' into brace/runnable-message-history
bracesproul Dec 2, 2023
dcbc640
cr
bracesproul Dec 2, 2023
5e113e9
cr
bracesproul Dec 2, 2023
1708764
Merge branch 'main' into brace/runnable-message-history
bracesproul Dec 2, 2023
52ceeea
cr
bracesproul Dec 2, 2023
f3f6c79
Style
jacoblee93 Dec 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions langchain-core/src/callbacks/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ export interface BaseCallbackConfig {
* Tags are passed to all callbacks, metadata is passed to handle*Start callbacks.
*/
callbacks?: Callbacks;

/**
* Runtime values for attributes previously made configurable on this Runnable,
* or sub-Runnables.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
configurable?: Record<string, any>;
}

export function parseCallbackConfigArg(
Expand Down Expand Up @@ -484,9 +491,9 @@ export class CallbackManager
extends BaseCallbackManager
implements BaseCallbackManagerMethods
{
handlers: BaseCallbackHandler[];
handlers: BaseCallbackHandler[] = [];

inheritableHandlers: BaseCallbackHandler[];
inheritableHandlers: BaseCallbackHandler[] = [];

tags: string[] = [];

Expand All @@ -500,10 +507,26 @@ export class CallbackManager

private readonly _parentRunId?: string;

constructor(parentRunId?: string) {
constructor(
parentRunId?: string,
options?: {
handlers?: BaseCallbackHandler[];
inheritableHandlers?: BaseCallbackHandler[];
tags?: string[];
inheritableTags?: string[];
metadata?: Record<string, unknown>;
inheritableMetadata?: Record<string, unknown>;
}
) {
super();
this.handlers = [];
this.inheritableHandlers = [];
this.handlers = options?.handlers ?? this.handlers;
this.inheritableHandlers =
options?.inheritableHandlers ?? this.inheritableHandlers;
this.tags = options?.tags ?? this.tags;
this.inheritableTags = options?.inheritableTags ?? this.inheritableTags;
this.metadata = options?.metadata ?? this.metadata;
this.inheritableMetadata =
options?.inheritableMetadata ?? this.inheritableMetadata;
this._parentRunId = parentRunId;
}

Expand Down
30 changes: 30 additions & 0 deletions langchain-core/src/chat_history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,33 @@ export abstract class BaseListChatMessageHistory extends Serializable {
return this.addMessage(new AIMessage(message));
}
}

export class FakeChatMessageHistory extends BaseChatMessageHistory {
lc_namespace = ["langchain", "core", "message", "fake"];

messages: Array<BaseMessage> = [];

constructor() {
super();
}

async getMessages(): Promise<BaseMessage[]> {
return this.messages;
}

async addMessage(message: BaseMessage): Promise<void> {
this.messages.push(message);
}

async addUserMessage(message: string): Promise<void> {
this.messages.push(new HumanMessage(message));
}

async addAIChatMessage(message: string): Promise<void> {
this.messages.push(new AIMessage(message));
}

async clear(): Promise<void> {
this.messages = [];
}
}
181 changes: 151 additions & 30 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@ import {
} from "../tracers/log_stream.js";
import { Serializable } from "../load/serializable.js";
import { IterableReadableStream } from "../utils/stream.js";
import { RunnableConfig, getCallbackMangerForConfig } from "./config.js";
import {
RunnableConfig,
getCallbackMangerForConfig,
mergeConfigs,
} from "./config.js";
import { AsyncCaller } from "../utils/async_caller.js";
import { Run } from "../tracers/base.js";
import { RootListenersTracer } from "../tracers/root_listener.js";

export type RunnableFunc<RunInput, RunOutput> = (
input: RunInput
input: RunInput,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
options?: Record<string, any> & {
config?: RunnableConfig;
}
) => RunOutput | Promise<RunOutput>;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -549,6 +559,45 @@ export abstract class Runnable<
static isRunnable(thing: any): thing is Runnable {
return thing ? thing.lc_runnable : false;
}

/**
* Bind lifecycle listeners to a Runnable, returning a new Runnable.
* The Run object contains information about the run, including its id,
* type, input, output, error, startTime, endTime, and any tags or metadata
* added to the run.
*
* @param {Object} params - The object containing the callback functions.
* @param {(run: Run) => void} params.onStart - Called before the runnable starts running, with the Run object.
* @param {(run: Run) => void} params.onEnd - Called after the runnable finishes running, with the Run object.
* @param {(run: Run) => void} params.onError - Called if the runnable throws an error, with the Run object.
*/
withListeners({
onStart,
onEnd,
onError,
}: {
onStart?: (run: Run, config?: RunnableConfig) => void | Promise<void>;
onEnd?: (run: Run, config?: RunnableConfig) => void | Promise<void>;
onError?: (run: Run, config?: RunnableConfig) => void | Promise<void>;
}): Runnable<RunInput, RunOutput, CallOptions> {
// eslint-disable-next-line @typescript-eslint/no-use-before-define
return new RunnableBinding<RunInput, RunOutput, CallOptions>({
bound: this,
config: {},
configFactories: [
(config) => ({
callbacks: [
new RootListenersTracer({
config,
onStart,
onEnd,
onError,
}),
],
}),
],
});
}
}

export type RunnableBindingArgs<
Expand All @@ -557,8 +606,9 @@ export type RunnableBindingArgs<
CallOptions extends RunnableConfig
> = {
bound: Runnable<RunInput, RunOutput, CallOptions>;
kwargs: Partial<CallOptions>;
kwargs?: Partial<CallOptions>;
config: RunnableConfig;
configFactories?: Array<(config: RunnableConfig) => RunnableConfig>;
};

/**
Expand All @@ -581,31 +631,35 @@ export class RunnableBinding<

config: RunnableConfig;

protected kwargs: Partial<CallOptions>;
protected kwargs?: Partial<CallOptions>;

configFactories?: Array<
(config: RunnableConfig) => RunnableConfig | Promise<RunnableConfig>
>;

constructor(fields: RunnableBindingArgs<RunInput, RunOutput, CallOptions>) {
super(fields);
this.bound = fields.bound;
this.kwargs = fields.kwargs;
this.config = fields.config;
this.configFactories = fields.configFactories;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
_mergeConfig(options?: Record<string, any>) {
async _mergeConfig(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const copy: Record<string, any> = { ...this.config };
if (options) {
for (const key of Object.keys(options)) {
if (key === "metadata") {
copy[key] = { ...copy[key], ...options[key] };
} else if (key === "tags") {
copy[key] = (copy[key] ?? []).concat(options[key] ?? []);
} else {
copy[key] = options[key] ?? copy[key];
}
}
}
return copy as Partial<CallOptions>;
options?: Record<string, any>
): Promise<Partial<CallOptions>> {
const config = mergeConfigs<CallOptions>(this.config, options);
return mergeConfigs<CallOptions>(
config,
...(this.configFactories
? await Promise.all(
this.configFactories.map(
async (configFactory) => await configFactory(config)
)
)
: [])
);
}

bind(
Expand Down Expand Up @@ -645,7 +699,7 @@ export class RunnableBinding<
): Promise<RunOutput> {
return this.bound.invoke(
input,
this._mergeConfig({ ...options, ...this.kwargs })
await this._mergeConfig({ ...options, ...this.kwargs })
);
}

Expand Down Expand Up @@ -673,13 +727,15 @@ export class RunnableBinding<
batchOptions?: RunnableBatchOptions
): Promise<(RunOutput | Error)[]> {
const mergedOptions = Array.isArray(options)
? options.map((individualOption) =>
this._mergeConfig({
...individualOption,
...this.kwargs,
})
? await Promise.all(
options.map(async (individualOption) =>
this._mergeConfig({
...individualOption,
...this.kwargs,
})
)
)
: this._mergeConfig({ ...options, ...this.kwargs });
: await this._mergeConfig({ ...options, ...this.kwargs });
return this.bound.batch(inputs, mergedOptions, batchOptions);
}

Expand All @@ -689,7 +745,7 @@ export class RunnableBinding<
) {
yield* this.bound._streamIterator(
input,
this._mergeConfig({ ...options, ...this.kwargs })
await this._mergeConfig({ ...options, ...this.kwargs })
);
}

Expand All @@ -699,7 +755,7 @@ export class RunnableBinding<
): Promise<IterableReadableStream<RunOutput>> {
return this.bound.stream(
input,
this._mergeConfig({ ...options, ...this.kwargs })
await this._mergeConfig({ ...options, ...this.kwargs })
);
}

Expand All @@ -710,7 +766,7 @@ export class RunnableBinding<
): AsyncGenerator<RunOutput> {
yield* this.bound.transform(
generator,
this._mergeConfig({ ...options, ...this.kwargs })
await this._mergeConfig({ ...options, ...this.kwargs })
);
}

Expand All @@ -721,6 +777,45 @@ export class RunnableBinding<
): thing is RunnableBinding<any, any, any> {
return thing.bound && Runnable.isRunnable(thing.bound);
}

/**
* Bind lifecycle listeners to a Runnable, returning a new Runnable.
* The Run object contains information about the run, including its id,
* type, input, output, error, startTime, endTime, and any tags or metadata
* added to the run.
*
* @param {Object} params - The object containing the callback functions.
* @param {(run: Run) => void} params.onStart - Called before the runnable starts running, with the Run object.
* @param {(run: Run) => void} params.onEnd - Called after the runnable finishes running, with the Run object.
* @param {(run: Run) => void} params.onError - Called if the runnable throws an error, with the Run object.
*/
withListeners({
onStart,
onEnd,
onError,
}: {
onStart?: (run: Run, config?: RunnableConfig) => void | Promise<void>;
onEnd?: (run: Run, config?: RunnableConfig) => void | Promise<void>;
onError?: (run: Run, config?: RunnableConfig) => void | Promise<void>;
}): Runnable<RunInput, RunOutput, CallOptions> {
return new RunnableBinding<RunInput, RunOutput, CallOptions>({
bound: this.bound,
kwargs: this.kwargs,
config: this.config,
configFactories: [
(config) => ({
callbacks: [
new RootListenersTracer({
config,
onStart,
onEnd,
onError,
}),
],
}),
],
});
}
}

/**
Expand Down Expand Up @@ -789,6 +884,32 @@ export class RunnableEach<
this._patchConfig(config, runManager?.getChild())
);
}

/**
* Bind lifecycle listeners to a Runnable, returning a new Runnable.
* The Run object contains information about the run, including its id,
* type, input, output, error, startTime, endTime, and any tags or metadata
* added to the run.
*
* @param {Object} params - The object containing the callback functions.
* @param {(run: Run) => void} params.onStart - Called before the runnable starts running, with the Run object.
* @param {(run: Run) => void} params.onEnd - Called after the runnable finishes running, with the Run object.
* @param {(run: Run) => void} params.onError - Called if the runnable throws an error, with the Run object.
*/
withListeners({
onStart,
onEnd,
onError,
}: {
onStart?: (run: Run, config?: RunnableConfig) => void | Promise<void>;
onEnd?: (run: Run, config?: RunnableConfig) => void | Promise<void>;
onError?: (run: Run, config?: RunnableConfig) => void | Promise<void>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
}): Runnable<any, any, CallOptions> {
return new RunnableEach<RunInputItem, RunOutputItem, CallOptions>({
bound: this.bound.withListeners({ onStart, onEnd, onError }),
});
}
}

/**
Expand Down Expand Up @@ -1382,7 +1503,7 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
config?: Partial<BaseCallbackConfig>,
runManager?: CallbackManagerForChainRun
) {
let output = await this.func(input);
let output = await this.func(input, { config });
if (output && Runnable.isRunnable(output)) {
output = await output.invoke(
input,
Expand Down
Loading