Skip to content

Commit

Permalink
feat(agent): improve invalid json parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomas2D committed Sep 26, 2024
1 parent 59036b1 commit da7c7e6
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 13 deletions.
21 changes: 21 additions & 0 deletions src/agents/parsers/field.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,27 @@ describe("Parser Fields", () => {
});
});

describe("Invalid JSON", () => {
it("Object", async () => {
const field = new JSONParserField({
schema: z.object({}).passthrough(),
base: {},
matchPair: ["{", "}"],
});

const validPart = `{"a":{"b":{"c":{"d":1}}},"b":2}`;
const invalidPart = `{"a":{"b":{"c":{"d":1}}},"b":2,}`;

const content = `Here is the object that you were asking for: ${invalidPart} Thank you!`;
for (const chunk of splitString(content, { size: 4, overlap: 0 })) {
field.write(chunk);
}
await field.end();
expect(field.raw).toBe(invalidPart);
expect(JSON.stringify(field.get())).toBe(validPart);
});
});

it("String", async () => {
const field = new ZodParserField(z.string());
const content = "Hello world!";
Expand Down
60 changes: 54 additions & 6 deletions src/agents/parsers/field.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import { JSONParser } from "@streamparser/json";
import { jsonrepairTransform } from "jsonrepair/stream";
import { Cache, SingletonCacheKeyFn } from "@/cache/decoratorCache.js";
import { shallowCopy } from "@/serializer/utils.js";
import { parseBrokenJson } from "@/internals/helpers/schema.js";
import { findFirstPair } from "@/internals/helpers/string.js";

export abstract class ParserField<T, TPartial> extends Serializable {
public raw = "";
Expand Down Expand Up @@ -69,9 +71,16 @@ export class ZodParserField<T> extends ParserField<T, string> {
export class JSONParserField<T> extends ParserField<T, Partial<T>> {
protected stream!: ReturnType<typeof jsonrepairTransform>;
protected jsonParser!: JSONParser;
protected errored = false;
protected ref!: { value: Partial<T> };

constructor(protected readonly input: { schema: ZodSchema<T>; base: Partial<T> }) {
constructor(
protected readonly input: {
schema: ZodSchema<T>;
base: Partial<T>;
matchPair?: [string, string];
},
) {
super();
if (input.base === undefined) {
throw new ValueError(`Base must be defined!`);
Expand All @@ -85,7 +94,15 @@ export class JSONParserField<T> extends ParserField<T, Partial<T>> {
this.jsonParser = new JSONParser({ emitPartialTokens: false, emitPartialValues: true });
this.stream = jsonrepairTransform();
this.stream.on("data", (chunk) => {
this.jsonParser.write(chunk.toString());
if (this.errored) {
return;
}

try {
this.jsonParser.write(chunk.toString());
} catch {
this.errored = true;
}
});
this.jsonParser.onValue = ({ value, key, stack }) => {
const keys = stack
Expand All @@ -103,20 +120,51 @@ export class JSONParserField<T> extends ParserField<T, Partial<T>> {
}

write(chunk: string) {
if (this.input.matchPair) {
if (!this.raw) {
const startChar = this.input.matchPair[0];
const index = chunk.indexOf(startChar);
if (index === -1) {
return;
}
chunk = chunk.substring(index);
} else {
const merged = this.raw.concat(chunk);
const match = findFirstPair(merged, this.input.matchPair);
if (match) {
const end = match[1];
if (end < this.raw.length) {
return;
}
chunk = merged.substring(this.raw.length, end + 1);
}
}
}

super.write(chunk);
this.stream.push(chunk);
try {
this.stream.push(chunk);
} catch {
this.errored = true;
}
}

get() {
return this.input.schema.parse(this.ref.value);
const inputToParse = this.errored
? parseBrokenJson(this.raw, {
pair: this.input.matchPair,
})
: this.ref.value;

return this.input.schema.parse(inputToParse);
}

getPartial() {
return this.ref.value;
}

async end() {
if (this.stream.closed || this.jsonParser.isEnded) {
if (this.stream.closed || this.jsonParser.isEnded || this.errored) {
return;
}

Expand All @@ -130,7 +178,7 @@ export class JSONParserField<T> extends ParserField<T, Partial<T>> {
}

createSnapshot() {
return { ...super.createSnapshot(), input: this.input };
return { ...super.createSnapshot(), input: this.input, errored: this.errored };
}

loadSnapshot({ raw, ...snapshot }: ReturnType<typeof this.createSnapshot>) {
Expand Down
26 changes: 19 additions & 7 deletions src/agents/parsers/linePrefix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { shallowCopy } from "@/serializer/utils.js";
import { Cache } from "@/cache/decoratorCache.js";
import { ParserField } from "@/agents/parsers/field.js";
import { Callback, InferCallbackValue } from "@/emitter/types.js";
import { ZodError } from "zod";

export interface ParserNode<T extends string, P extends ParserField<any, any>> {
prefix: string;
Expand Down Expand Up @@ -251,13 +252,24 @@ export class LinePrefixParser<
if (key in this.finalState) {
throw new LinePrefixParserError(`Duplicated key '${key}'`);
}
const value = field.get();
this.finalState[key] = value;
await this.emitter.emit("update", {
key,
field,
value,
});

try {
const value = field.get();
this.finalState[key] = value;
await this.emitter.emit("update", {
key,
field,
value,
});
} catch (e) {
if (e instanceof ZodError) {
throw new LinePrefixParserError(
`Value for ${key} cannot be retrieved because it's value does not adhere to the appropriate schema.`,
[e],
);
}
throw e;
}
}

@Cache()
Expand Down

0 comments on commit da7c7e6

Please sign in to comment.