diff --git a/migrations/20240430090000_tables.js b/migrations/20240430090000_tables.js new file mode 100644 index 000000000..dcc03f8bf --- /dev/null +++ b/migrations/20240430090000_tables.js @@ -0,0 +1,13 @@ +exports.up = function(knex) { + return knex.schema.withSchema('dbos') + .createTable('scheduler_state', function(table) { + table.text('workflow_fn_name').notNullable(); + table.bigInteger('last_run_time').notNullable(); + table.primary(['workflow_fn_name']); + }) +}; + +exports.down = function(knex) { + return knex.schema.withSchema('dbos') + .dropTableIfExists('scheduler_state'); +}; diff --git a/schemas/system_db_schema.ts b/schemas/system_db_schema.ts index b83497ae0..85b97e5f9 100644 --- a/schemas/system_db_schema.ts +++ b/schemas/system_db_schema.ts @@ -34,3 +34,8 @@ export interface workflow_inputs { workflow_uuid: string; inputs: string; } + +export interface scheduler_state { + workflow_fn_name: string; + last_run_time: number; // Time that has certainly been kicked off; others may have but OAOO will cover that +} diff --git a/src/dbos-executor.ts b/src/dbos-executor.ts index f8675e034..190c713ca 100644 --- a/src/dbos-executor.ts +++ b/src/dbos-executor.ts @@ -260,7 +260,7 @@ export class DBOSExecutor { for (const ro of registeredClassOperations) { if (ro.workflowConfig) { const wf = ro.registeredFunction as Workflow; - this.#registerWorkflow(wf, ro.workflowConfig); + this.#registerWorkflow(wf, {...ro.workflowConfig}); this.logger.debug(`Registered workflow ${ro.name}`); } else if (ro.txnConfig) { const tx = ro.registeredFunction as Transaction; @@ -432,6 +432,7 @@ export class DBOSExecutor { const runWorkflow = async () => { let result: R; + // Execute the workflow. try { result = await wf(wCtxt, ...args); diff --git a/src/dbos-runtime/runtime.ts b/src/dbos-runtime/runtime.ts index 944289d1d..74364315c 100644 --- a/src/dbos-runtime/runtime.ts +++ b/src/dbos-runtime/runtime.ts @@ -7,6 +7,7 @@ import path from 'node:path'; import { Server } from 'http'; import { pathToFileURL } from 'url'; import { DBOSKafka } from '../kafka/kafka'; +import { DBOSScheduler } from '../scheduler/scheduler'; interface ModuleExports { // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -24,6 +25,7 @@ export class DBOSRuntime { private dbosExec: DBOSExecutor | null = null; private servers: { appServer: Server; adminServer: Server } | undefined; private kafka: DBOSKafka | null = null; + private scheduler: DBOSScheduler | null = null; constructor(dbosConfig: DBOSConfig, private readonly runtimeConfig: DBOSRuntimeConfig) { // Initialize workflow executor. @@ -44,6 +46,10 @@ export class DBOSRuntime { this.dbosExec.logRegisteredHTTPUrls(); this.kafka = new DBOSKafka(this.dbosExec); await this.kafka.initKafka(); + this.kafka.logRegisteredKafkaEndpoints(); + this.scheduler = new DBOSScheduler(this.dbosExec); + this.scheduler.initScheduler(); + this.scheduler.logRegisteredSchedulerEndpoints(); } catch (error) { this.dbosExec?.logger.error(error); if (error instanceof DBOSFailLoadOperationsError) { @@ -97,6 +103,7 @@ export class DBOSRuntime { * Shut down the HTTP server and destroy workflow executor. */ async destroy() { + await this.scheduler?.destroyScheduler(); await this.kafka?.destroyKafka(); if (this.servers) { this.servers.appServer.close(); diff --git a/src/foundationdb/fdb_system_database.ts b/src/foundationdb/fdb_system_database.ts index 9449280ae..fb19708b4 100644 --- a/src/foundationdb/fdb_system_database.ts +++ b/src/foundationdb/fdb_system_database.ts @@ -388,7 +388,15 @@ export class FoundationDBSystemDatabase implements SystemDatabase { } - async sleep(workflowUUID: string, functionID: number, durationSec: number): Promise { + async sleep(_workflowUUID: string, _functionID: number, durationSec: number): Promise { await sleep(durationSec * 1000); // TODO: Implement } + + /* SCHEDULER */ + getLastScheduledTime(_wfn: string): Promise { + return Promise.resolve(null); + } + setLastScheduledTime(_wfn: string, _invtime: number): Promise { + return Promise.resolve(null); + } } diff --git a/src/index.ts b/src/index.ts index 7e49ed075..d24e6fdef 100644 --- a/src/index.ts +++ b/src/index.ts @@ -93,4 +93,10 @@ export { export { Kafka, KafkaConsume, -} from "./kafka/kafka" +} from "./kafka/kafka"; + +export { + SchedulerMode, + SchedulerConfig, + Scheduled, +} from "./scheduler/scheduler"; diff --git a/src/kafka/kafka.ts b/src/kafka/kafka.ts index 222c16e9f..aba7c25a8 100644 --- a/src/kafka/kafka.ts +++ b/src/kafka/kafka.ts @@ -82,17 +82,17 @@ export class DBOSKafka{ throw new DBOSError(`Error registering method ${defaults.name}.${ro.name}: Kafka configuration not found. Does class ${defaults.name} have an @Kafka decorator?`) } const kafka = new KafkaJS(defaults.kafkaConfig); - const consumerConfig = ro.consumerConfig ?? { groupId: `dbos-kafka-group-${ro.kafkaTopic}`} + const consumerConfig = ro.consumerConfig ?? { groupId: `dbos-kafka-group-${ro.kafkaTopic}`}; const consumer = kafka.consumer(consumerConfig); - await consumer.connect() - await consumer.subscribe({topic: ro.kafkaTopic, fromBeginning: true}) + await consumer.connect(); + await consumer.subscribe({topic: ro.kafkaTopic, fromBeginning: true}); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { // This combination uniquely identifies a message for a given Kafka cluster const workflowUUID = `kafka-unique-id-${topic}-${partition}-${message.offset}` const wfParams = { workflowUUID: workflowUUID }; // All operations annotated with Kafka decorators must take in these three arguments - const args: KafkaArgs = [topic, partition, message] + const args: KafkaArgs = [topic, partition, message]; // We can only guarantee exactly-once-per-message execution of transactions and workflows. if (ro.txnConfig) { // Execute the transaction @@ -113,4 +113,16 @@ export class DBOSKafka{ await consumer.disconnect(); } } + + logRegisteredKafkaEndpoints() { + const logger = this.dbosExec.logger; + logger.info("Kafka endpoints supported:"); + this.dbosExec.registeredOperations.forEach((registeredOperation) => { + const ro = registeredOperation as KafkaRegistration; + if (ro.kafkaTopic) { + const defaults = ro.defaults as KafkaDefaults; + logger.info(` ${ro.kafkaTopic} -> ${defaults.name}.${ro.name}`); + } + }); + } } diff --git a/src/scheduler/crontab.ts b/src/scheduler/crontab.ts new file mode 100644 index 000000000..012cab79c --- /dev/null +++ b/src/scheduler/crontab.ts @@ -0,0 +1,329 @@ +// This code was based on code from node-cron: +// https://github.com/node-cron/node-cron +/* +ISC License +Copyright (c) 2016, Lucas Merencia + +Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +function removeExtraSpaces(str: string): string { + return str.replace(/\s{2,}/g, ' ').trim(); +} + +function prependSecondExpression(expressions: string[]) : string[] +{ + if (expressions.length === 5) { + return ['0'].concat(expressions); + } + return expressions; +} + +const months = ['january','february','march','april','may','june','july', + 'august','september','october','november','december']; +const shortMonths = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', + 'sep', 'oct', 'nov', 'dec']; + +function convertMonthNameI(expression:string, items:string[]) : string { + for (let i = 0; i < items.length; i++){ + expression = expression.replace(new RegExp(items[i], 'gi'), `${i + 1}`); + } + return expression; +} + +function monthNamesConversion(monthExpression: string): string { + monthExpression = convertMonthNameI(monthExpression, months); + monthExpression = convertMonthNameI(monthExpression, shortMonths); + return monthExpression; +} + +const weekDays = ['sunday', 'monday', 'tuesday', 'wednesday', 'thursday', +'friday', 'saturday']; +const shortWeekDays = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat']; + +function convertWeekDayName(weekExpression: string, items: string[]){ + for(let i = 0; i < items.length; i++){ + weekExpression = weekExpression.replace(new RegExp(items[i], 'gi'), `${i}`); + } + return weekExpression; +} + +function weekDayNamesConversion(expression: string){ + expression = expression.replace('7', '0'); + expression = convertWeekDayName(expression, weekDays); + return convertWeekDayName(expression, shortWeekDays); +} + +function convertAsterisk(expression: string, replacement: string){ + if(expression.indexOf('*') !== -1){ + return expression.replace('*', replacement); + } + return expression; +} + +// Based on position in full crontab, convert asterisk to appropriate range +export function convertAsterisksToRanges(expressions: string[]) { + expressions[0] = convertAsterisk(expressions[0], '0-59'); + expressions[1] = convertAsterisk(expressions[1], '0-59'); + expressions[2] = convertAsterisk(expressions[2], '0-23'); + expressions[3] = convertAsterisk(expressions[3], '1-31'); + expressions[4] = convertAsterisk(expressions[4], '1-12'); + expressions[5] = convertAsterisk(expressions[5], '0-6'); + return expressions; +} + +function replaceWithRange(expression: string, text: string, init: string, end:string) { + const numbers: string[] = []; + let last = parseInt(end); + let first = parseInt(init); + + if(first > last){ + last = parseInt(init); + first = parseInt(end); + } + + for(let i = first; i <= last; i++) { + numbers.push(i.toString()); + } + + return expression.replace(new RegExp(text, 'i'), numbers.join()); +} + +function convertRange(expression: string){ + const rangeRegEx = /(\d+)-(\d+)/; + let match = rangeRegEx.exec(expression); + while(match !== null && match.length > 0){ + expression = replaceWithRange(expression, match[0], match[1], match[2]); + match = rangeRegEx.exec(expression); + } + return expression; +} + +function convertAllRanges(expressions: string[]){ + for(let i = 0; i < expressions.length; i++){ + expressions[i] = convertRange(expressions[i]); + } + return expressions; +} + +function convertSteps(expressions: string[]){ + const stepValuePattern = /^(.+)\/(\w+)$/; + for (let i = 0; i < expressions.length; i++){ + const match = stepValuePattern.exec(expressions[i]); + const isStepValue = match !== null && match.length > 0; + if (isStepValue){ + const baseDivider = match[2]; + if(isNaN(parseInt(baseDivider))){ + throw new Error(baseDivider + ' is not a valid step value'); + } + const values = match[1].split(','); + const stepValues: string[] = []; + const divider = parseInt(baseDivider, 10); + for (let j = 0; j <= values.length; j++) { + const value = parseInt(values[j], 10); + if (value % divider === 0){ + stepValues.push(`${value}`); + } + } + expressions[i] = stepValues.join(','); + } + } + return expressions; +} +// Function that takes care of normalization. +function normalizeIntegers(expressions: string[]) { + for (let i=0; i < expressions.length; i++){ + const numbers = expressions[i].split(','); + for (let j=0; j max)) || + !validationRegex.test(option) + ) { + return false; + } + } + + return true; +} + +function isInvalidSecond(expression: string) { return !isValidExpression(expression, 0, 59); } +function isInvalidMinute(expression: string) { return !isValidExpression(expression, 0, 59); } +function isInvalidHour(expression: string) { return !isValidExpression(expression, 0, 23); } +function isInvalidDayOfMonth(expression: string) { return !isValidExpression(expression, 1, 31); } +function isInvalidMonth(expression: string) { return !isValidExpression(expression, 1, 12); } +function isInvalidWeekDay(expression: string) { return !isValidExpression(expression, 0, 7); } + +function validateFields(patterns: string[], executablePatterns :string[]) { + if (isInvalidSecond(executablePatterns[0])) + throw new Error(`${patterns[0]} is a invalid expression for second`); + + if (isInvalidMinute(executablePatterns[1])) + throw new Error(`${patterns[1]} is a invalid expression for minute`); + + if (isInvalidHour(executablePatterns[2])) + throw new Error(`${patterns[2]} is a invalid expression for hour`); + + if (isInvalidDayOfMonth(executablePatterns[3])) + throw new Error( + `${patterns[3]} is a invalid expression for day of month` + ); + + if (isInvalidMonth(executablePatterns[4])) + throw new Error(`${patterns[4]} is a invalid expression for month`); + + if (isInvalidWeekDay(executablePatterns[5])) + throw new Error(`${patterns[5]} is a invalid expression for week day`); +} + +/** + * Validates a Cron-Job expression pattern. + * Throws on error. + */ +export function validateCrontab(pattern: string) { + if (typeof pattern !== 'string') + throw new TypeError('pattern must be a string!'); + + const patterns = pattern.split(' '); + const executablePatterns = convertExpression(pattern).split(' '); + + if (patterns.length === 5) patterns.unshift('0'); + + validateFields(patterns, executablePatterns); + + return executablePatterns.join(' '); +} + +/* +* The node-cron core allows only numbers (including multiple numbers e.g 1,2). +* This module is going to translate the month names, week day names and ranges +* to integers relatives. +* +* Month names example: +* - expression 0 1 1 January,Sep * +* - Will be translated to 0 1 1 1,9 * +* +* Week day names example: +* - expression 0 1 1 2 Monday,Sat +* - Will be translated to 0 1 1 1,5 * +* +* Ranges example: +* - expression 1-5 * * * * +* - Will be translated to 1,2,3,4,5 * * * * +*/ +export function convertExpression(crontab: string){ + let expressions = removeExtraSpaces(crontab).split(' '); + expressions = prependSecondExpression(expressions); + expressions[4] = monthNamesConversion(expressions[4]); + expressions[5] = weekDayNamesConversion(expressions[5]); + expressions = convertAsterisksToRanges(expressions); + expressions = convertAllRanges(expressions); + expressions = convertSteps(expressions); + + expressions = normalizeIntegers(expressions); + + return expressions.join(' '); +} + +////////// +/// Time matcher +////////// + +function matchPattern(pattern: string, nvalue: number){ + const value = `${nvalue}`; + if (pattern.indexOf(',') !== -1) { + const patterns = pattern.split(','); + return patterns.includes(value); + } + return pattern === value; +} + +export class TimeMatcher { + pattern: string; + expressions: string[]; + dtf: Intl.DateTimeFormat | null; + timezone?: string; + + constructor(pattern: string, timezone?: string) { + validateCrontab(pattern); + this.pattern = convertExpression(pattern); + this.timezone = timezone; + this.expressions = this.pattern.split(' '); + this.dtf = this.timezone + ? new Intl.DateTimeFormat('en-US', { + year: 'numeric', + month: '2-digit', + day: '2-digit', + hour: '2-digit', + minute: '2-digit', + second: '2-digit', + hourCycle: 'h23', + fractionalSecondDigits: 3, + timeZone: this.timezone + }) : null; + } + + match(date: Date){ + date = this.apply(date); + + const runOnSecond = matchPattern(this.expressions[0], date.getSeconds()); + const runOnMinute = matchPattern(this.expressions[1], date.getMinutes()); + const runOnHour = matchPattern(this.expressions[2], date.getHours()); + const runOnDay = this.runsThisDay(date); + + return runOnSecond && runOnMinute && runOnHour && runOnDay; + } + + private runsThisDay(date: Date) { + const runOnDay = matchPattern(this.expressions[3], date.getDate()); + const runOnMonth = matchPattern(this.expressions[4], date.getMonth() + 1); + const runOnWeekDay = matchPattern(this.expressions[5], date.getDay()); + + return runOnDay && runOnMonth && runOnWeekDay; + } + + nextWakeupTime(date: Date) { + // This is conservative. Some schedules never occur, such as the 30th of February, but you can ask for them + let msec = Math.round(date.getTime()); + // This can be optimized by skipping ahead, but unit test first + for (let maxIters = 3600; --maxIters; maxIters > 0) { + msec += 1000; + const nd = new Date(msec); + if (this.match(nd)) return nd; + } + return new Date(msec); + } + + apply(date: Date){ + if(this.dtf){ + return new Date(this.dtf.format(date)); + } + + return date; + } +} diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts new file mode 100644 index 000000000..6fd72a86e --- /dev/null +++ b/src/scheduler/scheduler.ts @@ -0,0 +1,195 @@ +import { WorkflowContext } from ".."; +import { DBOSExecutor } from "../dbos-executor"; +import { MethodRegistration, registerAndWrapFunction } from "../decorators"; +import { TimeMatcher } from "./crontab"; +import { Workflow } from "../workflow"; + +//// +// Configuration +//// + +export enum SchedulerMode { + ExactlyOncePerInterval = 'ExactlyOncePerInterval', + ExactlyOncePerIntervalWhenActive = 'ExactlyOncePerIntervalWhenActive', +} + +export class SchedulerConfig { + crontab: string = '* * * * *'; // Every minute + mode ?: SchedulerMode = SchedulerMode.ExactlyOncePerInterval; +} + +//// +// Method Decorator +//// + +// Scheduled Time. Actual Time. +export type ScheduledArgs = [Date, Date] + +export interface SchedulerRegistrationConfig { + schedulerConfig?: SchedulerConfig; +} + +export class SchedulerRegistration extends MethodRegistration + implements SchedulerRegistrationConfig +{ + schedulerConfig?: SchedulerConfig; + constructor(origFunc: (this: This, ...args: Args) => Promise) { + super(origFunc); + } +} + +export function Scheduled(schedulerConfig: SchedulerConfig) { + function scheddec( + target: object, + propertyKey: string, + inDescriptor: TypedPropertyDescriptor<(this: This, ctx: Ctx, ...args: ScheduledArgs) => Promise> + ) { + const { descriptor, registration } = registerAndWrapFunction(target, propertyKey, inDescriptor); + const schedRegistration = registration as unknown as SchedulerRegistration; + schedRegistration.schedulerConfig = schedulerConfig; + + return descriptor; + } + return scheddec; +} + +/////////////////////////// +// Scheduler Management +/////////////////////////// + +export class DBOSScheduler{ + constructor(readonly dbosExec: DBOSExecutor) {} + + schedLoops: DetachableLoop[] = []; + schedTasks: Promise [] = []; + + initScheduler() { + for (const registeredOperation of this.dbosExec.registeredOperations) { + const ro = registeredOperation as SchedulerRegistration; + if (ro.schedulerConfig) { + const loop = new DetachableLoop( + this.dbosExec, + ro.schedulerConfig.crontab ?? '* * * * *', + ro.schedulerConfig.mode ?? SchedulerMode.ExactlyOncePerInterval, + ro + ); + this.schedLoops.push(loop); + this.schedTasks.push(loop.startLoop()); + } + } + } + + async destroyScheduler() { + for (const l of this.schedLoops) { + l.setStopLoopFlag(); + } + this.schedLoops = []; + try { + await Promise.allSettled(this.schedTasks); + } + catch (e) { + // What gets caught here is the loop stopping, which is what we wanted. + } + this.schedTasks = []; + } + + logRegisteredSchedulerEndpoints() { + const logger = this.dbosExec.logger; + logger.info("Scheduled endpoints:"); + this.dbosExec.registeredOperations.forEach((registeredOperation) => { + const ro = registeredOperation as SchedulerRegistration; + if (ro.schedulerConfig) { + logger.info(` ${ro.name} @ ${ro.schedulerConfig.crontab}; ${ro.schedulerConfig.mode ?? SchedulerMode.ExactlyOncePerInterval}`); + } + }); + } +} + +class DetachableLoop { + private isRunning: boolean = false; + private interruptResolve?: () => void; + private lastExec: Date; + private timeMatcher: TimeMatcher; + + constructor(readonly dbosExec: DBOSExecutor, readonly crontab: string, readonly schedMode:SchedulerMode, + readonly scheduledMethod: SchedulerRegistration) + { + this.lastExec = new Date(); + this.lastExec.setMilliseconds(0); + this.timeMatcher = new TimeMatcher(crontab); + } + + async startLoop(): Promise { + // See if the exec time is available in durable storage... + if (this.schedMode !== SchedulerMode.ExactlyOncePerInterval) + { + const lasttm = await this.dbosExec.systemDatabase.getLastScheduledTime(this.scheduledMethod.name); + if (lasttm) { + this.lastExec = new Date(lasttm); + } + } + + this.isRunning = true; + while (this.isRunning) { + const nextExecTime = this.timeMatcher.nextWakeupTime(this.lastExec); + const sleepTime = nextExecTime.getTime() - new Date().getTime(); + + if (sleepTime > 0) { + // Wait for either the timeout or an interruption + await Promise.race([ + this.sleepms(sleepTime), + new Promise((_, reject) => this.interruptResolve = reject) + ]) + .catch(); // Interrupt sleep throws + } + + if (!this.isRunning) { + break; + } + + // Check crontab + // If this "wake up" time is not on the schedule, we shouldn't execute. + // (While ATOW this wake-up time is a scheduled run time, it is not + // contractually obligated to be so. If this is obligated to be a + // scheduled execution time, then we could make this an assertion + // instead of a check.) + if (!this.timeMatcher.match(nextExecTime)) { + this.lastExec = nextExecTime; + continue; + } + + // Init workflow + const workflowUUID = `sched-${this.scheduledMethod.name}-${nextExecTime.toISOString()}`; + this.dbosExec.logger.debug(`Executing scheduled workflow ${workflowUUID}`); + const wfParams = { workflowUUID: workflowUUID }; + // All operations annotated with Scheduled decorators must take in these four + const args: ScheduledArgs = [nextExecTime, new Date()]; + + // We currently only support scheduled workflows + if (this.scheduledMethod.workflowConfig) { + // Execute the workflow + await this.dbosExec.workflow(this.scheduledMethod.registeredFunction as Workflow, wfParams, ...args); + } + else { + this.dbosExec.logger.error(`Function ${this.scheduledMethod.name} is @scheduled but not a workflow`); + } + + // Record the time of the wf kicked off + const dbTime = await this.dbosExec.systemDatabase.setLastScheduledTime(this.scheduledMethod.name, nextExecTime.getTime()); + if (dbTime && dbTime > nextExecTime.getTime()) nextExecTime.setTime(dbTime); + this.lastExec = nextExecTime; + } + } + + setStopLoopFlag() { + if (!this.isRunning) return; + this.isRunning = false; + if (this.interruptResolve) { + this.interruptResolve(); // Trigger the interruption + } + } + + private sleepms(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } +} diff --git a/src/system_database.ts b/src/system_database.ts index 999e39d09..14f9898f4 100644 --- a/src/system_database.ts +++ b/src/system_database.ts @@ -5,7 +5,7 @@ import { DBOSExecutor, dbosNull, DBOSNull } from "./dbos-executor"; import { DatabaseError, Pool, PoolClient, Notification, PoolConfig, Client } from "pg"; import { DuplicateWorkflowEventError, DBOSWorkflowConflictUUIDError, DBOSNonExistentWorkflowError } from "./error"; import { StatusString, WorkflowStatus } from "./workflow"; -import { notifications, operation_outputs, workflow_status, workflow_events, workflow_inputs } from "../schemas/system_db_schema"; +import { notifications, operation_outputs, workflow_status, workflow_events, workflow_inputs, scheduler_state } from "../schemas/system_db_schema"; import { sleep, findPackageRoot, DBOSReplacer, DBOSReviver } from "./utils"; import { HTTPRequest } from "./context"; import { GlobalLogger as Logger } from "./telemetry/logs"; @@ -40,6 +40,11 @@ export interface SystemDatabase { setEvent>(workflowUUID: string, functionID: number, key: string, value: T): Promise; getEvent>(workflowUUID: string, key: string, timeoutSeconds: number, callerUUID?: string, functionID?: number): Promise; + + // Scheduler queries + // These two maintain exactly once - make sure we kick off the workflow at least once, and wf unique ID does the rest + getLastScheduledTime(wfn: string): Promise; // Last workflow we are sure we invoked + setLastScheduledTime(wfn: string, invtime: number): Promise; // We are now sure we invoked another } // For internal use, not serialized status. @@ -594,4 +599,29 @@ export class PostgresSystemDatabase implements SystemDatabase { }; this.notificationsClient.on("notification", handler); } + + /* SCHEDULER */ + async getLastScheduledTime(wfn: string): Promise { + const res = await this.pool.query(` + SELECT last_run_time + FROM ${DBOSExecutor.systemDBSchemaName}.scheduler_state + WHERE workflow_fn_name = $1; + `, [wfn]); + + let v = res.rows[0]?.last_run_time ?? null; + if (v!== null) v = parseInt(`${v}`); + return v; + } + + async setLastScheduledTime(wfn: string, invtime: number): Promise { + const res = await this.pool.query(` + INSERT INTO ${DBOSExecutor.systemDBSchemaName}.scheduler_state (workflow_fn_name, last_run_time) + VALUES ($1, $2) + ON CONFLICT (workflow_fn_name) + DO UPDATE SET last_run_time = GREATEST(EXCLUDED.last_run_time, scheduler_state.last_run_time) + RETURNING last_run_time; + `, [wfn, invtime]); + + return parseInt(`${res.rows[0].last_run_time}`); + } } diff --git a/src/testing/testing_runtime.ts b/src/testing/testing_runtime.ts index 4658d3c49..593afe158 100644 --- a/src/testing/testing_runtime.ts +++ b/src/testing/testing_runtime.ts @@ -16,6 +16,7 @@ import { SystemDatabase } from "../system_database"; import { get, set } from "lodash"; import { Client } from "pg"; import { DBOSKafka } from "../kafka/kafka"; +import { DBOSScheduler } from "../scheduler/scheduler"; /** * Create a testing runtime. Warn: this function will drop the existing system DB and create a clean new one. Don't run tests against your production database! @@ -83,6 +84,7 @@ export async function createInternalTestRuntime(userClasses: object[], testConfi export class TestingRuntimeImpl implements TestingRuntime { #server: DBOSHttpServer | null = null; #kafka: DBOSKafka | null = null; + #scheduler: DBOSScheduler | null = null; #applicationConfig: object = {}; #isInitialized = false; @@ -97,6 +99,8 @@ export class TestingRuntimeImpl implements TestingRuntime { this.#server = new DBOSHttpServer(dbosExec); this.#kafka = new DBOSKafka(dbosExec); await this.#kafka.initKafka(); + this.#scheduler = new DBOSScheduler(dbosExec); + this.#scheduler.initScheduler(); this.#applicationConfig = dbosExec.config.application ?? {}; this.#isInitialized = true; } @@ -107,6 +111,7 @@ export class TestingRuntimeImpl implements TestingRuntime { async destroy() { // Only release once. if (this.#isInitialized) { + await this.#scheduler?.destroyScheduler(); await this.#kafka?.destroyKafka(); await this.#server?.dbosExec.destroy(); this.#isInitialized = false; diff --git a/src/workflow.ts b/src/workflow.ts index 4d6be12ec..1386a6c12 100644 --- a/src/workflow.ts +++ b/src/workflow.ts @@ -33,7 +33,6 @@ export interface WorkflowParams { } export interface WorkflowConfig { - // TODO: add workflow config here. } export interface WorkflowStatus { diff --git a/tests/scheduler/crontab.test.ts b/tests/scheduler/crontab.test.ts new file mode 100644 index 000000000..dd270fd11 --- /dev/null +++ b/tests/scheduler/crontab.test.ts @@ -0,0 +1,596 @@ +// This code was based on code from node-cron: +// https://github.com/node-cron/node-cron +/* +ISC License +Copyright (c) 2016, Lucas Merencia + +Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +import {validateCrontab as validate, convertExpression as conversion, TimeMatcher} from '../../src/scheduler/crontab'; + +////////////////// +// Conversion tests +////////////////// +describe('asterisk-to-range-conversion', () => { + it('should convert * to ranges', () => { + const expressions = '* * * * * *'; + const expression = conversion(expressions); + expect(expression).toBe('0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31 1,2,3,4,5,6,7,8,9,10,11,12 0,1,2,3,4,5,6'); + }); + + it('should convert * to ranges', () => { + const expressions = '* * * * *'; + const expression = conversion(expressions); + expect(expression).toBe('0 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31 1,2,3,4,5,6,7,8,9,10,11,12 0,1,2,3,4,5,6'); + }); +}); + +describe('names-conversion', () => { + it('should convert month names', () => { + const expression = conversion('* * * * January,February *'); + const expressions = expression.split(' '); + expect(expressions[4]).toBe('1,2'); + }); + + it('should convert week day names', () => { + const expression = conversion('* * * * * Mon,Sun'); + const expressions = expression.split(' '); + expect(expressions[5]).toBe('1,0'); + }); +}); + +describe('month-names-conversion', () => { + it('should convert month names', () => { + const months = conversion('* * * * January,February,March,April,May,June,July,August,September,October,November,December').split(' ')[4]; + expect(months).toBe('1,2,3,4,5,6,7,8,9,10,11,12'); + }); + + it('should convert month names', () => { + const months = conversion('* * * * Jan,Feb,Mar,Apr,May,Jun,Jul,Aug,Sep,Oct,Nov,Dec').split(' ')[4]; + expect(months).toBe('1,2,3,4,5,6,7,8,9,10,11,12'); + }); +}); + +describe('range-conversion', () => { + it('should convert ranges to numbers', () => { + const expressions = '0-3 0-3 0-2 1-3 1-2 0-3'; + const expression = conversion(expressions); + expect(expression).toBe('0,1,2,3 0,1,2,3 0,1,2 1,2,3 1,2 0,1,2,3'); + }); + + it('should convert ranges to numbers', () => { + const expressions = '0-3 0-3 8-10 1-3 1-2 0-3'; + const expression = conversion(expressions); + expect(expression).toBe('0,1,2,3 0,1,2,3 8,9,10 1,2,3 1,2 0,1,2,3'); + }); + + it('should convert comma delimited ranges to numbers', () => { + const expressions = '0-2,10-23 * * * * *'; + const expression = conversion(expressions).split(' ')[0]; + expect(expression).toBe('0,1,2,10,11,12,13,14,15,16,17,18,19,20,21,22,23'); + }); +}); + +describe('step-values-conversion', () => { + it('should convert step values', () => { + const expression = '1,2,3,4,5,6,7,8,9,10/2 0,1,2,3,4,5,6,7,8,9/5 */3 * * *'; + const expressions = conversion(expression).split(' '); + expect(expressions[0]).toBe('2,4,6,8,10'); + expect(expressions[1]).toBe('0,5'); + expect(expressions[2]).toBe('0,3,6,9,12,15,18,21'); + }); + + it('should throw an error if step value is not a number', () => { + const expressions = '1,2,3,4,5,6,7,8,9,10/someString 0,1,2,3,4,5,6,7,8,9/5 * * * *'; + expect(() => conversion(expressions)).toThrow('someString is not a valid step value'); + }); +}); + +describe('week-day-names-conversion', () => { + it('should convert week day names names', () => { + const weekDays = conversion('* * * * Monday,Tuesday,Wednesday,Thursday,Friday,Saturday,Sunday').split(' ')[5]; + expect(weekDays).toBe('1,2,3,4,5,6,0'); + }); + + it('should convert short week day names names', () => { + const weekDays = conversion('* * * * Mon,Tue,Wed,Thu,Fri,Sat,Sun').split(' ')[5]; + expect(weekDays).toBe('1,2,3,4,5,6,0'); + }); + + it('should convert 7 to 0', () => { + const weekDays = conversion('* * * * 7').split(' ')[5]; + expect(weekDays).toBe('0'); + }); +}); + +///////////// +// Validation tests +///////////// + +describe('pattern-validation', () => { + describe('validate day of month', () => { + it('should fail with invalid day of month', () => { + expect(() => { + validate('* * 32 * *'); + }).toThrow('32 is a invalid expression for day of month'); + }); + + it('should not fail with valid day of month', () => { + expect(() => { + validate('0 * * 15 * *'); + }).not.toThrow(); + }); + + it('should not fail with * for day of month', () => { + expect(() => { + validate('* * * * * *'); + }).not.toThrow(); + }); + + it('should not fail with */2 for day of month', () => { + expect(() => { + validate('* * */2 * *'); + }).not.toThrow(); + }); + }); +}); + +describe('pattern-validation', () => { + describe('validate hour', () => { + it('should fail with invalid hour', () => { + expect(() => { + validate('* 25 * * *'); + }).toThrow('25 is a invalid expression for hour'); + }); + + it('should not fail with valid hour', () => { + expect(() => { + validate('* 12 * * *'); + }).not.toThrow(); + }); + + it('should not fail with * for hour', () => { + expect(() => { + validate('* * * * * *'); + }).not.toThrow(); + }); + + it('should not fail with */2 for hour', () => { + expect(() => { + validate('* */2 * * *'); + }).not.toThrow(); + }); + + it('should accept range for hours', () => { + expect(() => { + validate('* 3-20 * * *'); + }).not.toThrow(); + }); + }); +}); + +describe('pattern-validation', () => { + describe('validate minutes', () => { + it('should fail with invalid minute', () => { + expect(() => { + validate('63 * * * *'); + }).toThrow('63 is a invalid expression for minute'); + }); + + it('should not fail with valid minute', () => { + expect(() => { + validate('30 * * * *'); + }).not.toThrow(); + }); + + it('should not fail with *', () => { + expect(() => { + validate('* * * * *'); + }).not.toThrow(); + }); + + it('should not fail with */2', () => { + expect(() => { + validate('*/2 * * * *'); + }).not.toThrow(); + }); + }); +}); + +describe('pattern-validation', () => { + describe('validate month', () => { + it('should fail with invalid month', () => { + expect( () => { + validate('* * * 13 *'); + }).toThrow('13 is a invalid expression for month'); + }); + + it('should fail with invalid month name', () => { + expect( () => { + validate('* * * foo *'); + }).toThrow('foo is a invalid expression for month'); + }); + + it('should not fail with valid month', () => { + expect( () => { + validate('* * * 10 *'); + }).not.toThrow(); + }); + + it('should not fail with valid month name', () => { + expect( () => { + validate('* * * September *'); + }).not.toThrow(); + }); + + it('should not fail with * for month', () => { + expect( () => { + validate('* * * * *'); + }).not.toThrow(); + }); + + it('should not fail with */2 for month', () => { + expect( () => { + validate('* * * */2 *'); + }).not.toThrow(); + }); + }); +}); + +describe('pattern-validation', () => { + describe('validate seconds', () => { + it('should fail with invalid second', () => { + expect(() => { + validate('63 * * * * *'); + }).toThrow('63 is a invalid expression for second'); + }); + + it('should not fail with valid second', () => { + expect(() => { + validate('30 * * * * *'); + }).not.toThrow(); + }); + + it('should not fail with * for second', () => { + expect(() => { + validate('* * * * * *'); + }).not.toThrow(); + }); + + it('should not fail with */2 for second', () => { + expect(() => { + validate('*/2 * * * * *'); + }).not.toThrow(); + }); + }); +}); + +describe('pattern-validation', () => { + it('should succeed with a valid expression', () => { + expect(() => { + validate('59 * * * *'); + }).not.toThrow(); + }); + + it('should fail with an invalid expression', () => { + expect(() => { + validate('60 * * * *'); + }).toThrow('60 is a invalid expression for minute'); + }); + + it('should fail without a string', () => { + expect(() => { + validate(50 as unknown as string); + }).toThrow('pattern must be a string!'); + }); +}); + +describe('pattern-validation', () => { + describe('validate week day', () => { + it('should fail with invalid week day', () => { + expect(() => { + validate('* * * * 9'); + }).toThrow('9 is a invalid expression for week day'); + }); + + it('should fail with invalid week day name', () => { + expect(() => { + validate('* * * * foo'); + }).toThrow('foo is a invalid expression for week day'); + }); + + it('should not fail with valid week day', () => { + expect(() => { + validate('* * * * 5'); + }).not.toThrow(); + }); + + it('should not fail with valid week day name', () => { + expect(() => { + validate('* * * * Friday'); + }).not.toThrow(); + }); + + it('should not fail with * for week day', () => { + expect(() => { + validate('* * * * *'); + }).not.toThrow(); + }); + + it('should not fail with */2 for week day', () => { + expect(() => { + validate('* * * */2 *'); + }).not.toThrow(); + }); + + it('should not fail with Monday-Sunday for week day', () => { + expect(() => { + validate('* * * * Monday-Sunday'); + }).not.toThrow(); + }); + + it('should not fail with 1-7 for week day', () => { + expect(() => { + validate('0 0 1 1 1-7'); + }).not.toThrow(); + }); + }); +}); + +///////// +// Time matcher +///////// + +describe('TimeMatcher', () => { + describe('wildcard', () => { + it('should accept wildcard for second', () => { + const matcher = new TimeMatcher('* * * * * *'); + expect(matcher.match(new Date())).toBe(true); + }); + + it('should accept wildcard for minute', () => { + const matcher = new TimeMatcher('0 * * * * *'); + expect(matcher.match(new Date(2018, 0, 1, 10, 20, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 10, 20, 1))).toBe(false); + }); + + it('should accept wildcard for hour', () => { + const matcher = new TimeMatcher('0 0 * * * *'); + expect(matcher.match(new Date(2018, 0, 1, 10, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 10, 1, 0))).toBe(false); + }); + + it('should accept wildcard for day', () => { + const matcher = new TimeMatcher('0 0 0 * * *'); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 1, 0, 0))).toBe(false); + }); + + it('should accept wildcard for month', () => { + const matcher = new TimeMatcher('0 0 0 1 * *'); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 2, 0, 0, 0))).toBe(false); + }); + + it('should accept wildcard for week day', () => { + const matcher = new TimeMatcher('0 0 0 1 4 *'); + expect(matcher.match(new Date(2018, 3, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 3, 2, 0, 0, 0))).toBe(false); + }); + }); + + describe('single value', () => { + it('should accept single value for second', () => { + const matcher = new TimeMatcher('5 * * * * *'); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 5))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 6))).toBe(false); + }); + + it('should accept single value for minute', () => { + const matcher = new TimeMatcher('0 5 * * * *'); + expect(matcher.match(new Date(2018, 0, 1, 0, 5, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 6, 0))).toBe(false); + }); + + it('should accept single value for hour', () => { + const matcher = new TimeMatcher('0 0 5 * * *'); + expect(matcher.match(new Date(2018, 0, 1, 5, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 6, 0, 0))).toBe(false); + }); + + it('should accept single value for day', () => { + const matcher = new TimeMatcher('0 0 0 5 * *'); + expect(matcher.match(new Date(2018, 0, 5, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 6, 0, 0, 0))).toBe(false); + }); + + it('should accept single value for month', () => { + const matcher = new TimeMatcher('0 0 0 1 5 *'); + expect(matcher.match(new Date(2018, 4, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 5, 1, 0, 0, 0))).toBe(false); + }); + + it('should accept single value for week day', () => { + const matcher = new TimeMatcher('0 0 0 * * monday'); + expect(matcher.match(new Date(2018, 4, 7, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 4, 8, 0, 0, 0))).toBe(false); + }); + }); + + describe('multiple values', () => { + it('should accept multiple values for second', () => { + const matcher = new TimeMatcher('5,6 * * * * *'); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 5))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 6))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 7))).toBe(false); + }); + + it('should accept multiple values for minute', () => { + const matcher = new TimeMatcher('0 5,6 * * * *'); + expect(matcher.match(new Date(2018, 0, 1, 0, 5, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 6, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 7, 0))).toBe(false); + }); + + it('should accept multiple values for hour', () => { + const matcher = new TimeMatcher('0 0 5,6 * * *'); + expect(matcher.match(new Date(2018, 0, 1, 5, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 6, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 7, 0, 0))).toBe(false); + }); + + it('should accept multiple values for day', () => { + const matcher = new TimeMatcher('0 0 0 5,6 * *'); + expect(matcher.match(new Date(2018, 0, 5, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 6, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 7, 0, 0, 0))).toBe(false); + }); + + it('should accept multiple values for month', () => { + const matcher = new TimeMatcher('0 0 0 1 may,june *'); + expect(matcher.match(new Date(2018, 4, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 5, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 6, 1, 0, 0, 0))).toBe(false); + }); + + it('should accept multiple values for week day', () => { + const matcher = new TimeMatcher('0 0 0 * * monday,tue'); + expect(matcher.match(new Date(2018, 4, 7, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 4, 8, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 4, 9, 0, 0, 0))).toBe(false); + }); + }); + + describe('range', () => { + it('should accept range for second', () => { + const matcher = new TimeMatcher('5-7 * * * * *'); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 5))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 6))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 7))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 8))).toBe(false); + }); + + it('should accept range for minute', () => { + const matcher = new TimeMatcher('0 5-7 * * * *'); + expect(matcher.match(new Date(2018, 0, 1, 0, 5, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 6, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 7, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 8, 0))).toBe(false); + }); + + it('should accept range for hour', () => { + const matcher = new TimeMatcher('0 0 5-7 * * *'); + expect(matcher.match(new Date(2018, 0, 1, 5, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 6, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 7, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 8, 0, 0))).toBe(false); + }); + + it('should accept range for day', () => { + const matcher = new TimeMatcher('0 0 0 5-7 * *'); + expect(matcher.match(new Date(2018, 0, 5, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 6, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 7, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 8, 0, 0, 0))).toBe(false); + }); + + it('should accept range for month', () => { + const matcher = new TimeMatcher('0 0 0 1 may-july *'); + expect(matcher.match(new Date(2018, 4, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 5, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 6, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 7, 1, 0, 0, 0))).toBe(false); + }); + + it('should accept range for week day', () => { + const matcher = new TimeMatcher('0 0 0 * * monday-wed'); + expect(matcher.match(new Date(2018, 4, 7, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 4, 8, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 4, 9, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 4, 10, 0, 0, 0))).toBe(false); + }); + }); + + describe('step values', () => { + it('should accept step values for second', () => { + const matcher = new TimeMatcher('*/2 * * * * *'); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 2))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 6))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 0, 7))).toBe(false); + }); + + it('should accept step values for minute', () => { + const matcher = new TimeMatcher('0 */2 * * * *'); + expect(matcher.match(new Date(2018, 0, 1, 0, 2, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 6, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 0, 7, 0))).toBe(false); + }); + + it('should accept step values for hour', () => { + const matcher = new TimeMatcher('0 0 */2 * * *'); + expect(matcher.match(new Date(2018, 0, 1, 2, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 6, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 1, 7, 0, 0))).toBe(false); + }); + + it('should accept step values for day', () => { + const matcher = new TimeMatcher('0 0 0 */2 * *'); + expect(matcher.match(new Date(2018, 0, 2, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 6, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 0, 7, 0, 0, 0))).toBe(false); + }); + + it('should accept step values for month', () => { + const matcher = new TimeMatcher('0 0 0 1 */2 *'); + expect(matcher.match(new Date(2018, 1, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 5, 1, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 6, 1, 0, 0, 0))).toBe(false); + }); + + it('should accept step values for week day', () => { + const matcher = new TimeMatcher('0 0 0 * * */2'); + expect(matcher.match(new Date(2018, 4, 6, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 4, 8, 0, 0, 0))).toBe(true); + expect(matcher.match(new Date(2018, 4, 9, 0, 0, 0))).toBe(false); + }); + }); + + describe('timezone', ()=>{ + it('should match with timezone America/Sao_Paulo', () => { + const matcher = new TimeMatcher('0 0 0 * * *', 'America/Sao_Paulo'); + const utcTime = new Date('Thu Oct 11 2018 03:00:00Z'); + expect(matcher.match(utcTime)).toBe(true); + }); + + it('should match with timezone Europe/Rome', () => { + const matcher = new TimeMatcher('0 0 0 * * *', 'Europe/Rome'); + const utcTime = new Date('Thu Oct 11 2018 22:00:00Z'); + expect(matcher.match(utcTime)).toBe(true); + }); + + /* + it('should match with all available timezone of moment-timezone', () => { + const allTimeZone = moment.tz.names(); + for (const zone in allTimeZone) { + const tmp = moment(); + const expected = moment.tz(tmp,allTimeZone[zone]); + const pattern = expected.second() + ' ' + expected.minute() + ' ' + expected.hour() + ' ' + expected.date() + ' ' + (expected.month()+1) + ' ' + expected.day(); + const matcher = new TimeMatcher(pattern, allTimeZone[zone]); + const utcTime = new Date(tmp.year(), tmp.month(), tmp.date(), tmp.hour(), tmp.minute(), tmp.second(), tmp.millisecond()); + expect(matcher.match(utcTime)); + } + }); + */ + }); +}); \ No newline at end of file diff --git a/tests/scheduler/scheduler.test.ts b/tests/scheduler/scheduler.test.ts new file mode 100644 index 000000000..f6c2d1e3b --- /dev/null +++ b/tests/scheduler/scheduler.test.ts @@ -0,0 +1,47 @@ +import { Scheduled, SchedulerMode, TestingRuntime, Workflow, WorkflowContext } from "../../src"; +import { DBOSConfig } from "../../src/dbos-executor"; +import { createInternalTestRuntime } from "../../src/testing/testing_runtime"; +import { sleep } from "../../src/utils"; +import { generateDBOSTestConfig, setUpDBOSTestDb } from "../helpers"; + +describe("scheduled-wf-tests", () => { + let config: DBOSConfig; + let testRuntime: TestingRuntime; + + beforeAll(async () => { + config = generateDBOSTestConfig(); + await setUpDBOSTestDb(config); + }); + + beforeEach(async () => { + testRuntime = await createInternalTestRuntime([DBOSSchedTestClass], config); + }); + + afterEach(async () => { + await testRuntime.destroy(); + }, 10000); + + test("wf-scheduled", async () => { + await sleep(3000); + expect(DBOSSchedTestClass.nCalls).toBeGreaterThanOrEqual(2); + expect(DBOSSchedTestClass.nTooEarly).toBe(0); + expect(DBOSSchedTestClass.nTooLate).toBe(0); + }); +}); + +class DBOSSchedTestClass { + static nCalls = 0; + static nTooEarly = 0; + static nTooLate = 0; + + @Scheduled({crontab: '* * * * * *', mode: SchedulerMode.ExactlyOncePerIntervalWhenActive}) + @Workflow() + static async scheduledDefault(ctxt: WorkflowContext, schedTime: Date, startTime: Date) { + DBOSSchedTestClass.nCalls++; + + if (schedTime.getTime() > startTime.getTime()) DBOSSchedTestClass.nTooEarly++; + if (startTime.getTime() - schedTime.getTime() > 1500) DBOSSchedTestClass.nTooLate++; + + await ctxt.sleep(2); + } +}