From 369a1ef3f8914a555b0467015e11de0c1e25256d Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Fri, 6 Sep 2024 17:20:33 +0100 Subject: [PATCH] Add worker page which shows workflows currently on that worker. --- game/package.json | 8 ++-- game/src/team-worker.ts | 38 +++++++++++++++- game/src/workflow-interceptors.ts | 37 ++++++++++++++++ snakes/src/routes/[id]/lobby/+page.svelte | 1 - .../src/routes/worker/[identity]/+page.svelte | 43 +++++++++++++++++++ snakes/vite.config.ts | 11 +++++ 6 files changed, 131 insertions(+), 7 deletions(-) create mode 100644 game/src/workflow-interceptors.ts create mode 100644 snakes/src/routes/worker/[identity]/+page.svelte diff --git a/game/package.json b/game/package.json index 48252b4..d83699c 100644 --- a/game/package.json +++ b/game/package.json @@ -7,10 +7,10 @@ "build.watch": "tsc --build --watch", "lint": "eslint .", "game-worker": "TEMPORAL_TASK_QUEUE=game ts-node src/game-worker.ts", - "red-worker1": "TEMPORAL_TASK_QUEUE=red-team ts-node src/team-worker.ts", - "red-worker2": "TEMPORAL_TASK_QUEUE=red-team ts-node src/team-worker.ts", - "blue-worker1": "TEMPORAL_TASK_QUEUE=blue-team ts-node src/team-worker.ts", - "blue-worker2": "TEMPORAL_TASK_QUEUE=blue-team ts-node src/team-worker.ts", + "red-worker1": "TEMPORAL_TASK_QUEUE=red-team TEMPORAL_WORKER_IDENTITY=red-1 ts-node src/team-worker.ts", + "red-worker2": "TEMPORAL_TASK_QUEUE=red-team TEMPORAL_WORKER_IDENTITY=red-2 ts-node src/team-worker.ts", + "blue-worker1": "TEMPORAL_TASK_QUEUE=blue-team TEMPORAL_WORKER_IDENTITY=blue-1 ts-node src/team-worker.ts", + "blue-worker2": "TEMPORAL_TASK_QUEUE=blue-team TEMPORAL_WORKER_IDENTITY=blue-2 ts-node src/team-worker.ts", "start.watch": "nodemon src/worker.ts", "workflow": "ts-node src/client.ts" }, diff --git a/game/src/team-worker.ts b/game/src/team-worker.ts index bbc1ad7..a375b70 100644 --- a/game/src/team-worker.ts +++ b/game/src/team-worker.ts @@ -1,7 +1,11 @@ import fs from 'fs/promises'; -import { Worker, NativeConnection } from '@temporalio/worker'; -import { Env, getEnv } from './interfaces/env'; +import { Worker, InjectedSinks, NativeConnection } from '@temporalio/worker'; +import { Env, getEnv, requiredEnv } from './interfaces/env'; import * as activities from './activities'; +import { io } from 'socket.io-client'; +import { SocketSinks } from './workflow-interceptors'; + +const socket = io('http://localhost:5173/workers'); /** * Run a Worker with an mTLS connection, configuration is provided via environment variables. @@ -46,12 +50,42 @@ async function run({ connection = await NativeConnection.connect({ address: address }); } + const sinks: InjectedSinks = { + emitter: { + workflowExecute: { + fn(workflowInfo) { + try { + socket.emit('workflow:execute', { identity: worker.options.identity, workflowInfo }); + } catch (err) { + console.log('emit failed', err); + } + }, + callDuringReplay: true, // The default + }, + workflowComplete: { + fn(workflowInfo) { + try { + socket.emit('workflow:complete', { identity: worker.options.identity, workflowInfo }); + } catch (err) { + console.log('emit failed', err); + } + }, + callDuringReplay: false, // The default + }, + }, + }; + const worker = await Worker.create({ connection, namespace, workflowsPath: require.resolve('./workflows'), + interceptors: { + workflowModules: [require.resolve('./workflow-interceptors')], + }, taskQueue, activities: activities, + identity: requiredEnv('TEMPORAL_WORKER_IDENTITY'), + sinks, // maxCachedWorkflows: 0, maxConcurrentActivityTaskPolls: 4, maxConcurrentActivityTaskExecutions: 4, diff --git a/game/src/workflow-interceptors.ts b/game/src/workflow-interceptors.ts new file mode 100644 index 0000000..c61b44f --- /dev/null +++ b/game/src/workflow-interceptors.ts @@ -0,0 +1,37 @@ +import { + WorkflowExecuteInput, + Next, + WorkflowInboundCallsInterceptor, + workflowInfo, + Sinks, + proxySinks, +} from '@temporalio/workflow'; + +export interface SocketSinks extends Sinks { + emitter: { + workflowExecute(): void; + workflowComplete(): void; + }; +} + +const { emitter } = proxySinks(); + +class WorkflowTaskInterceptor + implements WorkflowInboundCallsInterceptor { + constructor(public readonly workflowType: string) { } + + async execute( + input: WorkflowExecuteInput, + next: Next, + ): Promise { + emitter.workflowExecute(); + const result = await next(input); + emitter.workflowComplete(); + return result; + } +} + +export const interceptors = () => ({ + outbound: [], + inbound: [new WorkflowTaskInterceptor(workflowInfo().workflowType)], +}); \ No newline at end of file diff --git a/snakes/src/routes/[id]/lobby/+page.svelte b/snakes/src/routes/[id]/lobby/+page.svelte index 826f261..8b3e038 100644 --- a/snakes/src/routes/[id]/lobby/+page.svelte +++ b/snakes/src/routes/[id]/lobby/+page.svelte @@ -5,7 +5,6 @@ import QR from '@svelte-put/qr/svg/QR.svelte'; import type { Lobby } from '$lib/snake/types'; import { onMount } from 'svelte'; - import { base } from '$app/paths'; $: ({ id: workflowId } = $page.params); diff --git a/snakes/src/routes/worker/[identity]/+page.svelte b/snakes/src/routes/worker/[identity]/+page.svelte new file mode 100644 index 0000000..090ba40 --- /dev/null +++ b/snakes/src/routes/worker/[identity]/+page.svelte @@ -0,0 +1,43 @@ + + +
+

Worker: { online ? "Online" : "Offline"}

+
    + {#each Object.values(workflows) as workflow} +
  • {workflow.workflowId}
  • + {/each} +
+
\ No newline at end of file diff --git a/snakes/vite.config.ts b/snakes/vite.config.ts index a1584c2..9d5375b 100644 --- a/snakes/vite.config.ts +++ b/snakes/vite.config.ts @@ -198,6 +198,17 @@ const webSocketServer = { } }); }); + + const workerIO = io.of("/workers"); + + workerIO.on('connection', (socket) => { + socket.on('workflow:execute', ({ identity, workflowInfo }) => { + workerIO.emit('workflow:execute', { identity, workflowInfo }); + }); + socket.on('workflow:complete', ({ identity, workflowInfo }) => { + workerIO.emit('workflow:complete', { identity, workflowInfo }); + }); + }); } };