Skip to content

Commit

Permalink
fix: streaming JSON types and error reporting tests (#315)
Browse files Browse the repository at this point in the history
Co-authored-by: Adam Howard <[email protected]>
  • Loading branch information
stefl and codeincontext authored Nov 20, 2024
1 parent ba5618e commit 3e18b44
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 28 deletions.
25 changes: 21 additions & 4 deletions apps/nextjs/src/app/api/chat/errorHandling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ describe("handleChatException", () => {

const span = { setTag: jest.fn() } as unknown as TracingSpan;
const error = new AilaThreatDetectionError("user_abc", "test error");
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const prisma = {} as unknown as PrismaClientWithAccelerate;

const response = await handleChatException(
Expand All @@ -53,6 +54,7 @@ describe("handleChatException", () => {
it("should return an error chat message", async () => {
const span = { setTag: jest.fn() } as unknown as TracingSpan;
const error = new AilaAuthenticationError("test error");
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const prisma = {} as unknown as PrismaClientWithAccelerate;

const response = await handleChatException(
Expand All @@ -64,7 +66,12 @@ describe("handleChatException", () => {

expect(response.status).toBe(401);

const message = await consumeStream(response.body as ReadableStream);
invariant(
response.body instanceof ReadableStream,
"Expected response.body to be a ReadableStream",
);

const message = await consumeStream(response.body);
expect(message).toEqual("Unauthorized");
});
});
Expand All @@ -77,6 +84,7 @@ describe("handleChatException", () => {
100,
Date.now() + 3600 * 1000,
);
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const prisma = {} as unknown as PrismaClientWithAccelerate;

const response = await handleChatException(
Expand All @@ -88,7 +96,12 @@ describe("handleChatException", () => {

expect(response.status).toBe(200);

const consumed = await consumeStream(response.body as ReadableStream);
invariant(
response.body instanceof ReadableStream,
"Expected response.body to be a ReadableStream",
);

const consumed = await consumeStream(response.body);
const message = extractStreamMessage(consumed);

expect(message).toEqual({
Expand All @@ -104,6 +117,7 @@ describe("handleChatException", () => {
it("should return an error chat message", async () => {
const span = { setTag: jest.fn() } as unknown as TracingSpan;
const error = new UserBannedError("test error");
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const prisma = {} as unknown as PrismaClientWithAccelerate;

const response = await handleChatException(
Expand All @@ -115,9 +129,12 @@ describe("handleChatException", () => {

expect(response.status).toBe(200);

const message = extractStreamMessage(
await consumeStream(response.body as ReadableStream),
invariant(
response.body instanceof ReadableStream,
"Expected response.body to be a ReadableStream",
);

const message = extractStreamMessage(await consumeStream(response.body));
expect(message).toEqual({
type: "action",
action: "SHOW_ACCOUNT_LOCKED",
Expand Down
47 changes: 28 additions & 19 deletions apps/nextjs/src/app/api/chat/errorHandling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,43 +50,52 @@ async function handleThreatDetectionError(
async function handleAilaAuthenticationError(
span: TracingSpan,
e: AilaAuthenticationError,
) {
): Promise<Response> {
reportErrorTelemetry(span, e, "AilaAuthenticationError", "Unauthorized");
return new Response("Unauthorized", { status: 401 });
return Promise.resolve(new Response("Unauthorized", { status: 401 }));
}

export async function handleRateLimitError(
span: TracingSpan,
error: RateLimitExceededError,
) {
): Promise<Response> {
reportErrorTelemetry(span, error, "RateLimitExceededError", "Rate limited");

const timeRemainingHours = Math.ceil(
(error.reset - Date.now()) / 1000 / 60 / 60,
);
const hours = timeRemainingHours === 1 ? "hour" : "hours";

return streamingJSON({
type: "error",
value: error.message,
message: `**Unfortunately you’ve exceeded your fair usage limit for today.** Please come back in ${timeRemainingHours} ${hours}. If you require a higher limit, please [make a request](${process.env.RATELIMIT_FORM_URL}).`,
} as ErrorDocument);
return Promise.resolve(
streamingJSON({
type: "error",
value: error.message,
message: `**Unfortunately you’ve exceeded your fair usage limit for today.** Please come back in ${timeRemainingHours} ${hours}. If you require a higher limit, please [make a request](${process.env.RATELIMIT_FORM_URL}).`,
} as ErrorDocument),
);
}

async function handleUserBannedError() {
return streamingJSON({
type: "action",
action: "SHOW_ACCOUNT_LOCKED",
} as ActionDocument);
async function handleUserBannedError(): Promise<Response> {
return Promise.resolve(
streamingJSON({
type: "action",
action: "SHOW_ACCOUNT_LOCKED",
} as ActionDocument),
);
}

async function handleGenericError(span: TracingSpan, e: Error) {
async function handleGenericError(
span: TracingSpan,
e: Error,
): Promise<Response> {
reportErrorTelemetry(span, e, e.name, e.message);
return streamingJSON({
type: "error",
message: e.message,
value: `Sorry, an error occurred: ${e.message}`,
} as ErrorDocument);
return Promise.resolve(
streamingJSON({
type: "error",
message: e.message,
value: `Sorry, an error occurred: ${e.message}`,
} as ErrorDocument),
);
}

export async function handleChatException(
Expand Down
2 changes: 1 addition & 1 deletion apps/nextjs/src/app/api/chat/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export function streamingJSON(message: ErrorDocument | ActionDocument) {

return new StreamingTextResponse(
new ReadableStream({
async start(controller) {
start(controller) {
controller.enqueue(errorEncoder.encode(errorMessage));
controller.close();
},
Expand Down
30 changes: 26 additions & 4 deletions apps/nextjs/src/utils/testHelpers/consumeStream.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,47 @@
import { z } from "zod";

const UnknownStreamMessageSchema = z
.object({
type: z.string(),
})
.passthrough();

type UnknownStreamMessage = z.infer<typeof UnknownStreamMessageSchema>;
export async function consumeStream(
stream: ReadableStream<Uint8Array>,
): Promise<string> {
const reader = stream.getReader();
let result = "";
let isComplete = false;
const maxTimeout = 30000; // 30 seconds timeout
const startTime = Date.now();

while (true) {
while (!isComplete && Date.now() - startTime < maxTimeout) {
const { value, done } = await reader.read();
if (done) break;
if (done) {
isComplete = true;
break;
}
if (value) {
result += new TextDecoder().decode(value);
}
}

if (!isComplete) {
throw new Error("Stream reading timed out");
}

return result;
}

export function extractStreamMessage(streamedText: string) {
export function extractStreamMessage(
streamedText: string,
): UnknownStreamMessage {
const content = streamedText.match(/0:"(.*)"/);
if (!content?.[1]) {
throw new Error("No message found in streamed text");
}
const strippedContent = content[1].replace(/\\"/g, '"');
return JSON.parse(strippedContent);
const parsedMessage = JSON.parse(strippedContent) as unknown;
return UnknownStreamMessageSchema.parse(parsedMessage);
}

0 comments on commit 3e18b44

Please sign in to comment.