Skip to content

Commit

Permalink
feat(internals): extends RestfulClient by emitting events
Browse files Browse the repository at this point in the history
Signed-off-by: Tomas Dvorak <[email protected]>
  • Loading branch information
Tomas2D committed Dec 4, 2024
1 parent a22a4d9 commit e7a95c0
Showing 1 changed file with 94 additions and 28 deletions.
122 changes: 94 additions & 28 deletions src/internals/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import {
} from "@ai-zen/node-fetch-event-source";
import { FetchEventSourceInit } from "@ai-zen/node-fetch-event-source/lib/cjs/fetch.js";
import { emitterToGenerator } from "@/internals/helpers/promise.js";
import { isPlainObject } from "remeda";
import { doNothing, isPlainObject } from "remeda";
import { Callback, Emitter } from "@/emitter/emitter.js";
import { shallowCopy } from "@/serializer/utils.js";

export class RestfulClientError extends FrameworkError {}

Expand Down Expand Up @@ -52,7 +54,36 @@ export function createURLParams(
return urlTokenParams;
}

interface FetchInput {
url: string;
options: RequestInit;
}

export interface StreamInput {
url: string;
options: FetchEventSourceInit;
}

export interface RestfulClientEvents {
fetchStart: Callback<{ input: FetchInput }>;
fetchError: Callback<{ error: Error; input: FetchInput }>;
fetchSuccess: Callback<{ response: Response; data: any; input: FetchInput }>;
fetchDone: Callback<{ input: FetchInput }>;

streamStart: Callback<{ input: StreamInput }>;
streamOpen: Callback<{ input: StreamInput }>;
streamSuccess: Callback<{ input: StreamInput }>;
streamMessage: Callback<{ data: EventSourceMessage; input: StreamInput }>;
streamError: Callback<{ error: Error; input: StreamInput }>;
streamDone: Callback<{ input: StreamInput }>;
}

export class RestfulClient<K extends Record<string, string>> extends Serializable {
public readonly emitter = Emitter.root.child<RestfulClientEvents>({
namespace: ["internals", "restfulClient"],
creator: this,
});

constructor(
protected input: {
baseUrl: string;
Expand All @@ -68,14 +99,28 @@ export class RestfulClient<K extends Record<string, string>> extends Serializabl
init: FetchEventSourceInit,
): AsyncGenerator<EventSourceMessage, void, void> {
const { paths, baseUrl, headers } = this.input;
const emitter = this.emitter.child({
groupId: "stream",
});

const target = new URL(paths[path] ?? path, baseUrl);
return yield* emitterToGenerator(async ({ emit }) =>
fetchEventSource(target.toString(), {
const input: StreamInput = {
url: new URL(paths[path] ?? path, baseUrl).toString(),
options: {
...init,
method: "POST",
headers: await headers().then((raw) => Object.fromEntries(raw.entries())),
headers: await headers().then((raw) =>
Object.assign(Object.fromEntries(raw.entries()), init?.headers),
),
},
};
await emitter.emit("streamStart", { input });

return yield* emitterToGenerator(async ({ emit }) =>
fetchEventSource(input.url, {
...input.options,
async onopen(response) {
if (response.ok && response.headers.get("content-type") === EventStreamContentType) {
await emitter.emit("streamOpen", { input });
return;
}
throw new RestfulClientError("Failed to stream!", [], {
Expand All @@ -87,25 +132,31 @@ export class RestfulClient<K extends Record<string, string>> extends Serializabl
isRetryable: response.status >= 400 && response.status < 500 && response.status !== 429,
});
},
onmessage(msg) {
async onmessage(msg) {
if (msg?.event === "error") {
throw new RestfulClientError(`Error during streaming has occurred.`, [], {
context: msg,
});
}
await emitter.emit("streamMessage", { input, data: msg });
emit(msg);
},
onclose() {},
onerror(err) {
throw new RestfulClientError(`Error during streaming has occurred.`, [err]);
},
...init,
fetch,
}),
})
.then(() => emitter.emit("streamSuccess", { input }))
.catch((error) => emitter.emit("streamError", { input, error }).catch(doNothing()))
.finally(() => emitter.emit("streamDone", { input })),
);
}

async fetch(path: keyof K, init?: RequestInit & { searchParams?: URLSearchParams }) {
const emitter = this.emitter.child({
groupId: "fetch",
});

const { paths, baseUrl, headers: getHeaders } = this.input;

const target = new URL(paths[path] ?? path, baseUrl);
Expand All @@ -115,31 +166,46 @@ export class RestfulClient<K extends Record<string, string>> extends Serializabl
}
}

const headers = await getHeaders().then((raw) =>
Object.assign(Object.fromEntries(raw.entries()), init?.headers),
);
const response = await fetch(target.toString(), {
...init,
headers,
});
const input: FetchInput = {
url: target.toString(),
options: {
...init,
headers: await getHeaders().then((raw) =>
Object.assign(Object.fromEntries(raw.entries()), init?.headers),
),
},
};

if (!response.ok) {
throw new RestfulClientError("Fetch has failed", [], {
context: {
url: response.url,
error: await response.text(),
response,
},
isRetryable: [408, 503].includes(response.status ?? 500),
});
}
await emitter.emit("fetchStart", { input });
try {
const response = await fetch(input.url, input.options);

return response.json();
if (!response.ok) {
throw new RestfulClientError("Fetch has failed", [], {
context: {
url: response.url,
error: await response.text(),
response,
},
isRetryable: [408, 503].includes(response.status ?? 500),
});
}

const data = await response.json();
await emitter.emit("fetchSuccess", { response, data, input });
return data;
} catch (error) {
await emitter.emit("fetchError", { error, input: input });
throw error;
} finally {
await emitter.emit("fetchDone", { input: input });
}
}

createSnapshot() {
return {
input: this.input,
input: shallowCopy(this.input),
emitter: this.emitter,
};
}

Expand Down

0 comments on commit e7a95c0

Please sign in to comment.