Skip to content

Commit

Permalink
Add worker page which shows workflows currently on that worker.
Browse files Browse the repository at this point in the history
  • Loading branch information
robholland committed Sep 6, 2024
1 parent a493e7a commit 369a1ef
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 7 deletions.
8 changes: 4 additions & 4 deletions game/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
"build.watch": "tsc --build --watch",
"lint": "eslint .",
"game-worker": "TEMPORAL_TASK_QUEUE=game ts-node src/game-worker.ts",
"red-worker1": "TEMPORAL_TASK_QUEUE=red-team ts-node src/team-worker.ts",
"red-worker2": "TEMPORAL_TASK_QUEUE=red-team ts-node src/team-worker.ts",
"blue-worker1": "TEMPORAL_TASK_QUEUE=blue-team ts-node src/team-worker.ts",
"blue-worker2": "TEMPORAL_TASK_QUEUE=blue-team ts-node src/team-worker.ts",
"red-worker1": "TEMPORAL_TASK_QUEUE=red-team TEMPORAL_WORKER_IDENTITY=red-1 ts-node src/team-worker.ts",
"red-worker2": "TEMPORAL_TASK_QUEUE=red-team TEMPORAL_WORKER_IDENTITY=red-2 ts-node src/team-worker.ts",
"blue-worker1": "TEMPORAL_TASK_QUEUE=blue-team TEMPORAL_WORKER_IDENTITY=blue-1 ts-node src/team-worker.ts",
"blue-worker2": "TEMPORAL_TASK_QUEUE=blue-team TEMPORAL_WORKER_IDENTITY=blue-2 ts-node src/team-worker.ts",
"start.watch": "nodemon src/worker.ts",
"workflow": "ts-node src/client.ts"
},
Expand Down
38 changes: 36 additions & 2 deletions game/src/team-worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import fs from 'fs/promises';
import { Worker, NativeConnection } from '@temporalio/worker';
import { Env, getEnv } from './interfaces/env';
import { Worker, InjectedSinks, NativeConnection } from '@temporalio/worker';
import { Env, getEnv, requiredEnv } from './interfaces/env';
import * as activities from './activities';
import { io } from 'socket.io-client';
import { SocketSinks } from './workflow-interceptors';

const socket = io('http://localhost:5173/workers');

/**
* Run a Worker with an mTLS connection, configuration is provided via environment variables.
Expand Down Expand Up @@ -46,12 +50,42 @@ async function run({
connection = await NativeConnection.connect({ address: address });
}

const sinks: InjectedSinks<SocketSinks> = {
emitter: {
workflowExecute: {
fn(workflowInfo) {
try {
socket.emit('workflow:execute', { identity: worker.options.identity, workflowInfo });
} catch (err) {
console.log('emit failed', err);
}
},
callDuringReplay: true, // The default
},
workflowComplete: {
fn(workflowInfo) {
try {
socket.emit('workflow:complete', { identity: worker.options.identity, workflowInfo });
} catch (err) {
console.log('emit failed', err);
}
},
callDuringReplay: false, // The default
},
},
};

const worker = await Worker.create({
connection,
namespace,
workflowsPath: require.resolve('./workflows'),
interceptors: {
workflowModules: [require.resolve('./workflow-interceptors')],
},
taskQueue,
activities: activities,
identity: requiredEnv('TEMPORAL_WORKER_IDENTITY'),
sinks,
// maxCachedWorkflows: 0,
maxConcurrentActivityTaskPolls: 4,
maxConcurrentActivityTaskExecutions: 4,
Expand Down
37 changes: 37 additions & 0 deletions game/src/workflow-interceptors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {
WorkflowExecuteInput,
Next,
WorkflowInboundCallsInterceptor,
workflowInfo,
Sinks,
proxySinks,
} from '@temporalio/workflow';

export interface SocketSinks extends Sinks {
emitter: {
workflowExecute(): void;
workflowComplete(): void;
};
}

const { emitter } = proxySinks<SocketSinks>();

class WorkflowTaskInterceptor
implements WorkflowInboundCallsInterceptor {
constructor(public readonly workflowType: string) { }

async execute(
input: WorkflowExecuteInput,
next: Next<WorkflowInboundCallsInterceptor, 'execute'>,
): Promise<unknown> {
emitter.workflowExecute();
const result = await next(input);
emitter.workflowComplete();
return result;
}
}

export const interceptors = () => ({
outbound: [],
inbound: [new WorkflowTaskInterceptor(workflowInfo().workflowType)],
});
1 change: 0 additions & 1 deletion snakes/src/routes/[id]/lobby/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import QR from '@svelte-put/qr/svg/QR.svelte';
import type { Lobby } from '$lib/snake/types';
import { onMount } from 'svelte';
import { base } from '$app/paths';
$: ({ id: workflowId } = $page.params);
Expand Down
43 changes: 43 additions & 0 deletions snakes/src/routes/worker/[identity]/+page.svelte
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<script lang="ts">
import { io, Socket } from 'socket.io-client';
import { onMount } from 'svelte';
import { page } from '$app/stores';
let online = false;
let workerSocket: Socket;
let workflows: Record<string, any> = {};
onMount(() => {
workerSocket = io("/workers");
workerSocket.on('connect', () => {
online = true;
});
workerSocket.on('workflow:execute', ({ identity, workflowInfo }) => {
console.log('execute', identity, $page.params.identity, workflowInfo);
if (identity === $page.params.identity) {
workflows[workflowInfo.id] = workflowInfo;
}
});
workerSocket.on('workflow:complete', ({ identity, workflowInfo }) => {
console.log('complete', identity, $page.params.identity, workflowInfo);
delete workflows[workflowInfo.id];
});
workerSocket.on('disconnect', () => {
online = false;
});
});
</script>

<section>
<h2 class="retro">Worker: { online ? "Online" : "Offline"}</h2>
<ul class="retro">
{#each Object.values(workflows) as workflow}
<li>{workflow.workflowId}</li>
{/each}
</ul>
</section>
11 changes: 11 additions & 0 deletions snakes/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ const webSocketServer = {
}
});
});

const workerIO = io.of("/workers");

workerIO.on('connection', (socket) => {
socket.on('workflow:execute', ({ identity, workflowInfo }) => {
workerIO.emit('workflow:execute', { identity, workflowInfo });
});
socket.on('workflow:complete', ({ identity, workflowInfo }) => {
workerIO.emit('workflow:complete', { identity, workflowInfo });
});
});
}
};

Expand Down

0 comments on commit 369a1ef

Please sign in to comment.