Skip to content

Commit

Permalink
Move snakes onto meta-workers.
Browse files Browse the repository at this point in the history
Ground work for allowing apples to kill workers.
  • Loading branch information
robholland committed Sep 7, 2024
1 parent 820d9da commit 13dc249
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 62 deletions.
10 changes: 5 additions & 5 deletions game/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
"build": "tsc --build",
"build.watch": "tsc --build --watch",
"lint": "eslint .",
"game-worker": "TEMPORAL_TASK_QUEUE=game ts-node src/game-worker.ts",
"red-worker1": "TEMPORAL_TASK_QUEUE=red-team TEMPORAL_WORKER_IDENTITY=red-1 ts-node src/team-worker.ts",
"red-worker2": "TEMPORAL_TASK_QUEUE=red-team TEMPORAL_WORKER_IDENTITY=red-2 ts-node src/team-worker.ts",
"blue-worker1": "TEMPORAL_TASK_QUEUE=blue-team TEMPORAL_WORKER_IDENTITY=blue-1 ts-node src/team-worker.ts",
"blue-worker2": "TEMPORAL_TASK_QUEUE=blue-team TEMPORAL_WORKER_IDENTITY=blue-2 ts-node src/team-worker.ts",
"game-worker": "ts-node src/game-worker.ts",
"red-worker1": "TEAM=red IDENTITY=red-manager-1 ts-node src/team-worker.ts",
"red-worker2": "TEAM=red IDENTITY=red-manager-2 ts-node src/team-worker.ts",
"blue-worker1": "TEAM=blue IDENTITY=blue-manager-1 ts-node src/team-worker.ts",
"blue-worker2": "TEAM=blue IDENTITY=blue-manager-2 ts-node src/team-worker.ts",
"start.watch": "nodemon src/worker.ts",
"workflow": "ts-node src/client.ts"
},
Expand Down
75 changes: 70 additions & 5 deletions game/src/activities.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,90 @@
import { Snake, Round } from './workflows';
import { io } from 'socket.io-client';
import { Worker, InjectedSinks, NativeConnection } from '@temporalio/worker';
import { heartbeat, cancelled } from '@temporalio/activity';
import { SocketSinks } from './workflow-interceptors';

const socket = io('http://localhost:5173');

export function buildWorkerActivities(namespace: string, connection: NativeConnection, team: string) {
return {
snakeWorker: async(identity: string) => {
const workerSocket = io('http://localhost:5173/workers', {
auth: { identity }
});

const sinks: InjectedSinks<SocketSinks> = {
emitter: {
workflowExecute: {
fn(workflowInfo) {
try {
workerSocket.emit('workflow:execute', { team, identity, workflowInfo });
} catch (err) {
console.log('emit failed', err);
}
},
callDuringReplay: true,
},
workflowComplete: {
fn(workflowInfo) {
try {
workerSocket.emit('workflow:complete', { team, identity, workflowInfo });
} catch (err) {
console.log('emit failed', err);
}
},
callDuringReplay: false,
},
},
};

const worker = await Worker.create({
connection,
namespace,
workflowsPath: require.resolve('./workflows'),
taskQueue: `${team}-team-snakes`,
activities: { snakeNom },
identity,
interceptors: {
workflowModules: [require.resolve('./workflow-interceptors')],
},
sinks,
stickyQueueScheduleToStartTimeout: '1s',
})

workerSocket.on('worker:stop', async({ identity: targetIdentity }) => {
if (identity === targetIdentity) {
worker.shutdown();
}
});
cancelled().catch(() => worker.shutdown());

const heartbeater = setInterval(heartbeat, 1000);

await worker.run();

clearInterval(heartbeater);
},
}
}

export async function snakeNom(snakeId: string, durationMs: number) {
await new Promise((resolve) => setTimeout(resolve, durationMs));
socket.emit('snakeNom', { snakeId });
}
};

export async function snakeMovedNotification(snake: Snake) {
socket.emit('snakeMoved', { snakeId: snake.id, segments: snake.segments });
}
};

export async function roundStartedNotification(round: Round) {
socket.emit('roundStarted', { round });
}
};

export async function roundUpdateNotification(round: Round) {
socket.emit('roundUpdate', { round });
}
};

export async function roundFinishedNotification(round: Round) {
socket.emit('roundFinished', { round });
}
};
4 changes: 1 addition & 3 deletions game/src/game-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ async function run({
connection,
namespace,
workflowsPath: require.resolve('./workflows'),
identity: 'game-worker',
taskQueue,
activities: activities,
// maxCachedWorkflows: 0,
// maxConcurrentActivityTaskPolls: 1,
// maxConcurrentActivityTaskExecutions: 1,
});
console.log('Worker connection successfully established');

Expand Down
49 changes: 7 additions & 42 deletions game/src/team-worker.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import fs from 'fs/promises';
import { Worker, InjectedSinks, NativeConnection } from '@temporalio/worker';
import { Worker, NativeConnection } from '@temporalio/worker';
import { Env, getEnv, requiredEnv } from './interfaces/env';
import * as activities from './activities';
import { io } from 'socket.io-client';
import { SocketSinks } from './workflow-interceptors';

const socket = io('http://localhost:5173/workers');
import { buildWorkerActivities } from './activities';

/**
* Run a Worker with an mTLS connection, configuration is provided via environment variables.
Expand Down Expand Up @@ -50,46 +46,15 @@ async function run({
connection = await NativeConnection.connect({ address: address });
}

const sinks: InjectedSinks<SocketSinks> = {
emitter: {
workflowExecute: {
fn(workflowInfo) {
try {
socket.emit('workflow:execute', { identity: worker.options.identity, workflowInfo });
} catch (err) {
console.log('emit failed', err);
}
},
callDuringReplay: true, // The default
},
workflowComplete: {
fn(workflowInfo) {
try {
socket.emit('workflow:complete', { identity: worker.options.identity, workflowInfo });
} catch (err) {
console.log('emit failed', err);
}
},
callDuringReplay: false, // The default
},
},
};

const team = requiredEnv('TEAM');
const identity = requiredEnv('IDENTITY');
const worker = await Worker.create({
connection,
namespace,
workflowsPath: require.resolve('./workflows'),
interceptors: {
workflowModules: [require.resolve('./workflow-interceptors')],
},
taskQueue,
activities: activities,
identity: requiredEnv('TEMPORAL_WORKER_IDENTITY'),
sinks,
// maxCachedWorkflows: 0,
maxConcurrentActivityTaskPolls: 4,
maxConcurrentActivityTaskExecutions: 4,
stickyQueueScheduleToStartTimeout: '1s',
identity,
taskQueue: `${team}-team`,
activities: buildWorkerActivities(namespace, connection, team),
});
console.log('Worker connection successfully established');

Expand Down
33 changes: 32 additions & 1 deletion game/src/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,27 @@ import {
} from '@temporalio/workflow';

import type * as activities from './activities';
import { buildWorkerActivities } from './activities';

const ROUND_WF_ID = 'SnakeGameRound';
const APPLE_POINTS = 10;
const SNAKE_MOVES_BEFORE_CAN = 500;
const SNAKE_WORKER_DOWN_TIME = '5 seconds';
const SNAKE_WORKERS_PER_TEAM = 4;

const { snakeMovedNotification, roundStartedNotification, roundUpdateNotification, roundFinishedNotification } = proxyLocalActivities<typeof activities>({
startToCloseTimeout: '1 seconds',
});

const { snakeWorker } = proxyActivities<ReturnType<typeof buildWorkerActivities>>({
startToCloseTimeout: '1 day',
heartbeatTimeout: '2 seconds',
retry: {
initialInterval: SNAKE_WORKER_DOWN_TIME,
backoffCoefficient: 1,
},
});

type GameConfig = {
width: number;
height: number;
Expand Down Expand Up @@ -127,6 +139,15 @@ export async function GameWorkflow(config: GameConfig): Promise<void> {
return game;
});

const workerManagers = config.teamNames.map((team) => {
return startChild(SnakeWorkerManagerWorkflow, {
workflowId: `${team}-worker-manager`,
taskQueue: `${team}-team`,
args: [{ team: team, count: SNAKE_WORKERS_PER_TEAM }],
});
});
await Promise.all(workerManagers);

let newRound: RoundWorkflowInput | undefined;
setHandler(roundStartSignal, async ({ duration, snakes }) => {
newRound = { config, teams: buildRoundTeams(game), duration, snakes };
Expand Down Expand Up @@ -239,6 +260,16 @@ export async function SnakeWorkflow({ roundId, id, direction, nomsPerMove, nomDu
}
}

type SnakeWorkerManagerWorkflowInput = {
team: string;
count: number;
};

export async function SnakeWorkerManagerWorkflow({ team, count }: SnakeWorkerManagerWorkflowInput): Promise<void> {
const workers = Array.from({ length: count }).map((_, i) => snakeWorker(`${team}-snake-worker-${i+1}`));
await Promise.all(workers);
};

function moveSnake(round: Round, snake: Snake, direction: Direction) {
const config = round.config;

Expand Down Expand Up @@ -375,7 +406,7 @@ async function startSnakes(config: GameConfig, snakes: Snakes) {
const commands = Object.values(snakes).map((snake) =>
startChild(SnakeWorkflow, {
workflowId: snake.id,
taskQueue: `${snake.teamName}-team`,
taskQueue: `${snake.teamName}-team-snakes`,
args: [{
roundId: ROUND_WF_ID,
id: snake.id,
Expand Down
8 changes: 6 additions & 2 deletions snakes/src/routes/worker/[identity]/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
workerSocket.on('workflow:execute', ({ identity, workflowInfo }) => {
if (identity === $page.params.identity) {
workflows[workflowInfo.id] = workflowInfo;
workflows = { ...workflows, [workflowInfo.workflowId]: workflowInfo };
} else {
delete workflows[workflowInfo.workflowId];
workflows = workflows; // Let Svelte know to update the UI
}
});
workerSocket.on('workflow:complete', ({ identity, workflowInfo }) => {
delete workflows[workflowInfo.id];
delete workflows[workflowInfo.workflowId];
workflows = workflows; // Let Svelte know to update the UI
});
workerSocket.on('disconnect', () => {
Expand Down
8 changes: 4 additions & 4 deletions snakes/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ const webSocketServer = {
const workerIO = io.of("/workers");

workerIO.on('connection', (socket) => {
socket.on('workflow:execute', ({ identity, workflowInfo }) => {
workerIO.emit('workflow:execute', { identity, workflowInfo });
socket.on('workflow:execute', ({ team, identity, workflowInfo }) => {
workerIO.emit('workflow:execute', { team, identity, workflowInfo });
});
socket.on('workflow:complete', ({ identity, workflowInfo }) => {
workerIO.emit('workflow:complete', { identity, workflowInfo });
socket.on('workflow:complete', ({ team, identity, workflowInfo }) => {
workerIO.emit('workflow:complete', { team, identity, workflowInfo });
});
});
}
Expand Down

0 comments on commit 13dc249

Please sign in to comment.