Skip to content

Commit

Permalink
fix: event should be emitted when task is created by scheduling worker
Browse files Browse the repository at this point in the history
Created task events are useful for long polling to be instantly notified
of a new created tasks.
  • Loading branch information
TBonnin committed Jun 13, 2024
1 parent b3bf65a commit 643fd53
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
10 changes: 10 additions & 0 deletions packages/scheduler/lib/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ export class Scheduler {
});
this.monitor.start();
this.scheduling = new SchedulingWorker({ databaseUrl: dbClient.url, databaseSchema: dbClient.schema });
this.scheduling.on(async (message) => {
const { ids } = message;
for (const taskId of ids) {
const fetched = await tasks.get(this.dbClient.db, taskId);
if (fetched.isOk()) {
const task = fetched.value;
this.onCallbacks[task.state](task);
}
}
});
// TODO: ensure there is only one instance of the scheduler
this.scheduling.start();
} else {
Expand Down
10 changes: 3 additions & 7 deletions packages/scheduler/lib/workers/monitor/monitor.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { setTimeout } from 'node:timers/promises';
import type knex from 'knex';
import { logger } from '../../utils/logger.js';

interface MessageOut {
interface ExpiredTasksMessage {
ids: string[];
}

Expand Down Expand Up @@ -47,7 +47,7 @@ export class MonitorWorker {
}
}

on(callback: (message: MessageOut) => void): void {
on(callback: (message: ExpiredTasksMessage) => void): void {
this.worker?.on('message', callback);
}
}
Expand Down Expand Up @@ -98,14 +98,10 @@ export class MonitorChild {
if (expired.value.length > 0) {
const taskIds = expired.value.map((t) => t.id);
if (taskIds.length > 0 && !this.cancelled) {
this.send({ ids: taskIds });
this.parent.postMessage({ ids: taskIds }); // Notifying parent that tasks have expired
}
logger.info(`Expired tasks: ${JSON.stringify(expired.value.map((t) => t.id))} `);
}
}
}

send(message: MessageOut) {
this.parent.postMessage(message);
}
}
13 changes: 13 additions & 0 deletions packages/scheduler/lib/workers/scheduling/scheduling.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import { logger } from '../../utils/logger.js';
import { dueSchedules } from './scheduling.js';
import * as tasks from '../../models/tasks.js';

interface CreatedTasksMessage {
ids: string[];
}

export class SchedulingWorker {
private worker: Worker | null;
constructor({ databaseUrl, databaseSchema }: { databaseUrl: string; databaseSchema: string }) {
Expand Down Expand Up @@ -43,6 +47,10 @@ export class SchedulingWorker {
this.worker = null;
}
}

on(callback: (message: CreatedTasksMessage) => void): void {
this.worker?.on('message', callback);
}
}

export class SchedulingChild {
Expand Down Expand Up @@ -90,6 +98,7 @@ export class SchedulingChild {
logger.error(`Failed to get due schedules: ${schedules.error}`);
return;
}
const taskIds = [];
for (const schedule of schedules.value) {
const task = await tasks.create(trx, {
scheduleId: schedule.id,
Expand All @@ -107,6 +116,10 @@ export class SchedulingChild {
logger.error(`Failed to create task for schedule: ${schedule.id}`);
return;
}
taskIds.push(task.value.id);
}
if (taskIds.length > 0) {
this.parent.postMessage({ ids: taskIds }); // notifying parent that tasks have been created
}
});
}
Expand Down

0 comments on commit 643fd53

Please sign in to comment.