From e7a95c019ccdcb5e88a4e4b61c3af63f937a05d2 Mon Sep 17 00:00:00 2001 From: Tomas Dvorak Date: Wed, 4 Dec 2024 19:36:48 +0100 Subject: [PATCH] feat(internals): extends RestfulClient by emitting events Signed-off-by: Tomas Dvorak --- src/internals/fetcher.ts | 122 ++++++++++++++++++++++++++++++--------- 1 file changed, 94 insertions(+), 28 deletions(-) diff --git a/src/internals/fetcher.ts b/src/internals/fetcher.ts index 5fc15f0e..3da398bd 100644 --- a/src/internals/fetcher.ts +++ b/src/internals/fetcher.ts @@ -23,7 +23,9 @@ import { } from "@ai-zen/node-fetch-event-source"; import { FetchEventSourceInit } from "@ai-zen/node-fetch-event-source/lib/cjs/fetch.js"; import { emitterToGenerator } from "@/internals/helpers/promise.js"; -import { isPlainObject } from "remeda"; +import { doNothing, isPlainObject } from "remeda"; +import { Callback, Emitter } from "@/emitter/emitter.js"; +import { shallowCopy } from "@/serializer/utils.js"; export class RestfulClientError extends FrameworkError {} @@ -52,7 +54,36 @@ export function createURLParams( return urlTokenParams; } +interface FetchInput { + url: string; + options: RequestInit; +} + +export interface StreamInput { + url: string; + options: FetchEventSourceInit; +} + +export interface RestfulClientEvents { + fetchStart: Callback<{ input: FetchInput }>; + fetchError: Callback<{ error: Error; input: FetchInput }>; + fetchSuccess: Callback<{ response: Response; data: any; input: FetchInput }>; + fetchDone: Callback<{ input: FetchInput }>; + + streamStart: Callback<{ input: StreamInput }>; + streamOpen: Callback<{ input: StreamInput }>; + streamSuccess: Callback<{ input: StreamInput }>; + streamMessage: Callback<{ data: EventSourceMessage; input: StreamInput }>; + streamError: Callback<{ error: Error; input: StreamInput }>; + streamDone: Callback<{ input: StreamInput }>; +} + export class RestfulClient> extends Serializable { + public readonly emitter = Emitter.root.child({ + namespace: ["internals", "restfulClient"], + creator: this, + }); + constructor( protected input: { baseUrl: string; @@ -68,14 +99,28 @@ export class RestfulClient> extends Serializabl init: FetchEventSourceInit, ): AsyncGenerator { const { paths, baseUrl, headers } = this.input; + const emitter = this.emitter.child({ + groupId: "stream", + }); - const target = new URL(paths[path] ?? path, baseUrl); - return yield* emitterToGenerator(async ({ emit }) => - fetchEventSource(target.toString(), { + const input: StreamInput = { + url: new URL(paths[path] ?? path, baseUrl).toString(), + options: { + ...init, method: "POST", - headers: await headers().then((raw) => Object.fromEntries(raw.entries())), + headers: await headers().then((raw) => + Object.assign(Object.fromEntries(raw.entries()), init?.headers), + ), + }, + }; + await emitter.emit("streamStart", { input }); + + return yield* emitterToGenerator(async ({ emit }) => + fetchEventSource(input.url, { + ...input.options, async onopen(response) { if (response.ok && response.headers.get("content-type") === EventStreamContentType) { + await emitter.emit("streamOpen", { input }); return; } throw new RestfulClientError("Failed to stream!", [], { @@ -87,25 +132,31 @@ export class RestfulClient> extends Serializabl isRetryable: response.status >= 400 && response.status < 500 && response.status !== 429, }); }, - onmessage(msg) { + async onmessage(msg) { if (msg?.event === "error") { throw new RestfulClientError(`Error during streaming has occurred.`, [], { context: msg, }); } + await emitter.emit("streamMessage", { input, data: msg }); emit(msg); }, onclose() {}, onerror(err) { throw new RestfulClientError(`Error during streaming has occurred.`, [err]); }, - ...init, - fetch, - }), + }) + .then(() => emitter.emit("streamSuccess", { input })) + .catch((error) => emitter.emit("streamError", { input, error }).catch(doNothing())) + .finally(() => emitter.emit("streamDone", { input })), ); } async fetch(path: keyof K, init?: RequestInit & { searchParams?: URLSearchParams }) { + const emitter = this.emitter.child({ + groupId: "fetch", + }); + const { paths, baseUrl, headers: getHeaders } = this.input; const target = new URL(paths[path] ?? path, baseUrl); @@ -115,31 +166,46 @@ export class RestfulClient> extends Serializabl } } - const headers = await getHeaders().then((raw) => - Object.assign(Object.fromEntries(raw.entries()), init?.headers), - ); - const response = await fetch(target.toString(), { - ...init, - headers, - }); + const input: FetchInput = { + url: target.toString(), + options: { + ...init, + headers: await getHeaders().then((raw) => + Object.assign(Object.fromEntries(raw.entries()), init?.headers), + ), + }, + }; - if (!response.ok) { - throw new RestfulClientError("Fetch has failed", [], { - context: { - url: response.url, - error: await response.text(), - response, - }, - isRetryable: [408, 503].includes(response.status ?? 500), - }); - } + await emitter.emit("fetchStart", { input }); + try { + const response = await fetch(input.url, input.options); - return response.json(); + if (!response.ok) { + throw new RestfulClientError("Fetch has failed", [], { + context: { + url: response.url, + error: await response.text(), + response, + }, + isRetryable: [408, 503].includes(response.status ?? 500), + }); + } + + const data = await response.json(); + await emitter.emit("fetchSuccess", { response, data, input }); + return data; + } catch (error) { + await emitter.emit("fetchError", { error, input: input }); + throw error; + } finally { + await emitter.emit("fetchDone", { input: input }); + } } createSnapshot() { return { - input: this.input, + input: shallowCopy(this.input), + emitter: this.emitter, }; }