Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: distinguish between awaiting Promise and waiting for subtask, simplify route rebuilding tasks #7203

Merged
merged 2 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions packages/cc/src/lib/Values.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import {
ValueMetadata,
} from "@zwave-js/core";
import type { ZWaveApplicationHost } from "@zwave-js/host";
import {
type FnOrStatic,
type ReturnTypeOrStatic,
evalOrStatic,
} from "@zwave-js/shared/safe";
import type { Overwrite } from "alcalzone-shared/types";
import type { ValueIDProperties } from "./API";

Expand Down Expand Up @@ -128,23 +133,13 @@ type ToDynamicCCValues<
}
>;

type FnOrStatic<TArgs extends any[], TReturn> =
| ((...args: TArgs) => TReturn)
| TReturn;

type ReturnTypeOrStatic<T> = T extends (...args: any[]) => infer R ? R : T;

type InferArgs<T extends FnOrStatic<any, any>[]> = T extends [
(...args: infer A) => any,
...any,
] ? A
: T extends [any, ...infer R] ? InferArgs<R>
: [];

function evalOrStatic<T>(fnOrConst: T, ...args: any[]): ReturnTypeOrStatic<T> {
return typeof fnOrConst === "function" ? fnOrConst(...args) : fnOrConst;
}

/** Defines a single static CC values that belong to a CC */
function defineStaticCCValue<
TCommandClass extends CommandClasses,
Expand Down
2 changes: 1 addition & 1 deletion packages/serial/src/message/Constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export enum FunctionType {
GetProtocolStatus = 0xbf, // Request the current status of the protocol running on the Z-Wave module

FUNC_ID_ZW_SET_PROMISCUOUS_MODE = 0xd0, // Set controller into promiscuous mode to listen to all messages
FUNC_ID_PROMISCUOUS_APPLICATION_COMMAND_HANDLER = 0xd1,
FUNC_ID_PROMISCUOUS_APPLICATION_COMMAND_HANDLER = 0xd1, // deprecated, replaced with a flag for the ApplicationCommandHandler

StartWatchdog = 0xd2, // Start Hardware Watchdog (700 series and newer)
StopWatchdog = 0xd3, // Stop Hardware Watchdog (700 series and newer)
Expand Down
14 changes: 14 additions & 0 deletions packages/shared/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,17 @@ export function sum(values: number[]): number {
export function noop(): void {
// intentionally empty
}

export type FnOrStatic<TArgs extends any[], TReturn> =
| ((...args: TArgs) => TReturn)
| TReturn;

export type ReturnTypeOrStatic<T> = T extends (...args: any[]) => infer R ? R
: T;

export function evalOrStatic<T>(
fnOrConst: T,
...args: any[]
): ReturnTypeOrStatic<T> {
return typeof fnOrConst === "function" ? fnOrConst(...args) : fnOrConst;
}
99 changes: 60 additions & 39 deletions packages/zwave-js/src/lib/controller/Controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4688,10 +4688,14 @@ export class ZWaveController

async function* doRebuildRoutes(nodeId: number) {
// Await the process for each node and convert errors to a non-successful result
const result: boolean = yield () =>
self.rebuildNodeRoutesInternal(nodeId).catch(() =>
false
);
let result: boolean;
try {
const node = self.nodes.getOrThrow(nodeId);
result = yield () =>
self.getRebuildNodeRoutesTask(node);
} catch {
result = false;
}

// Track the success in a map
self._rebuildRoutesProgress.set(
Expand Down Expand Up @@ -4736,29 +4740,42 @@ export class ZWaveController
"Rebuilding routes for sleeping nodes when they wake up",
);

const tasks = todoSleeping.map((nodeId) =>
const sleepingNodes = todoSleeping.map((nodeId) =>
self.nodes.get(nodeId)
).filter((node) => node != undefined)
.map((node) => {
const sleepingNodeTask: TaskBuilder<void> = {
priority: TaskPriority.Lower - 1,
tag: {
id: "rebuild-node-routes-wakeup",
nodeId: node.id,
},
task:
async function* rebuildSleepingNodeRoutesTask() {
// Pause the task until the node wakes up
yield () => node.waitForWakeup();
yield* doRebuildRoutes(node.id);
},
};
return self.driver.scheduler.queueTask(
sleepingNodeTask,
).filter((node) => node != undefined);

const wakeupPromises = new Map(
sleepingNodes.map((node) =>
[
node.id,
node.waitForWakeup().then(() => node),
] as const
),
);

// As long as there are sleeping nodes that haven't had their routes rebuilt yet,
// wait for any of them to wake up
while (wakeupPromises.size > 0) {
const wakeUpPromise = Promise.race(
wakeupPromises.values(),
);
const wokenUpNode = (
yield () => wakeUpPromise
) as Awaited<typeof wakeUpPromise>;
if (wokenUpNode.status === NodeStatus.Asleep) {
// The node has gone to sleep again since the promise was resolved. Wait again
wakeupPromises.set(
wokenUpNode.id,
wokenUpNode.waitForWakeup().then(() =>
wokenUpNode
),
);
});
// Pause until all sleeping nodes have been processed
yield () => Promise.all(tasks);
continue;
}
// Once the node has woken up, remove it from the list and rebuild its routes
wakeupPromises.delete(wokenUpNode.id);
yield* doRebuildRoutes(wokenUpNode.id);
}
}

self.driver.controllerLog.print(
Expand Down Expand Up @@ -4857,36 +4874,40 @@ export class ZWaveController
private rebuildNodeRoutesInternal(
nodeId: number,
): Promise<boolean> {
// Don't start the process twice
const existingTask = this.driver.scheduler.findTask<boolean>((t) =>
t.tag?.id === "rebuild-node-routes" && t.tag.nodeId === nodeId
);
if (existingTask) return existingTask;

const node = this.nodes.getOrThrow(nodeId);
return this.driver.scheduler.queueTask(
this.getRebuildNodeRoutesTask(node),
);
const task = this.getRebuildNodeRoutesTask(node);
if (task instanceof Promise) return task;

return this.driver.scheduler.queueTask(task);
}

private getRebuildNodeRoutesTask(
node: ZWaveNode,
): TaskBuilder<boolean> {
let keepAwake: boolean;
): Promise<boolean> | TaskBuilder<boolean> {
// This task should only run once at a time
const existingTask = this.driver.scheduler.findTask<boolean>((t) =>
t.tag?.id === "rebuild-node-routes" && t.tag.nodeId === node.id
);
if (existingTask) return existingTask;

const self = this;
let keepAwake: boolean;

return {
// This task is executed by users and by the network-wide route rebuilding process.
// Since it can possibly be spawned by a "wait for wakeup" task aswell, we need to
// increment the priority by 2 here to avoid blocking.
priority: TaskPriority.Lower - 2,
priority: TaskPriority.Lower,
tag: { id: "rebuild-node-routes", nodeId: node.id },
task: async function* rebuildNodeRoutesTask() {
// Keep battery powered nodes awake during the process
keepAwake = node.keepAwake;
node.keepAwake = true;

if (
node.canSleep && node.supportsCC(CommandClasses["Wake Up"])
) {
yield () => node.waitForWakeup();
}

self.driver.controllerLog.logNode(node.id, {
message: `Rebuilding routes...`,
direction: "none",
Expand Down
3 changes: 1 addition & 2 deletions packages/zwave-js/src/lib/controller/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,5 @@ export function sdkVersionLte(
/** Checks if a task belongs to a route rebuilding process */
export function isRebuildRoutesTask(t: Task<unknown>): boolean {
return t.tag?.id === "rebuild-routes"
|| t.tag?.id === "rebuild-node-routes"
|| t.tag?.id === "rebuild-node-routes-wakeup";
|| t.tag?.id === "rebuild-node-routes";
}
96 changes: 76 additions & 20 deletions packages/zwave-js/src/lib/driver/Task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
type TaskBuilder,
TaskInterruptBehavior,
TaskPriority,
type TaskReturnType,
TaskScheduler,
} from "./Task";

Expand Down Expand Up @@ -686,8 +687,7 @@ test("Tasks can yield-queue higher-priority tasks", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
yield () => inner;
yield innerBuilder;
order.push("outer");
},
});
Expand Down Expand Up @@ -715,8 +715,7 @@ test("Tasks can yield-queue same-priority tasks", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
yield () => inner;
yield innerBuilder;
order.push("outer");
},
});
Expand Down Expand Up @@ -744,15 +743,71 @@ test.failing("Tasks cannot yield-queue lower-priority tasks", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
yield () => inner;
yield innerBuilder;
order.push("outer");
},
});

await outer;
});

test("Yielding tasks multiple levels deep works", async (t) => {
const scheduler = new TaskScheduler();
scheduler.start();

const order: string[] = [];
const yieldedPromise = createDeferredPromise<void>();

const innerinnerBuilder: TaskBuilder<void> = {
name: "innerinner",
priority: TaskPriority.Normal,
task: async function*() {
yield;
order.push("innerinner1");
yield () => yieldedPromise;
order.push("innerinner2");
},
};

const innerBuilder: TaskBuilder<void> = {
name: "inner",
priority: TaskPriority.Normal,
task: async function*() {
yield;
order.push("inner1");
yield innerinnerBuilder;
order.push("inner2");
},
};

const outer = scheduler.queueTask({
name: "outer",
priority: TaskPriority.Normal,
task: async function*() {
order.push("outer1");
yield innerBuilder;
order.push("outer2");
},
});

// Wait long enough that the task is definitely waiting for the promise
await wait(10);
t.deepEqual(order, ["outer1", "inner1", "innerinner1"]);

// Run to completion
yieldedPromise.resolve();
await outer;

t.deepEqual(order, [
"outer1",
"inner1",
"innerinner1",
"innerinner2",
"inner2",
"outer2",
]);
});

test("Tasks receive the result of yielded tasks", async (t) => {
const scheduler = new TaskScheduler();
scheduler.start();
Expand All @@ -769,8 +824,9 @@ test("Tasks receive the result of yielded tasks", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
const result = (yield () => inner) as Awaited<typeof inner>;
const result = (yield innerBuilder) as TaskReturnType<
typeof innerBuilder
>;
return result;
},
});
Expand Down Expand Up @@ -803,11 +859,13 @@ test("Tasks receive the result of yielded tasks, part 2", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner1 = scheduler.queueTask(inner1Builder);
const result1 = (yield () => inner1) as Awaited<typeof inner1>;
const result1 = (yield inner1Builder) as TaskReturnType<
typeof inner1Builder
>;
const result2 = (yield) as any;
const inner3 = scheduler.queueTask(inner3Builder);
const result3 = (yield () => inner3) as Awaited<typeof inner3>;
const result3 = (yield inner3Builder) as TaskReturnType<
typeof inner3Builder
>;
return result1 + (result2 ?? "") + result3;
},
});
Expand All @@ -830,12 +888,11 @@ test("Tasks receive the result of yielded tasks, part 3", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
try {
const ret = (yield () => inner) as any;
return ret;
yield innerBuilder;
throw new Error("This should not happen");
} catch (e) {
return e;
return e as Error;
}
},
});
Expand Down Expand Up @@ -1186,8 +1243,7 @@ test("Canceling nested tasks works", async (t) => {
priority: TaskPriority.Normal,
task: async function*() {
order.push("1a");
const inner = scheduler.queueTask(innerBuilder);
yield () => inner;
yield innerBuilder;
order.push("1b");
},
}).catch(noop);
Expand Down Expand Up @@ -1219,9 +1275,8 @@ test("Canceling nested tasks works, part 2", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
try {
yield () => inner;
yield innerBuilder;
} catch (e) {
return "canceled";
}
Expand All @@ -1232,6 +1287,7 @@ test("Canceling nested tasks works, part 2", async (t) => {
await wait(10);

// Cancel all tasks
// FIXME: Restore parent tasks when removing nested tasks
await scheduler.removeTasks((t) => t.name === "inner");

t.is(await outer, "canceled");
Expand Down
Loading
Loading