Skip to content

Commit

Permalink
Add endpoint for the Dev UI to subscribe to runtime updates
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelDoyle committed Nov 1, 2024
1 parent 629a23f commit 3e54fbd
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 16 deletions.
46 changes: 30 additions & 16 deletions genkit-tools/common/src/manager/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

import axios, { AxiosError } from 'axios';
import chokidar from 'chokidar';
import EventEmitter from 'events';
import fs from 'fs/promises';
import path from 'path';

import {
Action,
RunActionResponse,
Expand All @@ -28,18 +28,12 @@ import * as apis from '../types/apis';
import { TraceData } from '../types/trace';
import { logger } from '../utils/logger';
import { checkServerHealth, findRuntimesDir } from '../utils/utils';
import { GenkitToolsError, StreamingCallback } from './types';

interface RuntimeInfo {
/** Runtime ID (either user-set or `pid`). */
id: string;
/** Process ID of the runtime. */
pid: number;
/** URL of the reflection server. */
reflectionServerUrl: string;
/** Timestamp when the runtime was started. */
timestamp: string;
}
import {
GenkitToolsError,
RuntimeEvent,
RuntimeInfo,
StreamingCallback,
} from './types';

const STREAM_DELIMITER = '\n';
const HEALTH_CHECK_INTERVAL = 5000;
Expand All @@ -54,6 +48,7 @@ interface RuntimeManagerOptions {
export class RuntimeManager {
private filenameToRuntimeMap: Record<string, RuntimeInfo> = {};
private idToFileMap: Record<string, string> = {};
private eventEmitter = new EventEmitter();

private constructor(
readonly telemetryServerUrl?: string,
Expand Down Expand Up @@ -110,6 +105,23 @@ export class RuntimeManager {
);
}

/**
* Subscribe to changes to the available runtimes. e.g.) whenever a new
* runtime is added or removed.
*
* The `listener` will be called with the `eventType` that occured and the
* `runtime` to which it applies.
*
* @param listener the callback function.
*/
onRuntimeEvent(
listener: (eventType: RuntimeEvent, runtime: RuntimeInfo) => void
) {
Object.values(RuntimeEvent).forEach((event) =>
this.eventEmitter.on(event, (rt) => listener(event, rt))
);
}

/**
* Retrieves all runnable actions.
*/
Expand Down Expand Up @@ -305,6 +317,7 @@ export class RuntimeManager {
if (await checkServerHealth(runtimeInfo.reflectionServerUrl)) {
this.filenameToRuntimeMap[fileName] = runtimeInfo;
this.idToFileMap[runtimeInfo.id] = fileName;
this.eventEmitter.emit(RuntimeEvent.Add, runtimeInfo);
await this.notifyRuntime(runtimeInfo);
logger.debug(
`Added runtime with ID ${runtimeInfo.id} at URL: ${runtimeInfo.reflectionServerUrl}`
Expand All @@ -326,10 +339,11 @@ export class RuntimeManager {
private handleRemovedRuntime(filePath: string) {
const fileName = path.basename(filePath);
if (fileName in this.filenameToRuntimeMap) {
const id = this.filenameToRuntimeMap[fileName].id;
const runtime = this.filenameToRuntimeMap[fileName];
delete this.filenameToRuntimeMap[fileName];
delete this.idToFileMap[id];
logger.debug(`Removed runtime with id ${id}.`);
delete this.idToFileMap[runtime.id];
this.eventEmitter.emit(RuntimeEvent.Remove, runtime);
logger.debug(`Removed runtime with id ${runtime.id}.`);
}
}

Expand Down
16 changes: 16 additions & 0 deletions genkit-tools/common/src/manager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,19 @@ export class GenkitToolsError extends Error {

// Streaming callback function.
export type StreamingCallback<T> = (chunk: T) => void;

export interface RuntimeInfo {
/** Runtime ID (either user-set or `pid`). */
id: string;
/** Process ID of the runtime. */
pid: number;
/** URL of the reflection server. */
reflectionServerUrl: string;
/** Timestamp when the runtime was started. */
timestamp: string;
}

export enum RuntimeEvent {
Add = 'add',
Remove = 'remove',
}
34 changes: 34 additions & 0 deletions genkit-tools/common/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { Server } from 'http';
import os from 'os';
import path from 'path';
import { RuntimeManager } from '../manager/manager';
import { ServerEvent } from '../types';
import { logger } from '../utils/logger';
import { toolsPackage } from '../utils/package';
import { downloadAndExtractUiAssets } from '../utils/ui-assets';
Expand Down Expand Up @@ -79,6 +80,32 @@ export async function startServer(manager: RuntimeManager, port: number) {
res.end();
});

app.get('/api/sse', async (_, res) => {
res.writeHead(200, {
'Access-Control-Allow-Origin': '*',
'Cache-Control': 'no-cache',
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
});

// On connection, immediately send the "current" runtime (i.e. most recent)
const runtimeInfo = JSON.stringify(getRuntimeUpdateEvent(manager));
res.write(`data: ${runtimeInfo}\n\n`);

// When runtimes are added or removed, notify the Dev UI which runtime
// is considered "current" (i.e. most recent). In the future, we will send
// updates and let the Dev UI decide which to use. For now, we are limiting
// to a single runtime.
manager.onRuntimeEvent(() => {
const runtimeInfo = JSON.stringify(getRuntimeUpdateEvent(manager));
res.write(`data: ${runtimeInfo}\n\n`);
});

res.on('close', () => {
res.end();
});
});

app.get('/api/__health', (_, res) => {
res.status(200).send('');
});
Expand Down Expand Up @@ -129,3 +156,10 @@ export async function startServer(manager: RuntimeManager, port: number) {
logger.info(`${clc.green(clc.bold('Genkit Developer UI:'))} ${uiUrl}`);
});
}

function getRuntimeUpdateEvent(manager: RuntimeManager): ServerEvent {
return {
eventName: 'current-runtime',
data: manager.getMostRecentRuntime(),
};
}
6 changes: 6 additions & 0 deletions genkit-tools/common/src/types/apis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,9 @@ export const RunNewEvaluationRequestSchema = z.object({
export type RunNewEvaluationRequest = z.infer<
typeof RunNewEvaluationRequestSchema
>;

export const ServerEventSchema = z.object({
eventName: z.enum(['current-runtime']),
data: z.any(),
});
export type ServerEvent = z.infer<typeof ServerEventSchema>;
1 change: 1 addition & 0 deletions genkit-tools/common/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

export { RuntimeEvent, RuntimeInfo } from '../manager/types';
export * from './action';
export * from './analytics';
export * from './apis';
Expand Down

0 comments on commit 3e54fbd

Please sign in to comment.