Skip to content

Commit

Permalink
Workflow Invocation Methods (#432)
Browse files Browse the repository at this point in the history
This PR introduces two new methods for invoking workflows from handlers.
For workflow bar in class Foo with argument arg:

- `ctxt.startWorkflow(Foo).bar(arg)` starts the workflow and returns its
handle. This is equivalent to the current `ctxt.invoke(Foo).bar(arg)`.
- `ctxt.invokeWorkflow(Foo).bar(arg)` executes the workflow and returns
its result. This is equivalent to the current
`ctxt.invoke(foo).bar(arg).then((x)=>x.getResult())`. The latter is very
unintuitive and is tripping users up.

To avoid confusion, we also deprecate calling `invoke` on workflows--use
one of the new methods instead.

My hope is that this makes it much clearer to users what is actually
happening when a handler invokes a workflow.
  • Loading branch information
kraftp authored May 13, 2024
1 parent 18aa06d commit 480507b
Show file tree
Hide file tree
Showing 19 changed files with 208 additions and 137 deletions.
3 changes: 3 additions & 0 deletions src/dbos-runtime/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ program
.option("-c, --configfile <string>", "Specify the config file path (DEPRECATED)")
.option("-d, --appDir <string>", "Specify the application root directory")
.action(async (options: DBOSCLIStartOptions) => {
if (options?.configfile) {
console.warn('\x1b[33m%s\x1b[0m', "The --configfile option is deprecated. Please use --appDir instead.");
}
const [dbosConfig, runtimeConfig]: [DBOSConfig, DBOSRuntimeConfig] = parseConfigFile(options);
const runtime = new DBOSRuntime(dbosConfig, runtimeConfig);
await runtime.initAndStart();
Expand Down
3 changes: 0 additions & 3 deletions src/dbos-runtime/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,6 @@ function prettyPrintAjvErrors(validate: ValidateFunction<unknown>) {
* Considers DBOSCLIStartOptions if provided, which takes precedence over config file
* */
export function parseConfigFile(cliOptions?: DBOSCLIStartOptions, useProxy: boolean = false): [DBOSConfig, DBOSRuntimeConfig] {
if (cliOptions?.configfile) {
console.warn('\x1b[33m%s\x1b[0m', "The --configfile option is deprecated. Please use --appDir instead.");
}
if (cliOptions?.appDir) {
process.chdir(cliOptions.appDir)
}
Expand Down
11 changes: 10 additions & 1 deletion src/debugger/debug_workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,21 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
}

// Invoke the debugWorkflow() function instead.
async childWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
async startChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
const funcId = this.functionIDGetIncrement();
const childUUID: string = this.workflowUUID + "-" + funcId;
return this.#dbosExec.debugWorkflow(wf, { parentCtx: this, workflowUUID: childUUID }, this.workflowUUID, funcId, ...args);
}

async invokeChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<R> {
return this.startChildWorkflow(wf, ...args).then((handle) => handle.getResult());
}

// Deprecated
async childWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
return this.startChildWorkflow(wf, ...args);
}

async send<T extends NonNullable<any>>(_destinationUUID: string, _message: T, _topic?: string | undefined): Promise<void> {
const functionID: number = this.functionIDGetIncrement();

Expand Down
55 changes: 40 additions & 15 deletions src/httpServer/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@ import { APITypes, ArgSources } from "./handlerTypes";

// local type declarations for workflow functions
type WFFunc = (ctxt: WorkflowContext, ...args: any[]) => Promise<any>;
export type InvokeFuncs<T> = WFInvokeFuncs<T> & HandlerWfFuncs<T>;
export type InvokeFuncs<T> = WFInvokeFuncs<T> & AsyncHandlerWfFuncs<T>;

type HandlerWfFuncs<T> = {
export type AsyncHandlerWfFuncs<T> = {
[P in keyof T as T[P] extends WFFunc ? P : never]: T[P] extends WFFunc ? (...args: TailParameters<T[P]>) => Promise<WorkflowHandle<Awaited<ReturnType<T[P]>>>> : never;
}

export type SyncHandlerWfFuncs<T> = {
[P in keyof T as T[P] extends WFFunc ? P : never]: T[P] extends WFFunc ? (...args: TailParameters<T[P]>) => Promise<Awaited<ReturnType<T[P]>>> : never;
}

export interface HandlerContext extends DBOSContext {
readonly koaContext: Koa.Context;
invoke<T extends object>(targetClass: T, workflowUUID?: string): InvokeFuncs<T>;
invokeWorkflow<T extends object>(targetClass: T, workflowUUID?: string): SyncHandlerWfFuncs<T>;
startWorkflow<T extends object>(targetClass: T, workflowUUID?: string): AsyncHandlerWfFuncs<T>;
retrieveWorkflow<R>(workflowUUID: string): WorkflowHandle<R>;
send<T extends NonNullable<any>>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise<void>;
getEvent<T extends NonNullable<any>>(workflowUUID: string, key: string, timeoutSeconds?: number): Promise<T | null>;
Expand Down Expand Up @@ -111,27 +117,46 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex
* Generate a proxy object for the provided class that wraps direct calls (i.e. OpClass.someMethod(param))
* to use WorkflowContext.Transaction(OpClass.someMethod, param);
*/
invoke<T extends object>(object: T, workflowUUID?: string): InvokeFuncs<T> {
mainInvoke<T extends object>(object: T, workflowUUID: string | undefined, asyncWf: boolean): InvokeFuncs<T> {
const ops = getRegisteredOperations(object);

const proxy: any = {};
const params = { workflowUUID: workflowUUID, parentCtx: this };
for (const op of ops) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.txnConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#transaction(op.registeredFunction as Transaction<any[], any>, params, ...args)
: op.workflowConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#workflow(op.registeredFunction as Workflow<any[], any>, params, ...args)
: op.commConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#external(op.registeredFunction as Communicator<any[], any>, params, ...args)
: undefined;
if (asyncWf) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.txnConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#transaction(op.registeredFunction as Transaction<any[], any>, params, ...args)
: op.workflowConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#workflow(op.registeredFunction as Workflow<any[], any>, params, ...args)
: op.commConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#external(op.registeredFunction as Communicator<any[], any>, params, ...args)
: undefined;
} else {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.workflowConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#workflow(op.registeredFunction as Workflow<any[], any>, params, ...args).then((handle) => handle.getResult())
: undefined;
}
}
return proxy as InvokeFuncs<T>;
}

invoke<T extends object>(object: T, workflowUUID?: string): InvokeFuncs<T> {
return this.mainInvoke(object, workflowUUID, true);
}

startWorkflow<T extends object>(object: T, workflowUUID?: string): AsyncHandlerWfFuncs<T> {
return this.mainInvoke(object, workflowUUID, true);
}

invokeWorkflow<T extends object>(object: T, workflowUUID?: string): SyncHandlerWfFuncs<T> {
return this.mainInvoke(object, workflowUUID, false) as unknown as SyncHandlerWfFuncs<T>;
}

//////////////////////
/* PRIVATE METHODS */
/////////////////////
Expand Down
50 changes: 36 additions & 14 deletions src/testing/testing_runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Communicator } from "../communicator";
import { HTTPRequest, DBOSContextImpl } from "../context";
import { getRegisteredOperations } from "../decorators";
import { DBOSConfigKeyTypeError, DBOSError } from "../error";
import { InvokeFuncs } from "../httpServer/handler";
import { AsyncHandlerWfFuncs, InvokeFuncs, SyncHandlerWfFuncs } from "../httpServer/handler";
import { DBOSHttpServer } from "../httpServer/server";
import { DBOSExecutor, DBOSConfig } from "../dbos-executor";
import { dbosConfigFilePath, parseConfigFile } from "../dbos-runtime/config";
Expand All @@ -22,7 +22,7 @@ import { DBOSScheduler } from "../scheduler/scheduler";
* Create a testing runtime. Warn: this function will drop the existing system DB and create a clean new one. Don't run tests against your production database!
*/
export async function createTestingRuntime(userClasses: object[], configFilePath: string = dbosConfigFilePath, dropSysDB: boolean = true): Promise<TestingRuntime> {
const [ dbosConfig ] = parseConfigFile({configfile: configFilePath});
const [dbosConfig] = parseConfigFile({ configfile: configFilePath });

if (dropSysDB) {
// Drop system database. Testing runtime always uses Postgres for local testing.
Expand Down Expand Up @@ -50,6 +50,8 @@ export interface WorkflowInvokeParams {

export interface TestingRuntime {
invoke<T extends object>(targetClass: T, workflowUUID?: string, params?: WorkflowInvokeParams): InvokeFuncs<T>;
invokeWorkflow<T extends object>(targetClass: T, workflowUUID?: string, params?: WorkflowInvokeParams): SyncHandlerWfFuncs<T>;
startWorkflow<T extends object>(targetClass: T, workflowUUID?: string, params?: WorkflowInvokeParams): AsyncHandlerWfFuncs<T>;
retrieveWorkflow<R>(workflowUUID: string): WorkflowHandle<R>;
send<T extends NonNullable<any>>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise<void>;
getEvent<T extends NonNullable<any>>(workflowUUID: string, key: string, timeoutSeconds?: number): Promise<T | null>;
Expand Down Expand Up @@ -139,7 +141,7 @@ export class TestingRuntimeImpl implements TestingRuntime {
* Generate a proxy object for the provided class that wraps direct calls (i.e. OpClass.someMethod(param))
* to invoke workflows, transactions, and communicators;
*/
invoke<T extends object>(object: T, workflowUUID?: string, params?: WorkflowInvokeParams): InvokeFuncs<T> {
mainInvoke<T extends object>(object: T, workflowUUID: string | undefined, params: WorkflowInvokeParams | undefined, asyncWf: boolean): InvokeFuncs<T> {
const dbosExec = this.getDBOSExec();
const ops = getRegisteredOperations(object);

Expand All @@ -154,21 +156,41 @@ export class TestingRuntimeImpl implements TestingRuntime {

const wfParams: WorkflowParams = { workflowUUID: workflowUUID, parentCtx: oc };
for (const op of ops) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.txnConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => dbosExec.transaction(op.registeredFunction as Transaction<any[], any>, wfParams, ...args)
: op.workflowConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => dbosExec.workflow(op.registeredFunction as Workflow<any[], any>, wfParams, ...args)
: op.commConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => dbosExec.external(op.registeredFunction as Communicator<any[], any>, wfParams, ...args)
: undefined;
if (asyncWf) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.txnConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => dbosExec.transaction(op.registeredFunction as Transaction<any[], any>, wfParams, ...args)
: op.workflowConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => dbosExec.workflow(op.registeredFunction as Workflow<any[], any>, wfParams, ...args)
: op.commConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => dbosExec.external(op.registeredFunction as Communicator<any[], any>, wfParams, ...args)
: undefined;
} else {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.workflowConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => dbosExec.workflow(op.registeredFunction as Workflow<any[], any>, wfParams, ...args).then((handle) => handle.getResult())
: undefined;
}
}
return proxy as InvokeFuncs<T>;
}

invoke<T extends object>(object: T, workflowUUID?: string, params?: WorkflowInvokeParams): InvokeFuncs<T> {
return this.mainInvoke(object, workflowUUID, params, true);
}

startWorkflow<T extends object>(object: T, workflowUUID?: string, params?: WorkflowInvokeParams): AsyncHandlerWfFuncs<T> {
return this.mainInvoke(object, workflowUUID, params, true);
}

invokeWorkflow<T extends object>(object: T, workflowUUID?: string, params?: WorkflowInvokeParams): SyncHandlerWfFuncs<T> {
return this.mainInvoke(object, workflowUUID, params, false) as unknown as SyncHandlerWfFuncs<T>;
}

/**
* Return a request handler callback for node's native http/http2 server, which includes all registered HTTP endpoints.
*/
Expand Down
17 changes: 14 additions & 3 deletions src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ export const StatusString = {

export interface WorkflowContext extends DBOSContext {
invoke<T extends object>(targetClass: T): WFInvokeFuncs<T>;
childWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>>;
startChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>>;
invokeChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<R>;
childWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>>; // Deprecated, calls startChildWorkflow

send<T extends NonNullable<any>>(destinationUUID: string, message: T, topic?: string): Promise<void>;
recv<T extends NonNullable<any>>(topic?: string, timeoutSeconds?: number): Promise<T | null>;
Expand Down Expand Up @@ -237,16 +239,25 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont
/**
* Invoke another workflow as its child workflow and return a workflow handle.
* The child workflow is guaranteed to be executed exactly once, even if the workflow is retried with the same UUID.
* We pass in itself as a parent context adn assign the child workflow with a deterministic UUID "this.workflowUUID-functionID", which appends a function ID to its own UUID.
* We pass in itself as a parent context and assign the child workflow with a deterministic UUID "this.workflowUUID-functionID".
* We also pass in its own workflowUUID and function ID so the invoked handle is deterministic.
*/
async childWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
async startChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
// Note: cannot use invoke for childWorkflow because of potential recursive types on the workflow itself.
const funcId = this.functionIDGetIncrement();
const childUUID: string = this.workflowUUID + "-" + funcId;
return this.#dbosExec.internalWorkflow(wf, { parentCtx: this, workflowUUID: childUUID }, this.workflowUUID, funcId, ...args);
}

async invokeChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<R> {
return this.startChildWorkflow(wf, ...args).then((handle) => handle.getResult());
}

// Deprecated
async childWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
return this.startChildWorkflow(wf, ...args);
}

/**
* Execute a transactional function.
* The transaction is guaranteed to execute exactly once, even if the workflow is retried with the same UUID.
Expand Down
9 changes: 4 additions & 5 deletions tests/concurrency.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ describe("concurrency-tests", () => {
// The second transaction should get the correct recorded execution without being executed.
const uuid = uuidv1();
await testRuntime
.invoke(ConcurrTestClass, uuid)
.testWorkflow()
.then((x) => x.getResult());
.invokeWorkflow(ConcurrTestClass, uuid)
.testWorkflow();
const handle = await testRuntime.invoke(ConcurrTestClass, uuid).testWorkflow();
await ConcurrTestClass.promise2;

Expand Down Expand Up @@ -83,8 +82,8 @@ describe("concurrency-tests", () => {
// It's a bit hard to trigger conflicting send because the transaction runs quickly.
const recvUUID = uuidv1();
const recvResPromise = Promise.allSettled([
testRuntime.invoke(ConcurrTestClass, recvUUID).receiveWorkflow( "testTopic", 2).then((x) => x.getResult()),
testRuntime.invoke(ConcurrTestClass, recvUUID).receiveWorkflow( "testTopic", 2).then((x) => x.getResult()),
testRuntime.invokeWorkflow(ConcurrTestClass, recvUUID).receiveWorkflow( "testTopic", 2),
testRuntime.invokeWorkflow(ConcurrTestClass, recvUUID).receiveWorkflow( "testTopic", 2),
]);

// Send would trigger both to receive, but only one can succeed.
Expand Down
Loading

0 comments on commit 480507b

Please sign in to comment.