Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduled workflows #411

Merged
merged 59 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
0cedd87
First try at decorators
chuck-dbos Apr 24, 2024
96378d7
Add a test skeleton
chuck-dbos Apr 24, 2024
92db30a
Log registered kafka endpoints
chuck-dbos Apr 25, 2024
3d760b0
Registration and list
chuck-dbos Apr 25, 2024
d295468
Crontab parse
chuck-dbos Apr 25, 2024
3dfd59c
Add pattern validator
chuck-dbos Apr 25, 2024
e90eda3
More toward this
chuck-dbos Apr 25, 2024
1fa17a7
More conversion tests
chuck-dbos Apr 25, 2024
c525826
Validator
chuck-dbos Apr 25, 2024
825f7b5
Time matcher and test (doesn't work)
chuck-dbos Apr 26, 2024
0889b70
Silly
chuck-dbos Apr 26, 2024
8155475
One way
chuck-dbos Apr 26, 2024
95d6474
Maybe this is better
chuck-dbos Apr 26, 2024
a197675
So
chuck-dbos Apr 26, 2024
77937be
Loop w/ calendar
chuck-dbos Apr 26, 2024
e2163c7
Schedule loop is looping!
chuck-dbos Apr 26, 2024
5f3b265
Merge remote-tracking branch 'origin/main' into chuck/wfsched
chuck-dbos Apr 29, 2024
93d7d19
;
chuck-dbos Apr 29, 2024
fd52645
Invoke appropriate workflow
chuck-dbos Apr 29, 2024
4430f75
Logging
chuck-dbos Apr 29, 2024
22a7366
Add schema
chuck-dbos Apr 30, 2024
8fd5ff2
Idea behind system DB functions
chuck-dbos Apr 30, 2024
5d28869
Stub the new functions
chuck-dbos Apr 30, 2024
eeb6096
SQL, maybe
chuck-dbos Apr 30, 2024
99e504a
Make up missing work
chuck-dbos May 1, 2024
b833724
Slightly messy outstanding inst tracking
chuck-dbos May 1, 2024
3e8b3a1
Dead code
chuck-dbos May 2, 2024
008c94e
Merge remote-tracking branch 'origin/main' into chuck/wfsched
chuck-dbos May 2, 2024
4abe988
Fixes
chuck-dbos May 2, 2024
dd09b3d
Address review comments from Max
chuck-dbos May 3, 2024
e52d4ba
Update test
chuck-dbos May 3, 2024
b5c3beb
Merge remote-tracking branch 'origin/main' into chuck/wfsched
chuck-dbos May 3, 2024
02cbc97
Adjust test
chuck-dbos May 3, 2024
a2a801a
Set up sched in testing runtime
chuck-dbos May 3, 2024
b5c8587
Accidental imports
chuck-dbos May 3, 2024
0cfff39
Simplify teardown logic
chuck-dbos May 3, 2024
df6ebf2
Update src/scheduler/scheduler.ts
chuck-dbos May 4, 2024
2470855
Update src/scheduler/scheduler.ts
chuck-dbos May 4, 2024
ce85dbf
Adjust comment
chuck-dbos May 4, 2024
03258c8
Merge branch 'chuck/wfsched' of github.com:dbos-inc/operon into chuck…
chuck-dbos May 4, 2024
77b245b
Merge remote-tracking branch 'origin/main' into chuck/wfsched
chuck-dbos May 4, 2024
c9a714e
Simplify
chuck-dbos May 4, 2024
0ef0b50
Dead code
chuck-dbos May 4, 2024
a11b9fb
Merge remote-tracking branch 'origin/main' into chuck/wfsched
chuck-dbos May 6, 2024
d1806d0
Merge remote-tracking branch 'origin/main' into chuck/wfsched
chuck-dbos May 7, 2024
28b0afd
Remove running instance tracking
chuck-dbos May 7, 2024
630284b
Add a way to skip missed workflows
chuck-dbos May 7, 2024
35281c3
Unused imports
chuck-dbos May 7, 2024
e5539ad
Clarify that it is ms
chuck-dbos May 7, 2024
ff5da5f
Unused
chuck-dbos May 7, 2024
4266d27
Unneeded lintsmog
chuck-dbos May 8, 2024
e757cbe
Add a mode without makeup work
chuck-dbos May 8, 2024
1a000da
Renames
chuck-dbos May 8, 2024
0a7b2da
Merge remote-tracking branch 'origin/main' into chuck/wfsched
chuck-dbos May 9, 2024
3f14989
Remove config setting for scheduler
chuck-dbos May 9, 2024
3ff1e39
Remove method reg
chuck-dbos May 9, 2024
f10dbdd
A comment
chuck-dbos May 9, 2024
1172e11
Update src/scheduler/scheduler.ts
chuck-dbos May 9, 2024
bb9991a
Update src/scheduler/scheduler.ts
chuck-dbos May 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dbos-config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@
}
}
},
"skip_missed_scheduled_workflows": {
qianl15 marked this conversation as resolved.
Show resolved Hide resolved
"type": "boolean"
},
"application": {},
"env": {},
"version": {
Expand Down
13 changes: 13 additions & 0 deletions migrations/20240430090000_tables.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
exports.up = function(knex) {
return knex.schema.withSchema('dbos')
.createTable('scheduler_state', function(table) {
table.text('wf_function').notNullable();
table.bigInteger('last_wf_sched_time').notNullable();
table.primary(['wf_function']);
})
chuck-dbos marked this conversation as resolved.
Show resolved Hide resolved
};

exports.down = function(knex) {
return knex.schema.withSchema('dbos')
.dropTableIfExists('scheduler_state');
};
5 changes: 5 additions & 0 deletions schemas/system_db_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ export interface workflow_inputs {
workflow_uuid: string;
inputs: string;
}

export interface scheduler_state {
wf_function: string;
last_wf_sched_time: number; // Time that has certainly been kicked off; others may have but OAOO will cover that
}
4 changes: 3 additions & 1 deletion src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export interface DBOSConfig {
readonly application?: object;
readonly debugProxy?: string;
readonly debugMode?: boolean;
readonly skipMissedScheduledWorkflows?: boolean;
readonly http?: {
readonly cors_middleware?: boolean;
readonly credentials?: boolean;
Expand Down Expand Up @@ -260,7 +261,7 @@ export class DBOSExecutor {
for (const ro of registeredClassOperations) {
if (ro.workflowConfig) {
const wf = ro.registeredFunction as Workflow<any, any>;
this.#registerWorkflow(wf, ro.workflowConfig);
this.#registerWorkflow(wf, {registrationObject: ro, ...ro.workflowConfig});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as the other one: can we simply pass schedulerConfig here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think for now we should pass in schedulerConfig only. Because it's confusing to pass both ro and ro.workflowConfig which is redundant.

this.logger.debug(`Registered workflow ${ro.name}`);
} else if (ro.txnConfig) {
const tx = ro.registeredFunction as Transaction<any, any>;
Expand Down Expand Up @@ -432,6 +433,7 @@ export class DBOSExecutor {

const runWorkflow = async () => {
let result: R;

// Execute the workflow.
try {
result = await wf(wCtxt, ...args);
Expand Down
2 changes: 2 additions & 0 deletions src/dbos-runtime/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export interface ConfigFile {
application: object;
env: Record<string, string>;
runtimeConfig?: DBOSRuntimeConfig;
skip_missed_scheduled_workflows?:boolean;
}

/*
Expand Down Expand Up @@ -195,6 +196,7 @@ export function parseConfigFile(cliOptions?: DBOSCLIStartOptions, useProxy: bool
telemetry: configFile.telemetry || undefined,
system_database: configFile.database.sys_db_name ?? `${poolConfig.database}_dbos_sys`,
application: configFile.application || undefined,
skipMissedScheduledWorkflows: configFile.skip_missed_scheduled_workflows ?? false,
env: configFile.env || {},
http: configFile.http
};
Expand Down
7 changes: 7 additions & 0 deletions src/dbos-runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import path from 'node:path';
import { Server } from 'http';
import { pathToFileURL } from 'url';
import { DBOSKafka } from '../kafka/kafka';
import { DBOSScheduler } from '../scheduler/scheduler';

interface ModuleExports {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand All @@ -24,6 +25,7 @@ export class DBOSRuntime {
private dbosExec: DBOSExecutor | null = null;
private servers: { appServer: Server; adminServer: Server } | undefined;
private kafka: DBOSKafka | null = null;
private scheduler: DBOSScheduler | null = null;

constructor(dbosConfig: DBOSConfig, private readonly runtimeConfig: DBOSRuntimeConfig) {
// Initialize workflow executor.
Expand All @@ -44,6 +46,10 @@ export class DBOSRuntime {
this.dbosExec.logRegisteredHTTPUrls();
this.kafka = new DBOSKafka(this.dbosExec);
await this.kafka.initKafka();
this.kafka.logRegisteredKafkaEndpoints();
this.scheduler = new DBOSScheduler(this.dbosExec);
this.scheduler.initScheduler();
this.scheduler.logRegisteredSchedulerEndpoints();
} catch (error) {
this.dbosExec?.logger.error(error);
if (error instanceof DBOSFailLoadOperationsError) {
Expand Down Expand Up @@ -97,6 +103,7 @@ export class DBOSRuntime {
* Shut down the HTTP server and destroy workflow executor.
*/
async destroy() {
await this.scheduler?.destroyScheduler();
await this.kafka?.destroyKafka();
if (this.servers) {
this.servers.appServer.close();
Expand Down
10 changes: 9 additions & 1 deletion src/foundationdb/fdb_system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,15 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
}


async sleep(workflowUUID: string, functionID: number, durationSec: number): Promise<void> {
async sleep(_workflowUUID: string, _functionID: number, durationSec: number): Promise<void> {
await sleep(durationSec * 1000); // TODO: Implement
}

/* SCHEDULER */
getLastScheduledTime(_wfn: string): Promise<number | null> {
return Promise.resolve(null);
}
setLastScheduledTime(_wfn: string, _invtime: number): Promise<number | null> {
return Promise.resolve(null);
}
}
8 changes: 7 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,10 @@ export {
export {
Kafka,
KafkaConsume,
} from "./kafka/kafka"
} from "./kafka/kafka";

export {
SchedulerConcurrencyMode,
SchedulerConfig,
Scheduled,
} from "./scheduler/scheduler";
20 changes: 16 additions & 4 deletions src/kafka/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,17 @@ export class DBOSKafka{
throw new DBOSError(`Error registering method ${defaults.name}.${ro.name}: Kafka configuration not found. Does class ${defaults.name} have an @Kafka decorator?`)
}
const kafka = new KafkaJS(defaults.kafkaConfig);
const consumerConfig = ro.consumerConfig ?? { groupId: `dbos-kafka-group-${ro.kafkaTopic}`}
const consumerConfig = ro.consumerConfig ?? { groupId: `dbos-kafka-group-${ro.kafkaTopic}`};
const consumer = kafka.consumer(consumerConfig);
await consumer.connect()
await consumer.subscribe({topic: ro.kafkaTopic, fromBeginning: true})
await consumer.connect();
await consumer.subscribe({topic: ro.kafkaTopic, fromBeginning: true});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
// This combination uniquely identifies a message for a given Kafka cluster
const workflowUUID = `kafka-unique-id-${topic}-${partition}-${message.offset}`
const wfParams = { workflowUUID: workflowUUID };
// All operations annotated with Kafka decorators must take in these three arguments
const args: KafkaArgs = [topic, partition, message]
const args: KafkaArgs = [topic, partition, message];
// We can only guarantee exactly-once-per-message execution of transactions and workflows.
if (ro.txnConfig) {
// Execute the transaction
Expand All @@ -113,4 +113,16 @@ export class DBOSKafka{
await consumer.disconnect();
}
}

logRegisteredKafkaEndpoints() {
const logger = this.dbosExec.logger;
logger.info("Kafka endpoints supported:");
this.dbosExec.registeredOperations.forEach((registeredOperation) => {
const ro = registeredOperation as KafkaRegistration<unknown, unknown[], unknown>;
if (ro.kafkaTopic) {
const defaults = ro.defaults as KafkaDefaults;
logger.info(` ${ro.kafkaTopic} -> ${defaults.name}.${ro.name}`);
}
});
}
}
Loading