Skip to content

Commit

Permalink
Add priority-queue example for TS (#182)
Browse files Browse the repository at this point in the history
* Add priority-queue example for TS

* Named queues

* Add actual prioritization

* just queue

* Split out methods

* Add priority to readme

* Move run into op
  • Loading branch information
jackkleeman authored Sep 4, 2024
1 parent 0b48dcc commit 714767b
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Priority queue

An example of implementing your own priority queue using Restate state and
awakeables.

Run the example with `npm run app-dev`.

You can simulate adding work to the queue like this:
```shell
# add a single entry
curl localhost:8080/myService/expensiveMethod/send --json '{"left": 1, "right": 2, "priority": 1}'
# add lots
for i in $(seq 1 30); do curl localhost:8080/myService/expensiveMethod/send --json '{"left": 1, "right": 2, "priority": 2}'; done
```

As you do so, you can observe the logs; in flight requests will increase up to 10, beyond which items will be enqueued.

You can write your own queue item selection logic in `selectAndPopItem`; doing so is outside the scope of this example.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "@restatedev/example-pattern-priority-queue",
"version": "0.1.0",
"description": "A Restate example showing the implementation of a distributed priority queue",
"type": "commonjs",
"scripts": {
"build": "tsc --noEmitOnError",
"app-dev": "tsx --watch ./src/app.ts",
"app": "tsx ./src/app.ts"
},
"dependencies": {
"@restatedev/restate-sdk": "^1.2.1"
},
"devDependencies": {
"@types/node": "^20.12.7",
"tsx": "^4.17.0",
"typescript": "^5.0.2"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/blob/main/LICENSE
*/

import { endpoint } from "@restatedev/restate-sdk";

import { queue } from "./queue";
import { myService } from "./service";

endpoint().bind(queue).bind(myService).listen();
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/blob/main/LICENSE
*/

import { object, ObjectContext, TerminalError } from "@restatedev/restate-sdk";

type QueueState = {
items: QueueItem[];
inFlight: number;
};

type QueueItem = {
awakeable: string;
priority: number;
};

type TickCause =
| { type: "done" }
| { type: "push"; item: QueueItem }
| { type: "drop"; awakeable: string };

// Put your super clever queue fairness algorithm here
function selectAndPopItem<T>(items: QueueItem[]): QueueItem {
let lowest = { priority: Number.MAX_SAFE_INTEGER, index: 0 };
for (const [i, item] of items.entries()) {
if (item.priority < lowest.priority) {
lowest.priority = item.priority;
lowest.index = i;
}
}
const [item] = items.splice(lowest.index, 1);
return item;
}

const MAX_IN_FLIGHT = 10;

export const queue = object({
name: "queue",
handlers: {
done: async (ctx: ObjectContext<QueueState>): Promise<void> => {
const state = await getState(ctx);

state.inFlight--;

tick(ctx, state);

setState(ctx, state);
},
push: async (
ctx: ObjectContext<QueueState>,
item: QueueItem,
): Promise<void> => {
const state = await getState(ctx);

state.items.push(item);

tick(ctx, state);

setState(ctx, state);
},
drop: async (
ctx: ObjectContext<QueueState>,
awakeable: string,
): Promise<void> => {
const state = await getState(ctx);

const index = state.items.findIndex(
(item) => item.awakeable == awakeable,
);
if (index == -1) {
// we have already popped it; treat this as a 'done'
state.inFlight--;
} else {
// remove from the queue
state.items.splice(index, 1);
}

tick(ctx, state);

setState(ctx, state);
},
},
});

async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> {
return {
items: (await ctx.get("items")) ?? [],
inFlight: (await ctx.get("inFlight")) ?? 0,
};
}

function setState(ctx: ObjectContext<QueueState>, state: QueueState) {
ctx.set("items", state.items);
ctx.set("inFlight", state.inFlight);
}

function tick(ctx: ObjectContext<QueueState>, state: QueueState) {
while (state.inFlight < MAX_IN_FLIGHT && state.items.length > 0) {
let item = selectAndPopItem(state.items);
state.inFlight++;
ctx.resolveAwakeable(item.awakeable);
}

ctx.console.log(
`Tick end. Queue length: ${state.items.length}, In Flight: ${state.inFlight}`,
);
}

export type Queue = typeof queue;
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/blob/main/LICENSE
*/

import { Context, TerminalError } from "@restatedev/restate-sdk";
import type { Queue as QueueObject } from "./queue";

export interface Queue {
run<T>(priority: number, op: () => Promise<T>): Promise<T>;
}

export namespace Queue {
export function fromContext(ctx: Context, name: string): Queue {
return {
async run<T>(priority: number, op: () => Promise<T>): Promise<T> {
const client = ctx.objectSendClient<QueueObject>(
{ name: "queue" },
name,
);

const awakeable = ctx.awakeable();
client.push({
awakeable: awakeable.id,
priority,
});

try {
await awakeable.promise;
} catch (e) {
if (e instanceof TerminalError) {
// this should only happen on cancellation; inform the queue that we no longer need to be scheduled
client.drop(awakeable.id);
}
throw e;
}

try {
const result = await op();

client.done();

return result;
} catch (e) {
if (e instanceof TerminalError) {
client.done();
}
throw e;
}
},
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/blob/main/LICENSE
*/

import { Context, service } from "@restatedev/restate-sdk";
import { Queue } from "./queue_client";

const QUEUE_NAME = "myService/expensiveMethod";

export const myService = service({
name: "myService",
handlers: {
expensiveMethod: async (
ctx: Context,
params: { left: number; right: number; priority?: number },
): Promise<number> => {
const queue = Queue.fromContext(ctx, QUEUE_NAME);
return queue.run(params.priority ?? 1, () =>
expensiveOperation(ctx, params.left, params.right),
);
},
},
});

async function expensiveOperation(
ctx: Context,
left: number,
right: number,
): Promise<number> {
return ctx.run(async () => {
// very cpu heavy - important that the queue protects this
await new Promise((resolve) => setTimeout(resolve, 5_000));
return left + right;
});
}

export type MyService = typeof myService;
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"compilerOptions": {
"target": "esnext",
"lib": ["esnext"],
"module": "nodenext",
"allowJs": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true,
"outDir": "./dist",
"allowSyntheticDefaultImports": true,
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"strict": true,
"skipDefaultLibCheck": true,
"skipLibCheck": true
}
}

0 comments on commit 714767b

Please sign in to comment.