Skip to content

Commit

Permalink
Diagnostics / bug fix for WF queues (#654)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuck-dbos authored Oct 29, 2024
1 parent 9f293e5 commit 944dffe
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/eventreceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,4 @@ export interface DBOSEventReceiverQuery
endTime?: number;
startSeq?: bigint;
endSeq?: bigint;
}
}
2 changes: 2 additions & 0 deletions src/system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1034,10 +1034,12 @@ export class PostgresSystemDatabase implements SystemDatabase {
if (queue.rateLimit && numRecentQueries >= queue.rateLimit.limitPerPeriod) {
break;
}

const res = await trx<workflow_status>(`${DBOSExecutor.systemDBSchemaName}.workflow_status`)
.where('workflow_uuid', id)
.andWhere('status', StatusString.ENQUEUED)
.update('status', StatusString.PENDING);

if (res > 0) {
claimedIDs.push(id);
await trx<workflow_queue>(`${DBOSExecutor.systemDBSchemaName}.workflow_queue`)
Expand Down
18 changes: 12 additions & 6 deletions src/testing/testing_runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export interface TestingRuntime {

destroy(): Promise<void>; // Release resources after tests.
deactivateEventReceivers(): Promise<void>; // Deactivate event receivers.
initEventReceivers(): Promise<void>; // Init / reactivate event receivers.
}

/**
Expand Down Expand Up @@ -113,9 +114,6 @@ export class TestingRuntimeImpl implements TestingRuntime {
await this.#dbosExec.init(userClasses);
this.#server = new DBOSHttpServer(this.#dbosExec);
await this.initEventReceivers();
this.#scheduler = new DBOSScheduler(this.#dbosExec);
this.#scheduler.initScheduler();
this.#wfQueueRunner = wfQueueRunner.dispatchLoop(this.#dbosExec);
this.#applicationConfig = this.#dbosExec.config.application ?? {};
this.#isInitialized = true;
}
Expand All @@ -124,6 +122,9 @@ export class TestingRuntimeImpl implements TestingRuntime {
for (const evtRcvr of this.#dbosExec!.eventReceivers) {
await evtRcvr.initialize(this.#dbosExec!);
}
this.#scheduler = new DBOSScheduler(this.#dbosExec!);
this.#scheduler.initScheduler();
this.#wfQueueRunner = wfQueueRunner.dispatchLoop(this.#dbosExec!);
}

/**
Expand All @@ -140,7 +141,13 @@ export class TestingRuntimeImpl implements TestingRuntime {

async deactivateEventReceivers() {
for (const evtRcvr of this.#server?.dbosExec?.eventReceivers || []) {
await evtRcvr.destroy();
try {
await evtRcvr.destroy();
}
catch (err) {
const e = err as Error;
this.#server?.dbosExec?.logger.warn(`Error destroying event receiver: ${e.message}`);
}
}
await this.#scheduler?.destroyScheduler();
try {
Expand All @@ -150,8 +157,7 @@ export class TestingRuntimeImpl implements TestingRuntime {
catch (err) {
const e = err as Error;
this.#server?.dbosExec?.logger.warn(`Error destroying workflow queue runner: ${e.message}`);
}

}
}

/**
Expand Down
19 changes: 17 additions & 2 deletions src/wfqueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,29 @@ class WFQueueRunner

// Check queues
for (const [_qn, q] of this.wfQueuesByName) {
const wfids = await exec.systemDatabase.findAndMarkStartableWorkflows(q);
let wfids: string[];
try {
wfids = await exec.systemDatabase.findAndMarkStartableWorkflows(q);
}
catch (e) {
const err = e as Error;
exec.logger.warn(`Error getting startable workflows: ${err.message}`);
// On the premise that this was a transaction conflict error, just try again later.
wfids = [];
}

if (wfids.length > 0) {
await debugTriggerPoint(DEBUG_TRIGGER_WORKFLOW_QUEUE_START);
}

for (const wfid of wfids) {
const _wfh = await exec.executeWorkflowUUID(wfid);
try {
const _wfh = await exec.executeWorkflowUUID(wfid);
}
catch (e) {
exec.logger.warn(`Could not execute workflow with id ${wfid}`);
exec.logger.warn(e);
}
}
}
}
Expand Down

0 comments on commit 944dffe

Please sign in to comment.