From d8cf4989acdc142fbe40a8c222f128457804a12b Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 8 Aug 2022 18:12:49 +1000 Subject: [PATCH] wip prototyping queue, tasks and timer --- src/tasks/Queue.ts | 165 ++++++++++ src/tasks/Scheduler.ts | 561 ++++++++++++++++++++++++++++++++++ src/tasks/Task.ts | 120 ++++++++ src/tasks/errors.ts | 79 +++++ src/tasks/index.ts | 4 + src/tasks/types.ts | 72 +++++ src/tasks/utils.ts | 21 ++ src/timer/Timer.ts | 7 + src/tracing/Trace.ts | 2 + test-promises.ts | 35 +++ test-transaction-decorator.ts | 100 ++++++ tests/tasks/Scheduler.test.ts | 72 +++++ 12 files changed, 1238 insertions(+) create mode 100644 src/tasks/Queue.ts create mode 100644 src/tasks/Scheduler.ts create mode 100644 src/tasks/Task.ts create mode 100644 src/tasks/errors.ts create mode 100644 src/tasks/index.ts create mode 100644 src/tasks/types.ts create mode 100644 src/tasks/utils.ts create mode 100644 src/timer/Timer.ts create mode 100644 src/tracing/Trace.ts create mode 100644 test-promises.ts create mode 100644 test-transaction-decorator.ts create mode 100644 tests/tasks/Scheduler.test.ts diff --git a/src/tasks/Queue.ts b/src/tasks/Queue.ts new file mode 100644 index 0000000000..c0245c42d0 --- /dev/null +++ b/src/tasks/Queue.ts @@ -0,0 +1,165 @@ +import type { DB, DBTransaction, LevelPath } from '@matrixai/db'; +import type { TaskIdString } from './types'; +import type { PolykeyWorkerManagerInterface } from '../workers/types'; +import type { PromiseDeconstructed } from '../types'; +import Logger from '@matrixai/logger'; +import { + CreateDestroyStartStop, + ready, +} from '@matrixai/async-init/dist/CreateDestroyStartStop'; +import * as tasksUtils from './utils'; +import * as tasksErrors from './errors'; + +interface Queue extends CreateDestroyStartStop {} +@CreateDestroyStartStop( + new tasksErrors.ErrorQueueRunning(), + new tasksErrors.ErrorQueueDestroyed(), +) +class Queue { + + public static async createQueue({ + db, + logger = new Logger(this.name), + fresh = false, + }: { + db: DB; + logger?: Logger; + fresh?: boolean; + }) { + logger.info(`Creating ${this.name}`); + const queue = new this({ db, logger }); + await queue.start({ fresh }); + logger.info(`Created ${this.name}`); + return queue; + } + + protected logger: Logger; + protected db: DB; + protected workerManager?: PolykeyWorkerManagerInterface; + protected queueDbPath: LevelPath = [this.constructor.name]; + + // when the queue to execute the tasks + // the task id is generated outside + // you don't get a task id here + // you just "push" tasks there to be executed + // this is the "shared" set of promises to be maintained + protected promises: Map> = new Map(); + + // /** + // * Listeners for task execution + // * When a task is executed, these listeners are synchronously executed + // * The listeners are intended for resolving or rejecting task promises + // */ + // protected listeners: Map> = new Map(); + + public constructor({ + db, + logger + }: { + db: DB; + logger: Logger + }) { + this.logger = logger; + this.db = db; + } + + public setWorkerManager(workerManager: PolykeyWorkerManagerInterface) { + this.workerManager = workerManager; + } + + public unsetWorkerManager() { + delete this.workerManager; + } + + public async start({ + fresh = false, + }: { + fresh?: boolean; + } = {}): Promise { + this.logger.info(`Starting ${this.constructor.name}`); + if (fresh) { + await this.db.clear(this.queueDbPath); + } + this.logger.info(`Started ${this.constructor.name}`); + } + + public async stop(): Promise { + this.logger.info(`Stopping ${this.constructor.name}`); + this.logger.info(`Stopped ${this.constructor.name}`); + } + + public async destroy() { + this.logger.info(`Destroying ${this.constructor.name}`); + await this.db.clear(this.queueDbPath); + this.logger.info(`Destroyed ${this.constructor.name}`); + } + + // promises are "connected" to events + + /** + * IF a handler does not exist + * if the task is executed + * then an exception is thrown + * if listener exists, the exception is passed into this listener function + * if it doesn't exist, then it's just a reference exception in general, this can be logged + * There's nothing else to do + */ + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + protected registerListener( + taskId: TaskId, + taskListener: TaskListener + ): void { + const taskIdString = taskId.toString() as TaskIdString; + const taskListeners = this.listeners.get(taskIdString); + if (taskListeners != null) { + taskListeners.push(taskListener); + } else { + this.listeners.set(taskIdString, [taskListener]); + } + } + + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + protected deregisterListener( + taskId: TaskId, + taskListener: TaskListener + ): void { + const taskIdString = taskId.toString() as TaskIdString; + const taskListeners = this.listeners.get(taskIdString); + if (taskListeners == null || taskListeners.length < 1) return; + const index = taskListeners.indexOf(taskListener); + if (index !== -1) { + taskListeners.splice(index, 1); + } + } + +} + +export default Queue; + + +// epic queue +// need to do a couple things: +// 1. integrate fast-check +// 2. integrate span checks +// 3. might also consider span logs? +// 4. open tracing observability +// 5. structured logging +// 6. async hooks to get traced promises to understand the situation +// 7. do we also get fantasy land promises? and async cancellable stuff? +// 8. task abstractions? +// need to use the db for this +// 9. priority structure +// 10. timers +// abort controller + +// kinetic data structure +// the priority grows as a function of time +// order by priority <- this thing has a static value +// in a key value DB, you can maintain sorted index of values +// IDs can be lexicographically sortable + +// this is a persistent queue +// of tasks that should be EXECUTED right now +// the scheduler is a persistent scheduler of scheduled tasks +// tasks get pushed from the scheduler into the queue +// the queue connects to the WorkerManager diff --git a/src/tasks/Scheduler.ts b/src/tasks/Scheduler.ts new file mode 100644 index 0000000000..18d5675368 --- /dev/null +++ b/src/tasks/Scheduler.ts @@ -0,0 +1,561 @@ +import type { DB, DBTransaction, LevelPath } from '@matrixai/db'; +import type { TaskId, TaskHandlerId, TaskHandler, TaskData, TaskInfo, TaskIdString } from './types'; +import type KeyManager from '../keys/KeyManager'; +import type { PolykeyWorkerManagerInterface } from '../workers/types'; +import type { POJO, Callback, PromiseDeconstructed } from '../types'; +import Logger from '@matrixai/logger'; +import { IdInternal } from '@matrixai/id'; +import { extractTs } from '@matrixai/id/dist/IdSortable'; +import { + CreateDestroyStartStop, + ready, +} from '@matrixai/async-init/dist/CreateDestroyStartStop'; +import lexi from 'lexicographic-integer'; +import Queue from './Queue'; +import * as tasksUtils from './utils'; +import * as tasksErrors from './errors'; +import * as utils from '../utils'; + +interface Scheduler extends CreateDestroyStartStop {} +@CreateDestroyStartStop( + new tasksErrors.ErrorSchedulerRunning(), + new tasksErrors.ErrorSchedulerDestroyed(), +) +class Scheduler { + /** + * Create the scheduler, which will create its own Queue + * This will automatically start the scheduler + * If the scheduler needs to be started after the fact + * Make sure to construct it, and then call `start` manually + */ + public static async createScheduler({ + db, + keyManager, + queue, + logger = new Logger(this.name), + handlers = {}, + delay = false, + fresh = false, + }: { + db: DB; + keyManager: KeyManager; + queue?: Queue; + logger?: Logger; + handlers?: Record; + delay?: boolean; + fresh?: boolean; + }): Promise { + logger.info(`Creating ${this.name}`); + queue = queue ?? await Queue.createQueue({ + db, + logger: logger.getChild(Queue.name), + fresh + }); + const scheduler = new this({ db, keyManager, queue, logger }); + await scheduler.start({ handlers, delay, fresh }); + logger.info(`Created ${this.name}`); + return scheduler; + } + + protected logger: Logger; + protected db: DB; + protected keyManager: KeyManager; + protected queue: Queue; + protected handlers: Map = new Map(); + protected generateTaskId: () => TaskId; + protected processingTimer: ReturnType; + + protected schedulerDbPath: LevelPath = [this.constructor.name]; + // `tasks/{TaskId} -> {json(Task)}` + protected schedulerTasksDbPath: LevelPath = [this.constructor.name, 'tasks']; + // `time/{lexi(Timestamp)} -> {raw(TaskId)}` + protected schedulerTimeDbPath: LevelPath = [this.constructor.name, 'time']; + // `handlers/{TaskHandlerId}/{TaskId} -> {raw(TaskId)}` + protected schedulerHandlersDbPath: LevelPath = [this.constructor.name, 'handlers']; + + public constructor({ + db, + keyManager, + queue, + logger + }: { + db: DB; + keyManager: KeyManager; + queue: Queue; + logger: Logger + }) { + this.logger = logger; + this.db = db; + this.keyManager = keyManager; + this.queue = queue; + } + + public get isProcessing(): boolean { + return this.processingTimer != null; + } + + public setWorkerManager(workerManager: PolykeyWorkerManagerInterface): void { + this.queue?.setWorkerManager(workerManager); + } + + public unsetWorkerManager(): void { + this.queue?.unsetWorkerManager(); + } + + public async start({ + handlers = {}, + delay = false, + fresh = false, + }: { + handlers?: Record; + delay?: boolean; + fresh?: boolean; + } = {}): Promise { + this.logger.info(`Starting ${this.constructor.name}`); + if (fresh) { + this.handlers.clear(); + // this.promises.clear(); + await this.db.clear(this.schedulerDbPath); + } + for (const taskHandlerId in handlers) { + this.handlers.set(taskHandlerId as TaskHandlerId, handlers[taskHandlerId]); + } + const lastTaskId = await this.getLastTaskId(); + this.generateTaskId = tasksUtils.createTaskIdGenerator( + this.keyManager.getNodeId(), + lastTaskId, + ); + // Flip this to true to delay the processing start + // if task handler registration is done after the scheduler is created + if (!delay) { + await this.startProcessing(); + } + this.logger.info(`Started ${this.constructor.name}`); + } + + /** + * Stop the scheduler + * This does not clear the handlers nor promises + * This maintains any registered handlers and awaiting promises + */ + public async stop(): Promise { + this.logger.info(`Stopping ${this.constructor.name}`); + await this.stopProcessing(); + this.logger.info(`Stopped ${this.constructor.name}`); + } + + /** + * Destroys the scheduler + * This must first clear all handlers + * Then it needs to cancel all promises + * Finally destroys all underlying state + */ + public async destroy() { + this.logger.info(`Destroying ${this.constructor.name}`); + this.handlers.clear(); + + // TODO: cancel the task promises so that any function awaiting may receive a cancellation + // this.promises.clear(); + + await this.db.clear(this.schedulerDbPath); + this.logger.info(`Destroyed ${this.constructor.name}`); + } + + /** + * Starts the processing of the work + */ + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async startProcessing(): Promise { + // If already started, do nothing + if (this.processingTimer != null) return; + + // We actually need to find ht elast task + + await this.db.withTransactionF( + async (tran) => { + // we use the transaction here + // and we use it run our tasks + // every "execution" involves running it here + return; + } + ); + + // when we "pop" a task + // it is actually to peek at the latest task + // then to set a timeout + // the process is that we find tasks that are worth executing right now + // then we dispatch to execution + + + this.processingTimer = setTimeout(() => { + + }, 1000); + } + + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async stopProcessing(): Promise { + clearTimeout(this.processingTimer); + } + + public getHandler(handlerId: TaskHandlerId): TaskHandler | undefined { + return this.handlers.get(handlerId); + } + + public getHandlers(): Record { + return Object.fromEntries(this.handlers); + } + + /** + * Registers a handler for tasks with the same `TaskHandlerId` + * If tasks are dispatched without their respective handler, + * the scheduler will throw `tasksErrors.ErrorSchedulerHandlerMissing` + */ + public registerHandler(handlerId: TaskHandlerId, handler: TaskHandler) { + this.handlers.set(handlerId, handler); + } + + /** + * Deregisters a handler + */ + public deregisterHandler(handlerId: TaskHandlerId) { + this.handlers.delete(handlerId); + } + + /** + * Gets a scheduled task data + */ + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async getTaskData( + taskId: TaskId, + tran?: DBTransaction + ): Promise { + return await (tran ?? this.db).get( + [...this.schedulerTasksDbPath, taskId.toBuffer()] + ); + } + + /** + * Gets all scheduled task datas + * Tasks are sorted by the `TaskId` + */ + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async *getTaskDatas( + order: 'asc' | 'desc' = 'asc', + tran?: DBTransaction + ): AsyncGenerator<[TaskId, TaskData]> { + if (tran == null) { + return yield* this.db.withTransactionG( + (tran) => this.getTaskDatas(...arguments, tran) + ); + } + for await (const [keyPath, taskData] of tran.iterator( + this.schedulerTasksDbPath, + { valueAsBuffer: false, reverse: order !== 'asc' } + )) { + const taskId = IdInternal.fromBuffer(keyPath[0] as Buffer); + yield [ + taskId, + taskData + ]; + } + } + + // We may change the above system + // to also get you a "task" promise + // which you can also `await` on + // in doing so, it triggers a lazy promise + // That is `Task` itself is a lazy promise + // until you call `await` + + // Except where you `scheduleTask`, it's not a lazy promise + // it actually "adds" the event listener/promise into the system + // The queue manages this + + + // This should return a Record + // note that this would use the TaskIdString + // as it is supposed to be a POJO + // however the TaskId is has actual order + // so how do we deal with this + // I think this is an async generator + // to indicate order... + // becuase otherwise you have [Task, Task, Task] + // but on the otherhand, what does it mean to get order + // it isn't even the order of execution + // so task ids are unique + // and you can make use of it + // and we don't return Map() + // most of the time that is + // public async getTasks(): Record { + // } + + // I think we can avoid this + // actually + // what if we say it's not an enumerable property? + + /** + * Gets a task abstraction + */ + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async getTask(id: TaskId, tran?: DBTransaction) { + const taskData = await (tran ?? this.db).get([...this.queueTasksDbPath, id.toBuffer()]); + if (taskData == null) { + return; + } + const { p: taskP, resolveP, rejectP } = utils.promise(); + + // can we standardise on the unified listener + // that is 1 listener for every task is created automatically + // if 1000 tasks are inserted into the DB + // 1000 listeners are created automatically? + + // we can either... + // A standardise on the listener + // B standardise on the promise + + // if the creation of the promise is lazy + // then one can standardise on the promise + // the idea being if the promise exists, just return the promise + // if it doesn't exist, then first check if the task id still exists + // if so, create a promise out of that lazily + // now you need an object map locking to prevent race conditions on promise creation + // then there's only ever 1 promise for a given task + // any other cases, they always give back the same promise + + + const listener = (taskError, taskResult) => { + if (taskError != null) { + rejectP(taskError); + } else { + resolveP(taskResult); + } + this.deregisterListener(id, listener); + }; + this.registerListener(id, listener); + return taskP; + } + + // do we create a promise + // hod do we do this + // if we already have this + // then we need to assign it somehow? + + // if it is lazy + // we just give back a promise without additional things? + // or we don't bother with it + // because there is no event handler + + + protected async getTaskP(taskId: TaskId, lazy: boolean, tran: DBTransaction) { + + // does that mean we don't extend the promise? + // we just make it look like it + // i guess it works too + // if not lazy, then we do it immediately + // we would assert that we already have this + // so if it is not lazy + // it is not necessary to do this? + + if (!lazy) { + const taskData = await tran.get([...this.queueTasksDbPath, taskId.toBuffer()]); + if (taskData == null) { + return; + } + + } + + const { p: taskP, resolveP, rejectP } = utils.promise(); + const listener = (taskError, taskResult) => { + if (taskError != null) { + rejectP(taskError); + } else { + resolveP(taskResult); + } + this.deregisterListener(id, listener); + }; + this.registerListener(id, listener); + + // can we say that taskP + + return taskP; + } + + // we need task execution as well + // how is this done + // we will loop through the last major task + // and we know what the timeout is + + + + // pushTask + // is the task lazy or not? + + @ready(new tasksErrors.ErrorSchedulerNotRunning()) + public async scheduleTask( + name: string, + parameters: Array, + timeout: number = 0, + lazy: boolean = false, + tran?: DBTransaction + ): Promise { + if (tran == null) { + return this.db.withTransactionF( + (tran) => this.scheduleTask.apply(this, [...arguments, tran]) + ); + } + const taskId = this.generateTaskId(); + // Timestamp extracted from `IdSortable` is a floating point in seconds with subsecond fractionals + // multiply it by 1000 gives us millisecond resolution + const taskTimestamp = Math.trunc(extractTs(taskId) * 1000); + const taskData = { + name, + parameters, + timestamp: taskTimestamp, + timeout: timeout + }; + await tran.put( + [...this.queueTasksDbPath, taskId.toBuffer()], + taskData + ); + // Timestamp -> TaskId + const taskScheduledLexi = Buffer.from(lexi.pack(taskTimestamp + timeout)); + await tran.put( + [...this.queueTaskScheduleDbPath, taskScheduledLexi], + taskId.toBuffer(), + true + ); + + // I think I need to construct this queue + // here and slowly abstract it out + // and then doesn't use this this anyway + + // lazy means false as the default + // so that means we create the tracker immediately + // to do, an event must be registered for the task ID immediately + + if (!lazy) { + // task().then(onFullfill, onReject).finally(onFinally) + + + const { p: taskP, resolveP: resolveTaskP, rejectP: rejectTaskP } = utils.promise(); + this.promises.set( + taskId.toString() as TaskIdString, + { + taskP, + resolveTaskP, + rejectTaskP + } + ); + + // const taskListener = (taskError, taskResult) => { + // if (taskError != null) { + // resolveTaskP(taskError); + // } else { + // rejectTaskP(taskResult); + // } + // this.deregisterListener(taskId, taskListener); + // }; + // this.registerListener(taskId, taskListener); + } + + + + const task = { + id: taskId, + ...taskData, + then: async (onFulfilled, onRejected) => { + + // if this taskP already exists + // then there's no need to set it up? + const taskP = this.promises.get(taskId.toString() as TaskIdString); + + + + + // this is going to be bound to somnething? + // we need to create a promise for it? + // but this means you start doing this by default + + } + }; + + // We want to return 2 things: + // a promise + // it returns a promise yes + // but the promise resolves to 2 things + // 1. the task information (this is because the taskdata combined together) + // 2. the task promise - a cancellable promise + // the task promise resolves when the task is executed + + return [ + { + id: taskId, + ...taskData, + } + ]; + + } + + // we have to start the loop + // the `setTimeout` is what actually starts the execution + // Pop up the next highest priority + + // when pushing a task + // it is "scheduled" + // but that is not what happens here + + // instead scheduling is triggered in 2 ways + // one by starting the system + // and another when a task is entered into the system + // in both cases, a trigger takes place + + public async popTask(tran?: DBTransaction) { + if (tran == null) { + return this.db.withTransactionF( + (tran) => this.popTask.apply(this, [...arguments, tran]) + ); + } + let taskId: TaskId | undefined; + let taskData: TaskData | undefined; + for await (const [, taskIdBuffer] of tran.iterator( + this.queueTaskScheduleDbPath, + { + limit: 1, + keys: false + } + )) { + taskId = IdInternal.fromBuffer(taskIdBuffer); + taskData = await tran.get( + [...this.queueTasksDbPath, taskIdBuffer] + ); + } + return { + id: taskId, + ...taskData + }; + } + + + @ready(new tasksErrors.ErrorSchedulerNotRunning(), false, ['starting']) + public async getLastTaskId(tran?: DBTransaction): Promise { + if (tran == null) { + return this.db.withTransactionF( + (tran) => this.getLastTaskId.apply(this, [...arguments, tran]) + ); + } + let lastTaskId: TaskId | undefined; + for await (const [keyPath] of tran.iterator( + this.queueTasksDbPath, + { + limit: 1, + reverse: true, + values: false , + } + )) { + lastTaskId = IdInternal.fromBuffer(keyPath[0] as Buffer); + } + return lastTaskId; + } +} + +export default Scheduler; diff --git a/src/tasks/Task.ts b/src/tasks/Task.ts new file mode 100644 index 0000000000..6d94274500 --- /dev/null +++ b/src/tasks/Task.ts @@ -0,0 +1,120 @@ +import type Queue from "./Scheduler"; +import type { TaskId, TaskData } from "./types"; + +// do we say the task info +// no asynchronus creation for now +// this doesn't have a explicit destruction + +// new Task(this, id, taskData) +// new Task({ queue: this, id, blah, blah, }) +// note that the type is part of this +// so we can dicatate what we need here + +class Task extends Promise { + + public readonly id: TaskId; + public readonly name: string; + public readonly timestamp: number; + public readonly delay: number; + + protected queue: Queue; + protected resolveP: (value: T | PromiseLike) => void; + protected rejectP: (reason?: any) => void; + + constructor( + queue: Queue, + id: TaskId, + name: string, + timestamp: number, + delay: number + ) { + let resolveP, rejectP; + super((resolve, reject) => { + resolveP = resolve; + rejectP = reject; + }); + this.resolveP = resolveP; + this.rejectP = rejectP; + this.queue = queue; + this.id = id; + this.name = name; + this.timestamp = timestamp; + this.delay = delay; + } + + /** + * This is called when `await` is used + */ + public async then( + onFulfilled?: ((value: T) => TResult1 | PromiseLike) | undefined | null, + onRejected?: ((reason: any) => TResult2 | PromiseLike) | undefined | null + ): Promise { + + // this is the promise now + // we can say that we only do what is needed + // we can make this `then` asynchronous + // do we use the same db + // or ask the Task to have the same capability? + + + + return undefined as any; + } + + // public then( + // onFulfilled?: ((value: T) => TResult1 | PromiseLike) | undefined | null, + // onRejected?: ((reason: any) => TResult2 | PromiseLike) | undefined | null, + // ): Promise { + + // // these callbacks + // // how are they supposed to be used? + // // this is a promise + // return undefined as any; + + // } + +// then( +// onfulfilled?: ((value: T) => TResult1 | PromiseLike) | undefined | null, +// onrejected?: ((reason: any) => TResult2 | PromiseLike) | undefined | null): +// Promise; + + // public catch () { + + // } + + // public finally () { + + // } + +} + +// const t = new Task(); + +const p = new Promise((resolve, reject) => { + resolve(); +}); + +p.then +// p.catch +// p.finally +// /** +// * Represents the completion of an asynchronous operation +// */ +// interface Promise { +// /** +// * Attaches callbacks for the resolution and/or rejection of the Promise. +// * @param onfulfilled The callback to execute when the Promise is resolved. +// * @param onrejected The callback to execute when the Promise is rejected. +// * @returns A Promise for the completion of which ever callback is executed. +// */ + +// /** +// * Attaches a callback for only the rejection of the Promise. +// * @param onrejected The callback to execute when the Promise is rejected. +// * @returns A Promise for the completion of the callback. +// */ +// catch(onrejected?: ((reason: any) => TResult | PromiseLike) | undefined | null): Promise; +// } + + +export default Task; diff --git a/src/tasks/errors.ts b/src/tasks/errors.ts new file mode 100644 index 0000000000..d336203447 --- /dev/null +++ b/src/tasks/errors.ts @@ -0,0 +1,79 @@ +import { ErrorPolykey, sysexits } from '../errors'; + +class ErrorTasks extends ErrorPolykey {} + +class ErrorScheduler extends ErrorTasks {} + +class ErrorSchedulerRunning extends ErrorScheduler { + static description = 'Scheduler is running'; + exitCode = sysexits.USAGE; +} + +class ErrorSchedulerNotRunning extends ErrorScheduler { + static description = 'Scheduler is not running'; + exitCode = sysexits.USAGE; +} + +class ErrorSchedulerDestroyed extends ErrorScheduler { + static description = 'Scheduler is destroyed'; + exitCode = sysexits.USAGE; +} + +class ErrorSchedulerHandlerMissing extends ErrorScheduler { + static description = 'Scheduler task handler is not registered'; + exitCode = sysexits.USAGE; +} + +class ErrorQueue extends ErrorTasks {} + +class ErrorQueueRunning extends ErrorQueue { + static description = 'Queue is running'; + exitCode = sysexits.USAGE; +} + +class ErrorQueueNotRunning extends ErrorQueue { + static description = 'Queue is not running'; + exitCode = sysexits.USAGE; +} + +class ErrorQueueDestroyed extends ErrorQueue { + static description = 'Queue is destroyed'; + exitCode = sysexits.USAGE; +} + +class ErrorTask extends ErrorTasks { + static description = 'Task error'; + exitCode = sysexits.USAGE; +} + +class ErrorTaskRejected extends ErrorTask { + static description = 'Task handler threw an exception'; + exitCode = sysexits.USAGE; +} + +class ErrorTaskCancelled extends ErrorTask { + static description = 'Task has been cancelled'; + exitCode = sysexits.USAGE; +} + +class ErrorTaskMissing extends ErrorTask { + static description = 'Task does not (or never) existed anymore, it may have been fulfilled or cancelled'; + exitCode = sysexits.USAGE; +} + +export { + ErrorTasks, + ErrorScheduler, + ErrorSchedulerRunning, + ErrorSchedulerNotRunning, + ErrorSchedulerDestroyed, + ErrorSchedulerHandlerMissing, + ErrorQueue, + ErrorQueueRunning, + ErrorQueueNotRunning, + ErrorQueueDestroyed, + ErrorTask, + ErrorTaskRejected, + ErrorTaskCancelled, + ErrorTaskMissing, +}; diff --git a/src/tasks/index.ts b/src/tasks/index.ts new file mode 100644 index 0000000000..ae900e45bb --- /dev/null +++ b/src/tasks/index.ts @@ -0,0 +1,4 @@ +export { default as Scheduler } from './Scheduler'; +export * as types from './types'; +export * as utils from './utils'; +export * as errors from './errors'; diff --git a/src/tasks/types.ts b/src/tasks/types.ts new file mode 100644 index 0000000000..1e91d084b1 --- /dev/null +++ b/src/tasks/types.ts @@ -0,0 +1,72 @@ +import type { Id } from '@matrixai/id'; +import type { POJO, Opaque, Callback } from '../types'; + +type TaskId = Opaque<'TaskId', Id>; +type TaskIdString = Opaque<'TaskIdString', string>; +type TaskIdEncoded = Opaque<'TaskIdEncoded', string>; + +/** + * Task data to be persisted + */ +type TaskData = { + handler: TaskHandlerId; + parameters: Array; + timestamp: number; + delay: number; +}; + +/** + * Task information that is returned to the user + */ +type TaskInfo = TaskData & { + id: TaskId; +}; + +type TaskHandlerId = Opaque<'TaskHandlerId', string>; + +type TaskHandler< + P extends Array = [], + R = any +> = (...params: P) => Promise; + +// type TaskListener = Callback<[taskResult: any], void>; +// Make Task something that can be awaited on +// but when you "make" a promise or reference it +// you're for a promise +// that will resolve an event occurs +// or reject when an event occurs +// and the result of the execution +// now the exeuction of the event itself is is going to return ap romise +// something must be lisetning to it +// If you have a Record +// it has to be TaskIdString +// you can store things in it +// type X = Record; +// Task is the lowest level +// TaskData is low level +// TaskInfo is high level +// TaskId +// Task <- lazy promise +// TaskData <- low level data of a task (does not include id) +// TaskInfo <- high level (includes id) +// This is a lazy promise +// it's a promise of something that may not yet immediately executed +// type TaskPromise = Promise; +// Consider these variants... (should standardise what these are to be used) +// Task +// Tasks (usually a record, sometimes an array) +// TaskData - lower level data of a task +// TaskInfo - higher level information that is inclusive of data +// type TaskData = Record; + +export type { + TaskId, + TaskIdString, + TaskIdEncoded, + // Task, + TaskData, + TaskInfo, + TaskHandlerId, + TaskHandler, + // TaskListener +}; diff --git a/src/tasks/utils.ts b/src/tasks/utils.ts new file mode 100644 index 0000000000..50d7d34ecd --- /dev/null +++ b/src/tasks/utils.ts @@ -0,0 +1,21 @@ +import type { TaskId } from './types'; +import type { NodeId } from '../nodes/types'; +import { IdSortable } from '@matrixai/id'; + +/** + * Generates TaskId + * TaskIds are lexicographically sortable 128 bit IDs + * They are strictly monotonic and unique with respect to the `nodeId` + * When the `NodeId` changes, make sure to regenerate this generator + */ +function createTaskIdGenerator(nodeId: NodeId, lastTaskId?: TaskId) { + const generator = new IdSortable({ + lastId: lastTaskId, + nodeId, + }); + return () => generator.get(); +} + +export { + createTaskIdGenerator +}; diff --git a/src/timer/Timer.ts b/src/timer/Timer.ts new file mode 100644 index 0000000000..da0f288303 --- /dev/null +++ b/src/timer/Timer.ts @@ -0,0 +1,7 @@ +class Timer { + + protected timer: ReturnType; + public readonly timerP: Promise; + protected _timedOut: boolean; + +} diff --git a/src/tracing/Trace.ts b/src/tracing/Trace.ts new file mode 100644 index 0000000000..dd930ad72a --- /dev/null +++ b/src/tracing/Trace.ts @@ -0,0 +1,2 @@ +// creates Traces +// should be used to create `Span` diff --git a/test-promises.ts b/test-promises.ts new file mode 100644 index 0000000000..546d21f9b1 --- /dev/null +++ b/test-promises.ts @@ -0,0 +1,35 @@ +async function main () { + + const p = new Promise((resolve, reject) => { + setTimeout(() => { + reject(new Error('oh no')) + }, 500); + setTimeout(resolve, 1000); + }); + + const f = async () => { + await p; + return 'f'; + }; + + const g = async () => { + await p; + return 'g'; + }; + + const r = await Promise.allSettled([ + f(), + g() + ]); + + console.log(r); + + // @ts-ignore + console.log(r[0].reason === r[1].reason); // This is `true` + + // The same exception object is thrown to all awaiters + +} + +void main(); + diff --git a/test-transaction-decorator.ts b/test-transaction-decorator.ts new file mode 100644 index 0000000000..701488fb01 --- /dev/null +++ b/test-transaction-decorator.ts @@ -0,0 +1,100 @@ +const AsyncFunction = (async () => {}).constructor; +const GeneratorFunction = function* () {}.constructor; +const AsyncGeneratorFunction = async function* () {}.constructor; + +// function transaction any> (callback: T): T { +// return ((...args: [...any, number]) => { +// // we take all the arguments +// // it is necessary to take the last argument +// // how many arguments are we talking about +// // if it is undefined? +// // how does it work? +// if (args[args.length - 1] === undefined) { +// this.db.withTransactionF( +// (tran) => { +// return f.apply(this, ...args, tran); +// } +// ); + + +// } + +// }) as any; +// } + +// transactional (vs transaction) +// then we use the db +// @transaction(this.db) + +function transaction(db: { a: string }) { + return ( + target: any, + key: string, + descriptor: TypedPropertyDescriptor<(tran: string) => any> + ) => { + const f = descriptor.value; + if (typeof f !== 'function') { + throw new TypeError(`${key} is not a function`); + } + descriptor.value = function (tran: string) { + return f.call(this, tran ?? db.a); + }; + return descriptor; + }; +} + +// function dec(f: () => string) { +// return () => f(); +// } + +// // the method signature takes tran: DBTransaction +// // but the decorator cannot change the type +// // it would still be "required" + +// const task = ( +// target: any, +// propertyKey: string, +// descriptor: TypedPropertyDescriptor<(name: string) => void>, +// ) => { +// const original = descriptor.value +// descriptor.value = function () { +// return original?.call(this, 'Mark') +// } +// return descriptor +// } + + +class X { + + protected y: string = 'parameter'; + + @transaction({a: 'abc'}) + public async foo(name?: string): Promise { + return name! + this.y; + } + + @transaction({a: 'blah'}) + public bar(name?: string): string { + return name!; + } + + // @task + // public done(name: string) { + + // } + +} + +const x = new X(); + +async function main () { + + const r = await x.foo('OVER'); + console.log(r); + + + +} + + +void main(); diff --git a/tests/tasks/Scheduler.test.ts b/tests/tasks/Scheduler.test.ts new file mode 100644 index 0000000000..d0c95e9726 --- /dev/null +++ b/tests/tasks/Scheduler.test.ts @@ -0,0 +1,72 @@ +import os from 'os'; +import path from 'path'; +import fs from 'fs'; +import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; +import { DB } from '@matrixai/db'; +import KeyManager from '@/keys/KeyManager'; +import Queue from '@/tasks/Queue'; +import Scheduler from '@/tasks/Scheduler'; +import * as keysUtils from '@/keys/utils'; +import { globalRootKeyPems } from '../fixtures/globalRootKeyPems'; + +describe(Queue.name, () => { + const password = 'password'; + const logger = new Logger(`${Queue.name} test`, LogLevel.WARN, [ + new StreamHandler(), + ]); + let keyManager: KeyManager; + let dbKey: Buffer; + let dbPath: string; + let db: DB; + beforeAll(async () => { + dataDir = await fs.promises.mkdtemp( + path.join(os.tmpdir(), 'polykey-test-'), + ); + const keysPath = `${dataDir}/keys`; + keyManager = await KeyManager.createKeyManager({ + password, + keysPath, + logger, + privateKeyPemOverride: globalRootKeyPems[0], + }); + dbKey = await keysUtils.generateKey(); + dbPath = `${dataDir}/db`; + }); + beforeEach(async () => { + db = await DB.createDB({ + dbPath, + logger, + crypto: { + key: dbKey, + ops: { + encrypt: keysUtils.encryptWithKey, + decrypt: keysUtils.decryptWithKey, + }, + }, + }); + }); + afterEach(async () => { + await db.stop(); + await db.destroy(); + }); + test('do it', async () => { + const queue = await Scheduler.createScheduler({ + db, + keyManager, + logger, + }); + + await queue.registerHandler('somename', async () => { + console.log('hi'); + }); + + const result = await queue.pushTask('somename', [], 1000); + + console.log(result); + + + + await queue.stop(); + await queue.destroy(); + }); +});