Skip to content

Commit

Permalink
feat(agent): add generic line parser
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomas2D committed Sep 26, 2024
1 parent 326b309 commit e57cac3
Show file tree
Hide file tree
Showing 7 changed files with 872 additions and 12 deletions.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
"@ai-zen/node-fetch-event-source": "^2.1.4",
"@connectrpc/connect": "^1.4.0",
"@connectrpc/connect-node": "^1.4.0",
"@streamparser/json": "^0.0.21",
"ajv": "^8.17.1",
"ajv-formats": "^3.0.1",
"bee-proto": "0.0.2",
Expand All @@ -95,14 +96,15 @@
"joplin-turndown-plugin-gfm": "^1.0.12",
"js-yaml": "^4.1.0",
"json-schema-to-typescript": "^15.0.2",
"jsonrepair": "^3.8.1",
"mathjs": "^13.1.1",
"mustache": "^4.2.0",
"object-hash": "^3.0.0",
"p-queue": "^8.0.1",
"p-throttle": "^6.2.0",
"pino": "^9.3.2",
"promise-based-task": "^3.0.2",
"remeda": "^2.11.0",
"remeda": "^2.14.0",
"serialize-error": "^11.0.3",
"string-comparison": "^1.3.0",
"string-strip-html": "^13.4.8",
Expand Down
85 changes: 85 additions & 0 deletions src/agents/parsers/field.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright 2024 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { JSONParserField, ZodParserField } from "@/agents/parsers/field.js";
import { z } from "zod";
import { splitString } from "@/internals/helpers/string.js";

// Tests for the streaming parser fields. Each case feeds raw text through
// write(), finalizes with end(), and then checks the parsed value.
describe("Parser Fields", () => {
  describe("JSON", () => {
    it("Object", async () => {
      const field = new JSONParserField({
        schema: z.record(z.any()),
        base: {},
      });
      const obj = { a: { b: { c: { d: 1 } } }, b: 2 };
      const content = JSON.stringify(obj);
      // Deliver the document piecewise (splitString with size 5, no overlap)
      // so the field has to assemble the value across several writes.
      for (const chunk of splitString(content, { size: 5, overlap: 0 })) {
        field.write(chunk);
      }
      await field.end();
      expect(field.raw).toBe(content);
      expect(JSON.stringify(field.get())).toMatchInlineSnapshot(
        `"{"a":{"b":{"c":{"d":1}}},"b":2}"`,
      );
    });

    it("String", async () => {
      const field = new JSONParserField({
        schema: z.string(),
        base: "",
      });
      // Before anything is written the partial value is the provided base.
      expect(field.getPartial()).toStrictEqual("");
      field.write(`"Hello\\nworld!"`);
      await field.end();
      expect(field.get()).toStrictEqual(`Hello\nworld!`);
    });

    it("Array of booleans", async () => {
      const field = new JSONParserField({
        schema: z.array(z.coerce.boolean()),
        base: [],
      });
      expect(field.getPartial()).toStrictEqual([]);
      field.write("[true,false,true]");
      // The partial value is expected to be available before end() is called.
      expect(field.getPartial()).toStrictEqual([true, false, true]);
      await field.end();
      expect(field.get()).toStrictEqual([true, false, true]);
    });
  });

  describe("Zod", () => {
    // Previously this case lived outside of any group, where its name clashed
    // with the JSON "String" case; it exercises ZodParserField, so it belongs here.
    it("String", async () => {
      const field = new ZodParserField(z.string());
      const content = "Hello world!";
      for (const chunk of splitString(content, { size: 2, overlap: 0 })) {
        field.write(chunk);
      }
      await field.end();
      expect(field.raw).toBe(content);
      expect(field.get()).toStrictEqual(content);
    });

    it("Number", async () => {
      const field = new ZodParserField(z.coerce.number().int());
      // The partial value of a Zod field is the raw text written so far.
      expect(field.getPartial()).toBe("");
      field.write("1000");
      expect(field.getPartial()).toBe("1000");
      await field.end();
      expect(field.get()).toBe(1000);
    });
  });
});
150 changes: 150 additions & 0 deletions src/agents/parsers/field.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* Copyright 2024 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { z, ZodSchema } from "zod";
import { setProp } from "@/internals/helpers/object.js";
import { ValueError } from "@/errors.js";
import { Serializable } from "@/internals/serializable.js";
import { JSONParser } from "@streamparser/json";
import { jsonrepairTransform } from "jsonrepair/stream";
import { Cache, SingletonCacheKeyFn } from "@/cache/decoratorCache.js";
import { shallowCopy } from "@/serializer/utils.js";

/**
 * Base class for parser fields that are fed text incrementally.
 *
 * Every chunk passed to {@link ParserField.write} is appended to `raw`.
 * Subclasses decide how the accumulated text is exposed as a final value
 * (`get`) and as an in-progress value (`getPartial`).
 *
 * @typeParam T - type returned by `get()`
 * @typeParam TPartial - type returned by `getPartial()`
 */
export abstract class ParserField<T, TPartial> extends Serializable {
  /** All text written to this field so far. */
  public raw = "";

  /** Returns the final value of the field. */
  abstract get(): T;

  /** Returns the value as it currently stands. */
  abstract getPartial(): TPartial;

  /**
   * Appends a chunk of text to the accumulated raw content.
   *
   * @param chunk - text to append
   */
  write(chunk: string) {
    this.raw = `${this.raw}${chunk}`;
  }

  /** Finalization hook; the base implementation does nothing. */
  async end() {}

  /** Captures the state needed to restore this field (the raw text). */
  createSnapshot() {
    const { raw } = this;
    return { raw };
  }

  /**
   * Restores the field by copying every snapshot property onto the instance.
   *
   * @param snapshot - value previously produced by `createSnapshot()`
   */
  loadSnapshot(snapshot: ReturnType<typeof this.createSnapshot>) {
    Object.assign(this, snapshot);
  }
}

/**
 * Parser field whose final value is obtained by running the accumulated raw
 * text through a Zod schema. Its partial value is simply the raw text.
 *
 * @typeParam T - Zod schema used to parse the raw text
 */
export class ZodParserField<T extends ZodSchema> extends ParserField<z.output<T>, string> {
  static {
    // Presumably registers the class with the serializer (inherited static) — verify in Serializable.
    this.register();
  }

  /**
   * @param schema - schema applied to the raw text when `get()` is called
   */
  constructor(protected readonly schema: T) {
    super();
  }

  /** Parses the raw text collected so far with the schema and returns the result. */
  get() {
    const text = this.raw;
    return this.schema.parse(text);
  }

  /** Returns the raw text collected so far, without any validation. */
  getPartial() {
    const { raw } = this;
    return raw;
  }

  /** Extends the parent snapshot with the schema so the field can be restored. */
  createSnapshot() {
    const parentSnapshot = super.createSnapshot();
    return { ...parentSnapshot, schema: this.schema };
  }
}

/**
 * Parser field that builds a JSON value incrementally from written chunks.
 *
 * Chunks are handed to a `jsonrepairTransform()` stream; whatever that stream
 * emits is forwarded to a `JSONParser`, and each value the parser reports is
 * stored into `ref.value` at the path described by the parser's key stack.
 * `get()` validates the assembled value with the schema, while `getPartial()`
 * exposes the value as assembled so far.
 *
 * @typeParam T - Zod schema describing the final value
 */
export class JSONParserField<T extends ZodSchema> extends ParserField<
z.output<T>,
Partial<z.output<T>>
> {
// The three members below are assigned in init(), hence the definite-assignment `!`.
protected stream!: ReturnType<typeof jsonrepairTransform>;
protected jsonParser!: JSONParser;
// Wrapper object so the root value itself can be replaced through setProp.
protected ref!: { value: z.output<T> };

/**
 * @param input - `schema` used by `get()` and `base`, the initial partial value
 * @throws ValueError when `input.base` is `undefined`
 */
constructor(protected readonly input: { schema: T; base: Partial<z.output<T>> }) {
super();
if (input.base === undefined) {
throw new ValueError(`Base must be defined!`);
}
this.init();
}

/**
 * Creates the repair stream and the JSON parser and wires them together.
 * Decorated with `@Cache` using `SingletonCacheKeyFn` — presumably so that
 * repeated calls on the same instance run the body only once; verify against
 * the decorator implementation.
 */
@Cache({ cacheKey: SingletonCacheKeyFn })
protected init() {
// Start from a shallow copy of the base so writes do not touch `input.base` itself.
this.ref = { value: shallowCopy(this.input.base) };
this.jsonParser = new JSONParser({ emitPartialTokens: false, emitPartialValues: true });
this.stream = jsonrepairTransform();
// Forward everything the stream emits to the JSON parser as text.
this.stream.on("data", (chunk) => {
this.jsonParser.write(chunk.toString());
});
this.jsonParser.onValue = ({ value, key, stack }) => {
// Build the full path of the reported value: ancestor keys followed by its own key,
// dropping undefined entries and normalizing everything to strings.
const keys = stack
.map((s) => s.key)
.concat(key)
.filter((s) => s !== undefined)
.map(String);

// An undefined value with an empty path would overwrite the root with undefined; skip it.
if (keys.length === 0 && value === undefined) {
return;
}
// Typed as a key of `ref` so a rename of the `value` property is caught by the compiler.
const prefix: keyof typeof this.ref = "value";
setProp(this.ref, [prefix, ...keys], value);
};
}

/**
 * Appends the chunk to `raw` and feeds it into the stream.
 *
 * @param chunk - next piece of JSON text
 */
write(chunk: string) {
super.write(chunk);
// NOTE(review): `push()` is called here rather than `write()`. If the object returned by
// `jsonrepairTransform()` is a Node.js Transform, `push()` places data on its readable side
// directly — confirm the repair transform is actually applied to the chunk.
this.stream.push(chunk);
}

/** Validates the value assembled so far with the schema and returns the parsed result. */
get() {
return this.input.schema.parse(this.ref.value);
}

/** Returns the value assembled so far (the live reference, not a copy). */
getPartial() {
return this.ref.value;
}

/**
 * Finishes parsing. Returns immediately when the stream is already closed or the
 * parser has already ended; otherwise returns a promise that resolves from the
 * parser's `onEnd` callback and rejects from its `onError` callback.
 */
async end() {
if (this.stream.closed || this.jsonParser.isEnded) {
return;
}

return new Promise<void>((resolve, reject) => {
// Handlers are installed before the stream and the parser are told to finish.
this.jsonParser.onEnd = resolve;
this.jsonParser.onError = reject;

// Pushing null signals end-of-data on the stream; then the parser itself is ended.
this.stream.push(null);
this.jsonParser.end();
});
}

/** Extends the parent snapshot (raw text) with the constructor input. */
createSnapshot() {
return { ...super.createSnapshot(), input: this.input };
}

/**
 * Restores the field from a snapshot: copies the snapshot properties with `raw`
 * reset to an empty string, re-runs `init()`, and then replays the stored raw
 * text through `write()` so both `raw` and the parsed value are rebuilt.
 *
 * @param snapshot - value previously produced by `createSnapshot()`
 */
loadSnapshot({ raw, ...snapshot }: ReturnType<typeof this.createSnapshot>) {
Object.assign(this, { raw: "", ...snapshot });
this.init();
this.write(raw);
}
}

/**
 * Type-level helpers that extract the type arguments of a `ParserField`.
 * Both helpers resolve to `never` when the given type is not a `ParserField`.
 */
// eslint-disable-next-line @typescript-eslint/no-namespace
export namespace ParserField {
  /** Resolves to the final value type (first type argument) of the given field type. */
  export type inferValue<T> = T extends ParserField<infer TValue, any> ? TValue : never;

  /** Resolves to the partial value type (second type argument) of the given field type. */
  export type inferPartialValue<T> =
    T extends ParserField<any, infer TPartialValue> ? TPartialValue : never;
}
Loading

0 comments on commit e57cac3

Please sign in to comment.