From d1ac7a3385785f6f2176c37163323aad5e13fc28 Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Thu, 30 May 2024 14:58:50 -0400 Subject: [PATCH] feat(orchestrator): implement task processor (#2221) ## Describe your changes This PR is implementing the orchestrator processor which, for now, is being run by jobs. The processor is behind a flag `flag:orchestrator:dryrun:process:global`. It is still in dryrun mode since no real processing is happening, tasks are either immediately succeeded or failed. It is the last step before actually processing webhook/actions ### Notes - I have added orchestrator endpoints to support dequeueing, set task state, heartbeat and their respective functions in the orchestrator client - A processor dequeues tasks in a infinite loop and process the task via the `process` functions passed as argument. - A processor also check if pending tasks have been completed (ie: they might have expired or be cancelled) and send abort signal. - In jobs, 2 processor worker_thread are being started. One for action and one for webhook to reproduce what we have right now with webhooks being in a separate temporal queue Tested in staging ## Checklist before requesting a review (skip if just adding/editing APIs & templates) - [x] I added tests, otherwise the reason is: - [ ] I added observability, otherwise the reason is: - [ ] I added analytics, otherwise the reason is: --- .env.example | 3 + package-lock.json | 13 + packages/jobs/Dockerfile | 1 + packages/jobs/lib/app.ts | 28 +- packages/jobs/lib/constants.ts | 1 - packages/jobs/lib/env.ts | 3 + packages/jobs/lib/processor/handler.ts | 34 +++ packages/jobs/lib/processor/processor.ts | 50 ++++ .../lib/processor/processor.worker.boot.ts | 11 + .../jobs/lib/processor/processor.worker.ts | 94 +++++++ packages/jobs/lib/temporal.ts | 3 +- packages/jobs/nodemon.json | 2 +- packages/jobs/package.json | 1 + packages/orchestrator/lib/client.ts | 166 ----------- .../{ => clients}/client.integration.test.ts | 138 ++++++++- packages/orchestrator/lib/clients/client.ts | 264 ++++++++++++++++++ .../lib/clients/processor.integration.test.ts | 143 ++++++++++ .../orchestrator/lib/clients/processor.ts | 146 ++++++++++ packages/orchestrator/lib/clients/types.ts | 106 +++++++ packages/orchestrator/lib/clients/validate.ts | 53 ++++ packages/orchestrator/lib/events.ts | 19 +- packages/orchestrator/lib/index.ts | 4 +- .../lib/routes/{health.ts => getHealth.ts} | 7 +- .../orchestrator/lib/routes/v1/postDequeue.ts | 86 ++++++ .../v1/{schedule.ts => postSchedule.ts} | 74 +++-- .../orchestrator/lib/routes/v1/postSearch.ts | 56 ++++ .../lib/routes/v1/tasks/putTaskId.ts | 67 +++++ .../output.ts => tasks/taskId/getOutput.ts} | 21 +- .../routes/v1/tasks/taskId/postHeartbeat.ts | 45 +++ packages/orchestrator/lib/server.ts | 20 +- packages/orchestrator/lib/types.ts | 2 + .../lib/models/tasks.integration.test.ts | 16 +- packages/scheduler/lib/models/tasks.ts | 10 +- packages/scheduler/lib/scheduler.ts | 12 +- packages/scheduler/lib/types.ts | 3 +- packages/server/Dockerfile | 1 + packages/shared/lib/clients/orchestrator.ts | 10 +- packages/utils/lib/environment/parse.ts | 2 +- packages/utils/lib/result.ts | 12 + 39 files changed, 1462 insertions(+), 265 deletions(-) delete mode 100644 packages/jobs/lib/constants.ts create mode 100644 packages/jobs/lib/env.ts create mode 100644 packages/jobs/lib/processor/handler.ts create mode 100644 packages/jobs/lib/processor/processor.ts create mode 100644 packages/jobs/lib/processor/processor.worker.boot.ts create mode 100644 packages/jobs/lib/processor/processor.worker.ts delete mode 100644 packages/orchestrator/lib/client.ts rename packages/orchestrator/lib/{ => clients}/client.integration.test.ts (54%) create mode 100644 packages/orchestrator/lib/clients/client.ts create mode 100644 packages/orchestrator/lib/clients/processor.integration.test.ts create mode 100644 packages/orchestrator/lib/clients/processor.ts create mode 100644 packages/orchestrator/lib/clients/types.ts create mode 100644 packages/orchestrator/lib/clients/validate.ts rename packages/orchestrator/lib/routes/{health.ts => getHealth.ts} (70%) create mode 100644 packages/orchestrator/lib/routes/v1/postDequeue.ts rename packages/orchestrator/lib/routes/v1/{schedule.ts => postSchedule.ts} (58%) create mode 100644 packages/orchestrator/lib/routes/v1/postSearch.ts create mode 100644 packages/orchestrator/lib/routes/v1/tasks/putTaskId.ts rename packages/orchestrator/lib/routes/v1/{task/taskId/output.ts => tasks/taskId/getOutput.ts} (79%) create mode 100644 packages/orchestrator/lib/routes/v1/tasks/taskId/postHeartbeat.ts create mode 100644 packages/orchestrator/lib/types.ts diff --git a/.env.example b/.env.example index d7080ae60e..833a7ab761 100644 --- a/.env.example +++ b/.env.example @@ -99,3 +99,6 @@ MAILGUN_API_KEY= # Redis (optional) NANGO_REDIS_URL= + +# Orchestrator +ORCHESTRATOR_SERVICE_URL="http://localhost:3008" diff --git a/package-lock.json b/package-lock.json index f9b19a0c17..5368dd2168 100644 --- a/package-lock.json +++ b/package-lock.json @@ -33449,10 +33449,23 @@ "@types/md5": "^2.3.2", "@types/node": "^20.12.2", "nodemon": "^3.0.1", + "type-fest": "4.14.0", "typescript": "^5.3.3", "vitest": "0.33.0" } }, + "packages/jobs/node_modules/type-fest": { + "version": "4.14.0", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.14.0.tgz", + "integrity": "sha512-on5/Cw89wwqGZQu+yWO0gGMGu8VNxsaW9SB2HE8yJjllEk7IDTwnSN1dUVldYILhYPN5HzD7WAaw2cc/jBfn0Q==", + "dev": true, + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "packages/logs": { "name": "@nangohq/logs", "version": "1.0.0", diff --git a/packages/jobs/Dockerfile b/packages/jobs/Dockerfile index 5c4a053959..a2942c77bc 100644 --- a/packages/jobs/Dockerfile +++ b/packages/jobs/Dockerfile @@ -16,6 +16,7 @@ COPY packages/data-ingestion/ packages/data-ingestion/ COPY packages/jobs/ packages/jobs/ COPY packages/logs/ packages/logs/ COPY packages/runner/ packages/runner/ +COPY packages/scheduler/ packages/scheduler/ COPY packages/orchestrator/ packages/orchestrator/ COPY package*.json ./ diff --git a/packages/jobs/lib/app.ts b/packages/jobs/lib/app.ts index 43348e80c9..c7fbe7a58a 100644 --- a/packages/jobs/lib/app.ts +++ b/packages/jobs/lib/app.ts @@ -1,25 +1,41 @@ import './tracer.js'; import { Temporal } from './temporal.js'; +import { Processor } from './processor/processor.js'; import { server } from './server.js'; import { cronAutoIdleDemo } from './crons/autoIdleDemo.js'; import { deleteOldActivityLogs } from './crons/deleteOldActivities.js'; import { deleteSyncsData } from './crons/deleteSyncsData.js'; import { reconcileTemporalSchedules } from './crons/reconcileTemporalSchedules.js'; import { getLogger, stringifyError } from '@nangohq/utils'; -import { JOBS_PORT } from './constants.js'; -import { db } from '@nangohq/shared'; +import { db, featureFlags } from '@nangohq/shared'; +import { envs } from './env.js'; const logger = getLogger('Jobs'); try { - server.listen(JOBS_PORT); - logger.info(`🚀 service ready at http://localhost:${JOBS_PORT}`); - const temporalNs = process.env['TEMPORAL_NAMESPACE'] || 'default'; + const port = envs.NANGO_JOBS_PORT; + const temporalNs = envs.TEMPORAL_NAMESPACE; + const orchestratorServiceUrl = envs.ORCHESTRATOR_SERVICE_URL; + server.listen(port); + logger.info(`🚀 service ready at http://localhost:${port}`); const temporal = new Temporal(temporalNs); + const processor = new Processor(orchestratorServiceUrl); // This promise never resolve void temporal.start(); + // Start processor + const getFlag = () => featureFlags.isEnabled('orchestrator:dryrun:process', 'global', false, false); + const processorFlagTimer = setInterval(async () => { + const isProcessorEnabled = await getFlag(); + if (isProcessorEnabled && processor.isStopped()) { + processor.start(); + } + if (!isProcessorEnabled && !processor.isStopped()) { + processor.stop(); + } + }, 1000); + db.enableMetrics(); // Register recurring tasks @@ -31,6 +47,8 @@ try { // handle SIGTERM process.on('SIGTERM', () => { temporal.stop(); + processor.stop(); + clearInterval(processorFlagTimer); server.server.close(() => { process.exit(0); }); diff --git a/packages/jobs/lib/constants.ts b/packages/jobs/lib/constants.ts deleted file mode 100644 index 81dedef6b1..0000000000 --- a/packages/jobs/lib/constants.ts +++ /dev/null @@ -1 +0,0 @@ -export const JOBS_PORT = parseInt(process.env['NANGO_JOBS_PORT'] || '') || 3005; diff --git a/packages/jobs/lib/env.ts b/packages/jobs/lib/env.ts new file mode 100644 index 0000000000..e7c605d286 --- /dev/null +++ b/packages/jobs/lib/env.ts @@ -0,0 +1,3 @@ +import { ENVS, parseEnvs } from '@nangohq/utils'; + +export const envs = parseEnvs(ENVS.required({ ORCHESTRATOR_SERVICE_URL: true })); diff --git a/packages/jobs/lib/processor/handler.ts b/packages/jobs/lib/processor/handler.ts new file mode 100644 index 0000000000..a7e8462c8e --- /dev/null +++ b/packages/jobs/lib/processor/handler.ts @@ -0,0 +1,34 @@ +import type { OrchestratorTask, TaskWebhook, TaskAction } from '@nangohq/nango-orchestrator'; +import type { JsonValue } from 'type-fest'; +import { Err, Ok } from '@nangohq/utils'; +import type { Result } from '@nangohq/utils'; + +export async function handler(task: OrchestratorTask): Promise> { + task.abortController.signal.onabort = () => { + abort(task); + }; + if (task.isAction()) { + return action(task); + } + if (task.isWebhook()) { + return webhook(task); + } + return Err(`Unreachable`); +} + +async function abort(_task: OrchestratorTask): Promise> { + // TODO: Implement abort processing + return Ok(undefined); +} + +async function action(task: TaskAction): Promise> { + // TODO: Implement action processing + // Returning a successful result for now + return Ok({ taskId: task.id, dryrun: true }); +} + +async function webhook(task: TaskWebhook): Promise> { + // TODO: Implement action processing + // Returning an error for now + return Err(`Not implemented: ${JSON.stringify({ taskId: task.id })}`); +} diff --git a/packages/jobs/lib/processor/processor.ts b/packages/jobs/lib/processor/processor.ts new file mode 100644 index 0000000000..7dd41e9714 --- /dev/null +++ b/packages/jobs/lib/processor/processor.ts @@ -0,0 +1,50 @@ +import { getLogger } from '@nangohq/utils'; +import { ProcessorWorker } from './processor.worker.js'; + +const logger = getLogger('jobs.processor'); + +export class Processor { + private orchestratorServiceUrl: string; + private workers: ProcessorWorker[]; + private stopped: boolean; + + constructor(orchestratorServiceUrl: string) { + this.orchestratorServiceUrl = orchestratorServiceUrl; + this.workers = []; + this.stopped = true; + } + + isStopped() { + return this.stopped; + } + + start() { + logger.info('Starting task processors'); + try { + const actionWorker = new ProcessorWorker({ + orchestratorUrl: this.orchestratorServiceUrl, + groupKey: 'action', + maxConcurrency: 100 + }); + actionWorker.start(); + + const webhookWorker = new ProcessorWorker({ + orchestratorUrl: this.orchestratorServiceUrl, + groupKey: 'webhook', + maxConcurrency: 50 + }); + webhookWorker.start(); + this.workers = [actionWorker, webhookWorker]; + this.stopped = false; + } catch (e) { + logger.error(e); + } + } + + stop() { + if (this.workers) { + this.workers.forEach((worker) => worker.stop()); + } + this.stopped = true; + } +} diff --git a/packages/jobs/lib/processor/processor.worker.boot.ts b/packages/jobs/lib/processor/processor.worker.boot.ts new file mode 100644 index 0000000000..ae18b7ba2c --- /dev/null +++ b/packages/jobs/lib/processor/processor.worker.boot.ts @@ -0,0 +1,11 @@ +import { isMainThread, parentPort, workerData } from 'node:worker_threads'; +import { getLogger } from '@nangohq/utils'; +import { ProcessorChild } from './processor.worker.js'; + +const logger = getLogger('processor.worker.boot'); + +if (!isMainThread && parentPort) { + new ProcessorChild(parentPort, workerData); +} else { + logger.error('Processor should not be instantiated in the main thread'); +} diff --git a/packages/jobs/lib/processor/processor.worker.ts b/packages/jobs/lib/processor/processor.worker.ts new file mode 100644 index 0000000000..71b7a85bd2 --- /dev/null +++ b/packages/jobs/lib/processor/processor.worker.ts @@ -0,0 +1,94 @@ +import * as fs from 'fs'; +import type { MessagePort } from 'node:worker_threads'; +import { Worker, isMainThread } from 'node:worker_threads'; +import { getLogger, stringifyError } from '@nangohq/utils'; +import { OrchestratorClient, OrchestratorProcessor } from '@nangohq/nango-orchestrator'; +import { handler } from './handler.js'; + +const logger = getLogger('jobs.processor.worker'); + +export class ProcessorWorker { + private worker: Worker | null; + constructor({ orchestratorUrl, groupKey, maxConcurrency }: { orchestratorUrl: string; groupKey: string; maxConcurrency: number }) { + if (isMainThread) { + const url = new URL('../../dist/processor/processor.worker.boot.js', import.meta.url); + if (!fs.existsSync(url)) { + throw new Error(`Processor worker boot script not found at ${url}`); + } + this.worker = new Worker(url, { workerData: { orchestratorUrl, groupKey, maxConcurrency } }); + this.worker.on('error', (err) => { + logger.error(`ProcessorWorker exited with error: ${stringifyError(err)}`); + }); + this.worker.on('exit', (code) => { + if (code !== 0) { + logger.error(`ProcessorWorker exited with exit code: ${code}`); + } + }); + } else { + throw new Error('ProcessorWorker should be instantiated in the main thread'); + } + } + + start(): void { + this.worker?.postMessage('start'); + } + + stop(): void { + if (this.worker) { + this.worker.postMessage('stop'); + this.worker = null; + } + } +} + +export class ProcessorChild { + private parent: MessagePort; + private processor: OrchestratorProcessor; + private opts: { + orchestratorUrl: string; + groupKey: string; + maxConcurrency: number; + }; + + constructor(parent: MessagePort, workerData: { orchestratorUrl: string; groupKey: string; maxConcurrency: number }) { + if (isMainThread) { + throw new Error('Processor should not be instantiated in the main thread'); + } + if (!workerData.orchestratorUrl || !workerData.groupKey || workerData.maxConcurrency <= 0) { + throw new Error( + `Missing required options for processor worker. Expecting orchestratorUrl, groupKey, maxConcurrency > 0, got: ${JSON.stringify(workerData)}` + ); + } + this.opts = workerData; + this.parent = parent; + this.parent.on('message', async (msg: 'start' | 'stop') => { + switch (msg) { + case 'start': + await this.start(); + break; + case 'stop': + this.stop(); + break; + } + }); + const client = new OrchestratorClient({ baseUrl: this.opts.orchestratorUrl }); + this.processor = new OrchestratorProcessor({ + handler, + opts: { + orchestratorClient: client, + groupKey: this.opts.groupKey, + maxConcurrency: this.opts.maxConcurrency + } + }); + } + + async start(): Promise { + logger.info(`Starting Processor: ${JSON.stringify(this.opts)}`); + this.processor.start(); + } + + stop(): void { + logger.info(`Stopping Processor: ${JSON.stringify(this.opts)}`); + this.processor.stop(); + } +} diff --git a/packages/jobs/lib/temporal.ts b/packages/jobs/lib/temporal.ts index b71e81a299..a883df7e64 100644 --- a/packages/jobs/lib/temporal.ts +++ b/packages/jobs/lib/temporal.ts @@ -5,10 +5,11 @@ import { createRequire } from 'module'; import * as activities from './activities.js'; import { SYNC_TASK_QUEUE, WEBHOOK_TASK_QUEUE } from '@nangohq/shared'; import { isProd, isEnterprise, getLogger } from '@nangohq/utils'; +import { envs } from './env.js'; const logger = getLogger('Jobs.Temporal'); -const TEMPORAL_WORKER_MAX_CONCURRENCY = parseInt(process.env['TEMPORAL_WORKER_MAX_CONCURRENCY'] || '0') || 500; +const TEMPORAL_WORKER_MAX_CONCURRENCY = envs.TEMPORAL_WORKER_MAX_CONCURRENCY; export class Temporal { namespace: string; diff --git a/packages/jobs/nodemon.json b/packages/jobs/nodemon.json index 146f312628..55169b7959 100644 --- a/packages/jobs/nodemon.json +++ b/packages/jobs/nodemon.json @@ -1,5 +1,5 @@ { - "watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../logs/dist", "../../.env"], + "watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../logs/dist", "../orchestrator/dist", "../../.env"], "ext": "js,ts,json", "ignore": ["lib/**/*.spec.ts"], "exec": "tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env", diff --git a/packages/jobs/package.json b/packages/jobs/package.json index 4858171601..fe1dfa7344 100644 --- a/packages/jobs/package.json +++ b/packages/jobs/package.json @@ -47,6 +47,7 @@ "@types/node": "^20.12.2", "nodemon": "^3.0.1", "typescript": "^5.3.3", + "type-fest": "4.14.0", "vitest": "0.33.0" } } diff --git a/packages/orchestrator/lib/client.ts b/packages/orchestrator/lib/client.ts deleted file mode 100644 index 8cbb521c93..0000000000 --- a/packages/orchestrator/lib/client.ts +++ /dev/null @@ -1,166 +0,0 @@ -import type { JsonValue, SetOptional } from 'type-fest'; -import { route as scheduleRoute } from './routes/v1/schedule.js'; -import { route as outputRoute } from './routes/v1/task/taskId/output.js'; -import type { Result, Route } from '@nangohq/utils'; -import { Ok, Err, routeFetch } from '@nangohq/utils'; -import type { Endpoint } from '@nangohq/types'; - -interface SchedulingProps { - name: string; - groupKey: string; - retry: { - count: number; - max: number; - }; - timeoutSettingsInSecs: { - createdToStarted: number; - startedToCompleted: number; - heartbeat: number; - }; - args: JsonValue & { type: 'action' | 'webhook' | 'sync' }; -} - -interface TExecuteActionArgs { - args: { - name: string; - connection: { - id: number; - provider_config_key: string; - environment_id: number; - }; - activityLogId: number; - input: JsonValue; - }; -} -interface TExecuteWebhookArgs { - args: { - name: string; - parentSyncName: string; - connection: { - id: number; - provider_config_key: string; - environment_id: number; - }; - activityLogId: number | null; - input: JsonValue; - }; -} - -interface ClientError extends Error { - name: string; - payload: JsonValue; -} - -export type TExecuteProps = SetOptional; -export type TExecuteReturn = Result; -export type TExecuteActionProps = Omit & TExecuteActionArgs; -export type TExecuteWebhookProps = Omit & TExecuteWebhookArgs; - -export class OrchestratorClient { - private baseUrl: string; - - constructor({ baseUrl }: { baseUrl: string }) { - this.baseUrl = baseUrl; - } - - private routeFetch>(route: Route) { - return routeFetch(this.baseUrl, route); - } - - private async schedule(props: SchedulingProps): Promise> { - const res = await this.routeFetch(scheduleRoute)({ - body: { - scheduling: 'immediate', - name: props.name, - groupKey: props.groupKey, - retry: props.retry, - timeoutSettingsInSecs: props.timeoutSettingsInSecs, - args: props.args - } - }); - if ('error' in res) { - return Err({ - name: res.error.code, - message: res.error.message || `Error scheduling tasks`, - payload: {} // TODO - }); - } else { - return Ok(res); - } - } - - private async execute(props: TExecuteProps): Promise { - const scheduleProps = { - retry: { count: 0, max: 0 }, - timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, - ...props - } as SchedulingProps; - const res = await this.schedule(scheduleProps); - if (res.isErr()) { - return res; - } - const taskId = res.value.taskId; - const getOutput = await this.routeFetch(outputRoute)({ params: { taskId }, query: { waitForCompletion: true } }); - if ('error' in getOutput) { - return Err({ - name: getOutput.error.code, - message: getOutput.error.message || `Error fetching task '${taskId}' output`, - payload: {} - }); - } else { - switch (getOutput.state) { - case 'CREATED': - case 'STARTED': - return Err({ - name: 'task_in_progress_error', - message: `Task ${taskId} is in progress`, - payload: getOutput.output - }); - case 'SUCCEEDED': - return Ok(getOutput.output); - case 'FAILED': - return Err({ - name: 'task_failed_error', - message: `Task ${taskId} failed`, - payload: getOutput.output - }); - case 'EXPIRED': - return Err({ - name: 'task_expired_error', - message: `Task ${taskId} expired`, - payload: getOutput.output - }); - case 'CANCELLED': - return Err({ - name: 'task_cancelled_error', - message: `Task ${taskId} cancelled`, - payload: getOutput.output - }); - } - } - } - - public async executeAction(props: TExecuteActionProps): Promise { - const { args, ...rest } = props; - const schedulingProps = { - ...rest, - args: { - ...args, - type: 'action' as const - } - }; - return this.execute(schedulingProps); - } - - public async executeWebhook(props: TExecuteWebhookProps): Promise { - const { args, ...rest } = props; - const schedulingProps = { - ...rest, - args: { - ...args, - type: 'webhook' as const - } - }; - return this.execute(schedulingProps); - } -} diff --git a/packages/orchestrator/lib/client.integration.test.ts b/packages/orchestrator/lib/clients/client.integration.test.ts similarity index 54% rename from packages/orchestrator/lib/client.integration.test.ts rename to packages/orchestrator/lib/clients/client.integration.test.ts index 7310af72df..b1b9d4138b 100644 --- a/packages/orchestrator/lib/client.integration.test.ts +++ b/packages/orchestrator/lib/clients/client.integration.test.ts @@ -1,19 +1,19 @@ import { expect, describe, it, beforeAll, afterAll } from 'vitest'; import type { Task } from '@nangohq/scheduler'; import { getTestDbClient, Scheduler } from '@nangohq/scheduler'; -import { getServer } from './server.js'; +import { getServer } from '../server.js'; import { OrchestratorClient } from './client.js'; import getPort from 'get-port'; -import { EventsHandler } from './events.js'; +import { EventsHandler } from '../events.js'; const dbClient = getTestDbClient(); const eventsHandler = new EventsHandler({ - CREATED: (task) => console.log(`Task ${task.id} created`), - STARTED: (task) => console.log(`Task ${task.id} started`), - SUCCEEDED: (task) => console.log(`Task ${task.id} succeeded`), - FAILED: (task) => console.log(`Task ${task.id} failed`), - EXPIRED: (task) => console.log(`Task ${task.id} expired`), - CANCELLED: (task) => console.log(`Task ${task.id} cancelled`) + CREATED: () => {}, + STARTED: () => {}, + SUCCEEDED: () => {}, + FAILED: () => {}, + EXPIRED: () => {}, + CANCELLED: () => {} }); const scheduler = new Scheduler({ dbClient, @@ -34,6 +34,34 @@ describe('OrchestratorClient', async () => { scheduler.stop(); await dbClient.clearDatabase(); }); + describe('heartbeat', () => { + it('should be successful', async () => { + const scheduledTask = await client.schedule({ + name: 'Task', + groupKey: rndStr(), + retry: { count: 0, max: 0 }, + timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, + args: { + type: 'action', + name: rndStr(), + connection: { + id: 123, + provider_config_key: 'P', + environment_id: 456 + }, + activityLogId: 789, + input: { foo: 'bar' } + } + }); + const taskId = scheduledTask.unwrap().taskId; + const beforeTask = await scheduler.get({ taskId }); + const res = await client.heartbeat({ taskId }); + const after = await scheduler.get({ taskId }); + + expect(res.isOk(), `heartbeat failed: ${res.isErr() ? JSON.stringify(res.error) : ''}`).toBe(true); + expect(after.unwrap().lastHeartbeatAt.getTime()).toBeGreaterThan(beforeTask.unwrap().lastHeartbeatAt.getTime()); + }); + }); describe('executeAction', () => { it('should be successful when action task succeed', async () => { @@ -167,6 +195,98 @@ describe('OrchestratorClient', async () => { } }); }); + describe('search', () => { + it('should returns task by ids', async () => { + const groupKey = rndStr(); + const actionA = await client.schedule({ + name: 'Task', + groupKey, + retry: { count: 0, max: 0 }, + timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, + args: { + type: 'action', + name: `action-a`, + connection: { + id: 123, + provider_config_key: 'P', + environment_id: 456 + }, + activityLogId: 789, + input: { foo: 'bar' } + } + }); + const actionB = await client.schedule({ + name: 'Task', + groupKey, + retry: { count: 0, max: 0 }, + timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, + args: { + type: 'action', + name: `action-b`, + connection: { + id: 123, + provider_config_key: 'P', + environment_id: 456 + }, + activityLogId: 789, + input: { foo: 'bar' } + } + }); + const ids = [actionA.unwrap().taskId, actionB.unwrap().taskId]; + const res = await client.search({ ids }); + expect(res.unwrap().length).toBe(2); + expect(res.unwrap().map((task) => task.id)).toEqual(ids); + }); + }); + describe('dequeue', () => { + it('should returns nothing if no scheduled task', async () => { + const res = await client.dequeue({ groupKey: 'abc', limit: 1, waitForCompletion: false }); + expect(res.unwrap()).toEqual([]); + }); + it('should return scheduled tasks', async () => { + const groupKey = rndStr(); + const scheduledAction = await client.schedule({ + name: 'Task', + groupKey, + retry: { count: 0, max: 0 }, + timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, + args: { + type: 'action', + name: `action-a`, + connection: { + id: 123, + provider_config_key: 'P', + environment_id: 456 + }, + activityLogId: 789, + input: { foo: 'bar' } + } + }); + const scheduledWebhook = await client.schedule({ + name: 'Task', + groupKey, + retry: { count: 0, max: 0 }, + timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, + args: { + type: 'webhook', + name: `webhook-a`, + parentSyncName: 'parent', + connection: { + id: 123, + provider_config_key: 'P', + environment_id: 456 + }, + activityLogId: 789, + input: { foo: 'bar' } + } + }); + const res = await client.dequeue({ groupKey, limit: 2, waitForCompletion: false }); + expect(res.unwrap().length).toBe(2); + expect(res.unwrap()[0]?.isAction()).toBe(true); + expect(res.unwrap()[1]?.isWebhook()).toBe(true); + expect(res.unwrap().map((task) => task.id)).toEqual([scheduledAction.unwrap().taskId, scheduledWebhook.unwrap().taskId]); + }); + }); }); class MockProcessor { @@ -174,7 +294,7 @@ class MockProcessor { constructor({ groupKey, process }: { groupKey: string; process: (task: Task) => void }) { this.interval = setInterval(async () => { - const tasks = (await scheduler.list({ groupKey })).unwrap(); + const tasks = (await scheduler.search({ groupKey })).unwrap(); for (const task of tasks) { switch (task.state) { case 'CREATED': diff --git a/packages/orchestrator/lib/clients/client.ts b/packages/orchestrator/lib/clients/client.ts new file mode 100644 index 0000000000..038a8943a0 --- /dev/null +++ b/packages/orchestrator/lib/clients/client.ts @@ -0,0 +1,264 @@ +import { route as postScheduleRoute } from '../routes/v1/postSchedule.js'; +import { route as postDequeueRoute } from '../routes/v1/postDequeue.js'; +import { route as postSearchRoute } from '../routes/v1/postSearch.js'; +import { route as getOutputRoute } from '../routes/v1/tasks/taskId/getOutput.js'; +import { route as putTaskRoute } from '../routes/v1/tasks/putTaskId.js'; +import { route as postHeartbeatRoute } from '../routes/v1/tasks/taskId/postHeartbeat.js'; +import type { Result, Route } from '@nangohq/utils'; +import { Ok, Err, routeFetch, stringifyError } from '@nangohq/utils'; +import type { Endpoint } from '@nangohq/types'; +import type { ClientError, SchedulingProps, ExecuteActionProps, ExecuteProps, ExecuteReturn, ExecuteWebhookProps, TaskAction, TaskWebhook } from './types.js'; +import { validateTask } from './validate.js'; +import type { JsonValue } from 'type-fest'; + +export class OrchestratorClient { + private baseUrl: string; + + constructor({ baseUrl }: { baseUrl: string }) { + this.baseUrl = baseUrl; + } + + private routeFetch>(route: Route) { + return routeFetch(this.baseUrl, route); + } + + public async schedule(props: SchedulingProps): Promise> { + const res = await this.routeFetch(postScheduleRoute)({ + body: { + scheduling: 'immediate', + name: props.name, + groupKey: props.groupKey, + retry: props.retry, + timeoutSettingsInSecs: props.timeoutSettingsInSecs, + args: props.args + } + }); + if ('error' in res) { + return Err({ + name: res.error.code, + message: res.error.message || `Error scheduling tasks`, + payload: JSON.stringify(props) + }); + } else { + return Ok(res); + } + } + + private async execute(props: ExecuteProps): Promise { + const scheduleProps = { + retry: { count: 0, max: 0 }, + timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 }, + ...props + } as SchedulingProps; + const res = await this.schedule(scheduleProps); + if (res.isErr()) { + return res; + } + const taskId = res.value.taskId; + const getOutput = await this.routeFetch(getOutputRoute)({ params: { taskId }, query: { waitForCompletion: true } }); + if ('error' in getOutput) { + return Err({ + name: getOutput.error.code, + message: getOutput.error.message || `Error fetching task '${taskId}' output`, + payload: {} + }); + } else { + switch (getOutput.state) { + case 'CREATED': + case 'STARTED': + return Err({ + name: 'task_in_progress_error', + message: `Task ${taskId} is in progress`, + payload: getOutput.output + }); + case 'SUCCEEDED': + return Ok(getOutput.output); + case 'FAILED': + return Err({ + name: 'task_failed_error', + message: `Task ${taskId} failed`, + payload: getOutput.output + }); + case 'EXPIRED': + return Err({ + name: 'task_expired_error', + message: `Task ${taskId} expired`, + payload: getOutput.output + }); + case 'CANCELLED': + return Err({ + name: 'task_cancelled_error', + message: `Task ${taskId} cancelled`, + payload: getOutput.output + }); + } + } + } + + public async executeAction(props: ExecuteActionProps): Promise { + const { args, ...rest } = props; + const schedulingProps = { + ...rest, + args: { + ...args, + type: 'action' as const + } + }; + return this.execute(schedulingProps); + } + + public async executeWebhook(props: ExecuteWebhookProps): Promise { + const { args, ...rest } = props; + const schedulingProps = { + ...rest, + args: { + ...args, + type: 'webhook' as const + } + }; + return this.execute(schedulingProps); + } + + public async search({ + ids, + groupKey, + limit + }: { + ids?: string[]; + groupKey?: string; + limit?: number; + }): Promise> { + const body = { + ...(ids ? { ids } : {}), + ...(groupKey ? { groupKey } : {}), + ...(limit ? { limit } : {}) + }; + const res = await this.routeFetch(postSearchRoute)({ body }); + if ('error' in res) { + return Err({ + name: res.error.code, + message: res.error.message || `Error listing tasks`, + payload: body + }); + } else { + const tasks = res.flatMap((task) => { + const validated = validateTask(task); + if (validated.isErr()) { + return []; + } + return [validated.value]; + }); + return Ok(tasks); + } + } + + public async dequeue({ + groupKey, + limit, + waitForCompletion + }: { + groupKey: string; + limit: number; + waitForCompletion: boolean; + }): Promise> { + const res = await this.routeFetch(postDequeueRoute)({ + body: { + groupKey, + limit, + waitForCompletion + } + }); + if ('error' in res) { + return Err({ + name: res.error.code, + message: res.error.message || `Error dequeueing tasks`, + payload: { groupKey, limit } + }); + } else { + const dequeuedTasks = res.flatMap((task) => { + const validated = validateTask(task); + if (validated.isErr()) { + return []; + } + return [validated.value]; + }); + return Ok(dequeuedTasks); + } + } + + public async heartbeat({ taskId }: { taskId: string }): Promise> { + const res = await this.routeFetch(postHeartbeatRoute)({ + params: { taskId } + }); + if ('error' in res) { + return Err({ + name: res.error.code, + message: res.error.message || `Error heartbeating task '${taskId}'`, + payload: { taskId } + }); + } else { + return Ok(undefined); + } + } + + public async succeed({ taskId, output }: { taskId: string; output: JsonValue }): Promise> { + const res = await this.routeFetch(putTaskRoute)({ + params: { taskId }, + body: { output, state: 'SUCCEEDED' } + }); + if ('error' in res) { + return Err({ + name: res.error.code, + message: res.error.message || `Error succeeding task '${taskId}'`, + payload: { taskId, output } + }); + } else { + return validateTask(res).mapError((err) => ({ + name: 'succeed_failed', + message: `Failed to mark task ${taskId} as succeeded: ${stringifyError(err)}`, + payload: { taskId, output } + })); + } + } + + public async failed({ taskId, error }: { taskId: string; error: Error }): Promise> { + const output = { name: error.name, message: error.message }; + const res = await this.routeFetch(putTaskRoute)({ + params: { taskId }, + body: { output, state: 'FAILED' } + }); + if ('error' in res) { + return Err({ + name: res.error.code, + message: res.error.message || `Error failing task '${taskId}'`, + payload: { taskId, error: output } + }); + } else { + return validateTask(res).mapError((err) => ({ + name: 'failed_failed', + message: `Failed to mark task ${taskId} as failed: ${stringifyError(err)}`, + payload: { taskId, error: output } + })); + } + } + + public async cancel({ taskId, reason }: { taskId: string; reason: string }): Promise> { + const res = await this.routeFetch(putTaskRoute)({ + params: { taskId }, + body: { output: reason, state: 'CANCELLED' } + }); + if ('error' in res) { + return Err({ + name: res.error.code, + message: res.error.message || `Error cancelling task '${taskId}'`, + payload: { taskId, error: reason } + }); + } else { + return validateTask(res).mapError((err) => ({ + name: 'cacel_failed', + message: `Failed to mark task ${taskId} as cancelled: ${stringifyError(err)}`, + payload: { taskId, error: reason } + })); + } + } +} diff --git a/packages/orchestrator/lib/clients/processor.integration.test.ts b/packages/orchestrator/lib/clients/processor.integration.test.ts new file mode 100644 index 0000000000..07b00423f4 --- /dev/null +++ b/packages/orchestrator/lib/clients/processor.integration.test.ts @@ -0,0 +1,143 @@ +import { expect, describe, it, beforeAll, afterAll, vi } from 'vitest'; +import { getTestDbClient, Scheduler } from '@nangohq/scheduler'; +import { getServer } from '../server.js'; +import { OrchestratorClient } from './client.js'; +import { OrchestratorProcessor } from './processor.js'; +import getPort from 'get-port'; +import { EventsHandler } from '../events.js'; +import { Ok, Err } from '@nangohq/utils'; +import type { Result } from '@nangohq/utils'; +import type { JsonValue } from 'type-fest'; +import type { OrchestratorTask, TaskAction, TaskWebhook } from './types.js'; + +const dbClient = getTestDbClient(); +const eventsHandler = new EventsHandler({ + CREATED: () => {}, + STARTED: () => {}, + SUCCEEDED: () => {}, + FAILED: () => {}, + EXPIRED: () => {}, + CANCELLED: () => {} +}); +const scheduler = new Scheduler({ + dbClient, + on: eventsHandler.onCallbacks +}); +const port = await getPort(); +const orchestratorClient = new OrchestratorClient({ baseUrl: `http://localhost:${port}` }); + +describe('OrchestratorProcessor', async () => { + const server = getServer(scheduler, eventsHandler); + + beforeAll(async () => { + await dbClient.migrate(); + server.listen(port); + }); + + afterAll(async () => { + scheduler.stop(); + await dbClient.clearDatabase(); + }); + + it('should process tasks and mark them as successful if processing succeed', async () => { + const groupKey = rndStr(); + const mockProcess = vi.fn(async (): Promise> => Ok({ foo: 'bar' })); + const n = 10; + await processN(mockProcess, groupKey, n); + + expect(mockProcess).toHaveBeenCalledTimes(n); + const tasks = await scheduler.search({ groupKey }); + for (const task of tasks.unwrap()) { + expect(task.state).toBe('SUCCEEDED'); + } + }); + it('should process tasks and mark them as failed if processing failed', async () => { + const groupKey = rndStr(); + const mockProcess = vi.fn(async (): Promise> => Err('Failed')); + const n = 10; + await processN(mockProcess, groupKey, n); + + expect(mockProcess).toHaveBeenCalledTimes(n); + const tasks = await scheduler.search({ groupKey }); + for (const task of tasks.unwrap()) { + expect(task.state).toBe('FAILED'); + } + }); + it('should cancel terminated tasks', async () => { + const groupKey = rndStr(); + const mockAbort = vi.fn((_taskId: string) => {}); + const mockProcess = vi.fn(async (task: OrchestratorTask): Promise> => { + let aborted = false; + task.abortController.signal.onabort = () => { + aborted = true; + mockAbort(task.id); + }; + await new Promise((resolve) => setTimeout(resolve, 500)); + if (aborted) { + return Err('Aborted'); + } + return Ok({ foo: 'bar' }); + }); + + // Cancel all tasks after 100 ms + const cancellingTimeout = setTimeout(async () => { + const tasks = await scheduler.search({ groupKey }); + for (const task of tasks.unwrap()) { + await scheduler.cancel({ taskId: task.id, reason: { message: 'Cancelling task' } }); + } + }, 100); + const n = 5; + await processN(mockProcess, groupKey, n); + + expect(mockProcess).toHaveBeenCalledTimes(n); + const tasks = await scheduler.search({ groupKey, state: 'CANCELLED' }); + for (const task of tasks.unwrap()) { + expect(mockAbort).toHaveBeenCalledWith(task.id); + } + clearTimeout(cancellingTimeout); + }); +}); + +async function processN(handler: (task: TaskAction | TaskWebhook) => Promise>, groupKey: string, n: number) { + const processor = new OrchestratorProcessor({ + handler, + opts: { orchestratorClient, groupKey, maxConcurrency: 3, checkForTerminatedInterval: 100 } + }); + for (let i = 0; i < n; i++) { + await schedule({ groupKey }); + } + processor.start(); + // Wait so the processor can process all tasks + await new Promise((resolve) => setTimeout(resolve, 1000)); + return processor; +} + +async function schedule({ groupKey }: { groupKey: string }) { + await scheduler.schedule({ + scheduling: 'immediate', + taskProps: { + groupKey, + name: 'Task', + retryMax: 0, + retryCount: 0, + createdToStartedTimeoutSecs: 30, + startedToCompletedTimeoutSecs: 30, + heartbeatTimeoutSecs: 30, + payload: { + type: 'action', + activityLogId: 1234, + name: 'Task', + connection: { + id: 1234, + provider_config_key: 'P', + environment_id: 5678 + }, + input: { foo: 'bar' } + } + } + }); +} + +function rndStr() { + return Math.random().toString(36).substring(7); +} diff --git a/packages/orchestrator/lib/clients/processor.ts b/packages/orchestrator/lib/clients/processor.ts new file mode 100644 index 0000000000..466754bc8b --- /dev/null +++ b/packages/orchestrator/lib/clients/processor.ts @@ -0,0 +1,146 @@ +import type { Result } from '@nangohq/utils'; +import { Err, stringifyError, getLogger } from '@nangohq/utils'; +import type { OrchestratorClient } from './client.js'; +import type { OrchestratorTask } from './types.js'; +import type { JsonValue } from 'type-fest'; + +const logger = getLogger('orchestrator.clients.processor'); + +export class OrchestratorProcessor { + private handler: (task: OrchestratorTask) => Promise>; + private groupKey: string; + private orchestratorClient: OrchestratorClient; + private queue: Queue; + private stopped: boolean; + private abortControllers: Map; + private terminatedTimer: NodeJS.Timeout | null = null; + private checkForTerminatedInterval: number; + + constructor({ + handler, + opts + }: { + handler: (task: OrchestratorTask) => Promise>; + opts: { orchestratorClient: OrchestratorClient; groupKey: string; maxConcurrency: number; checkForTerminatedInterval?: number }; + }) { + this.stopped = true; + this.handler = handler; + this.groupKey = opts.groupKey; + this.orchestratorClient = opts.orchestratorClient; + this.queue = new Queue(opts.maxConcurrency); + this.abortControllers = new Map(); + this.checkForTerminatedInterval = opts.checkForTerminatedInterval || 1000; + } + + public start() { + this.stopped = false; + this.terminatedTimer = setInterval(async () => { + await this.checkForTerminatedTasks(); + }, this.checkForTerminatedInterval); // checking for cancelled/expired doesn't require to be very responsive so we can do it on an interval + void this.processingLoop(); + } + + public stop() { + this.stopped = true; + if (this.terminatedTimer) { + clearInterval(this.terminatedTimer); + } + } + + private async checkForTerminatedTasks() { + if (this.stopped || this.abortControllers.size <= 0) { + return; + } + const ids = Array.from(this.abortControllers.keys()); + const search = await this.orchestratorClient.search({ ids }); + if (search.isErr()) { + return Err(search.error); + } + for (const task of search.value) { + // if task is already in a terminal state, invoke the abort signal + if (['FAILED', 'EXPIRED', 'CANCELLED', 'SUCCEEDED'].includes(task.state)) { + const abortController = this.abortControllers.get(task.id); + if (abortController) { + abortController.abort(); + this.abortControllers.delete(task.id); + } + } + } + return; + } + + private async processingLoop() { + while (!this.stopped) { + if (this.queue.available() > 0) { + const tasks = await this.orchestratorClient.dequeue({ groupKey: this.groupKey, limit: this.queue.available() * 2, waitForCompletion: true }); // fetch more than available to keep the queue full + if (tasks.isErr()) { + logger.error(`failed to dequeue tasks: ${stringifyError(tasks.error)}`); + await new Promise((resolve) => setTimeout(resolve, 1000)); // wait for a bit before retrying to avoid hammering the server in case of repetitive errors + continue; + } + for (const task of tasks.value) { + await this.processTask(task); + } + } + } + return; + } + + private async processTask(task: OrchestratorTask): Promise { + this.abortControllers.set(task.id, task.abortController); + this.queue.run(async () => { + try { + const res = await this.handler(task); + if (res.isErr()) { + this.orchestratorClient.failed({ taskId: task.id, error: res.error }); + } else { + this.orchestratorClient.succeed({ taskId: task.id, output: res.value }); + } + this.abortControllers.delete(task.id); + } catch (err) { + logger.error(`process uncaught error: ${stringifyError(err)}`); + } + }); + } +} + +class Queue { + private maxConcurrency: number; + private queue: (() => void)[]; + + constructor(maxConcurrency: number) { + this.maxConcurrency = maxConcurrency; + this.queue = []; + } + + private async acquire(): Promise { + if (this.queue.length < this.maxConcurrency) { + return; + } + await new Promise((resolve) => { + this.queue.push(resolve); + }); + } + + private release(): void { + if (this.queue.length > 0) { + const next = this.queue.shift(); + if (next) { + next(); + } + } + } + + public async run(f: () => Promise): Promise { + await this.acquire(); + try { + return await f(); + } finally { + this.release(); + } + } + + public available(): number { + return this.maxConcurrency - this.queue.length; + } +} diff --git a/packages/orchestrator/lib/clients/types.ts b/packages/orchestrator/lib/clients/types.ts new file mode 100644 index 0000000000..a94051a89e --- /dev/null +++ b/packages/orchestrator/lib/clients/types.ts @@ -0,0 +1,106 @@ +import type { JsonValue, SetOptional } from 'type-fest'; +import type { PostSchedule } from '../routes/v1/postSchedule.js'; +import type { Result } from '@nangohq/utils'; +import type { TaskState } from '@nangohq/scheduler'; + +export type SchedulingProps = Omit; + +interface ActionArgs { + name: string; + connection: { + id: number; + provider_config_key: string; + environment_id: number; + }; + activityLogId: number; + input: JsonValue; +} +interface WebhookArgs { + name: string; + parentSyncName: string; + connection: { + id: number; + provider_config_key: string; + environment_id: number; + }; + activityLogId: number | null; + input: JsonValue; +} + +export type ExecuteProps = SetOptional; +export type ExecuteReturn = Result; +export type ExecuteActionProps = Omit & { args: ActionArgs }; +export type ExecuteWebhookProps = Omit & { args: WebhookArgs }; + +export type OrchestratorTask = TaskAction | TaskWebhook; + +export interface TaskAction extends ActionArgs { + id: string; + state: TaskState; + abortController: AbortController; + isWebhook(this: OrchestratorTask): this is TaskWebhook; + isAction(this: OrchestratorTask): this is TaskAction; +} +export function TaskAction(props: { + id: string; + state: TaskState; + name: string; + connection: { + id: number; + provider_config_key: string; + environment_id: number; + }; + activityLogId: number; + input: JsonValue; +}): TaskAction { + return { + id: props.id, + name: props.name, + state: props.state, + connection: props.connection, + activityLogId: props.activityLogId, + input: props.input, + abortController: new AbortController(), + isWebhook: () => false, + isAction: () => true + }; +} + +export interface TaskWebhook extends WebhookArgs { + id: string; + state: TaskState; + abortController: AbortController; + isWebhook(this: OrchestratorTask): this is TaskWebhook; + isAction(this: OrchestratorTask): this is TaskAction; +} +export function TaskWebhook(props: { + id: string; + state: TaskState; + name: string; + parentSyncName: string; + connection: { + id: number; + provider_config_key: string; + environment_id: number; + }; + activityLogId: number | null; + input: JsonValue; +}): TaskWebhook { + return { + id: props.id, + state: props.state, + name: props.name, + parentSyncName: props.parentSyncName, + connection: props.connection, + activityLogId: props.activityLogId, + input: props.input, + abortController: new AbortController(), + isWebhook: () => true, + isAction: () => false + }; +} + +export interface ClientError extends Error { + name: string; + payload: JsonValue; +} diff --git a/packages/orchestrator/lib/clients/validate.ts b/packages/orchestrator/lib/clients/validate.ts new file mode 100644 index 0000000000..c3a4e70932 --- /dev/null +++ b/packages/orchestrator/lib/clients/validate.ts @@ -0,0 +1,53 @@ +import { taskStates } from '@nangohq/scheduler'; +import type { Task } from '@nangohq/scheduler'; +import { TaskAction, TaskWebhook } from './types.js'; +import { z } from 'zod'; +import { actionArgsSchema, webhookArgsSchema } from '../routes/v1/postSchedule.js'; +import { Err, Ok } from '@nangohq/utils'; +import type { Result } from '@nangohq/utils'; + +const commonSchemaFields = { + id: z.string().uuid(), + name: z.string().min(1), + groupKey: z.string().min(1), + state: z.enum(taskStates) +}; +const actionSchema = z.object({ + ...commonSchemaFields, + payload: actionArgsSchema +}); +const webhookSchema = z.object({ + ...commonSchemaFields, + payload: webhookArgsSchema +}); + +export function validateTask(task: Task): Result { + const action = actionSchema.safeParse(task); + if (action.success) { + return Ok( + TaskAction({ + state: action.data.state, + id: action.data.id, + name: action.data.name, + connection: action.data.payload.connection, + activityLogId: action.data.payload.activityLogId, + input: action.data.payload.input + }) + ); + } + const webhook = webhookSchema.safeParse(task); + if (webhook.success) { + return Ok( + TaskWebhook({ + id: webhook.data.id, + state: webhook.data.state, + name: webhook.data.name, + parentSyncName: webhook.data.payload.parentSyncName, + connection: webhook.data.payload.connection, + activityLogId: webhook.data.payload.activityLogId, + input: webhook.data.payload.input + }) + ); + } + return Err(`Cannot validate task: ${task}`); +} diff --git a/packages/orchestrator/lib/events.ts b/packages/orchestrator/lib/events.ts index 6e49576371..ca51463dc7 100644 --- a/packages/orchestrator/lib/events.ts +++ b/packages/orchestrator/lib/events.ts @@ -1,12 +1,6 @@ import type { Task } from '@nangohq/scheduler'; import EventEmitter from 'node:events'; -type TaskEvent = 'completed'; - -export function getEventId(event: TaskEvent, taskId: string) { - return `task:${event}:${taskId}`; -} - export class EventsHandler extends EventEmitter { public readonly onCallbacks: { CREATED: (task: Task) => void; @@ -29,30 +23,27 @@ export class EventsHandler extends EventEmitter { this.onCallbacks = { CREATED: (task: Task) => { on.CREATED(task); + this.emit(`task:started:${task.groupKey}`, task); }, STARTED: (task: Task) => { on.STARTED(task); }, SUCCEEDED: (task: Task) => { on.SUCCEEDED(task); - this.emitEvent('completed', task); + this.emit(`task:completed:${task.id}`, task); }, FAILED: (task: Task) => { on.FAILED(task); - this.emitEvent('completed', task); + this.emit(`task:completed:${task.id}`, task); }, EXPIRED: (task: Task) => { on.EXPIRED(task); - this.emitEvent('completed', task); + this.emit(`task:completed:${task.id}`, task); }, CANCELLED: (task: Task) => { on.CANCELLED(task); - this.emitEvent('completed', task); + this.emit(`task:completed:${task.id}`, task); } }; } - - private emitEvent(event: TaskEvent, task: Task) { - this.emit(getEventId(event, task.id), task); - } } diff --git a/packages/orchestrator/lib/index.ts b/packages/orchestrator/lib/index.ts index 37fd896c44..fe5692e2e2 100644 --- a/packages/orchestrator/lib/index.ts +++ b/packages/orchestrator/lib/index.ts @@ -1 +1,3 @@ -export * from './client.js'; +export * from './clients/types.js'; +export * from './clients/client.js'; +export * from './clients/processor.js'; diff --git a/packages/orchestrator/lib/routes/health.ts b/packages/orchestrator/lib/routes/getHealth.ts similarity index 70% rename from packages/orchestrator/lib/routes/health.ts rename to packages/orchestrator/lib/routes/getHealth.ts index 2936bb1a92..20c3c51365 100644 --- a/packages/orchestrator/lib/routes/health.ts +++ b/packages/orchestrator/lib/routes/getHealth.ts @@ -10,12 +10,9 @@ type Health = Endpoint<{ const path = '/health'; const method = 'GET'; -export const handler: RouteHandler = { +export const routeHandler: RouteHandler = { path, method, - validate: (_req, _res, next) => { - // No validation needed - next(); - }, + validate: (_req, _res, next) => next(), // No extra validation needed handler: (_req, res) => res.json({ status: 'ok' }) }; diff --git a/packages/orchestrator/lib/routes/v1/postDequeue.ts b/packages/orchestrator/lib/routes/v1/postDequeue.ts new file mode 100644 index 0000000000..8629bf81ee --- /dev/null +++ b/packages/orchestrator/lib/routes/v1/postDequeue.ts @@ -0,0 +1,86 @@ +import { z } from 'zod'; +import type { Scheduler, Task } from '@nangohq/scheduler'; +import type { ApiError, Endpoint } from '@nangohq/types'; +import type { EndpointRequest, EndpointResponse, RouteHandler, Route } from '@nangohq/utils'; +import { validateRequest } from '@nangohq/utils'; +import type EventEmitter from 'node:events'; + +const path = '/v1/dequeue'; +const method = 'POST'; + +type PostDequeue = Endpoint<{ + Method: typeof method; + Path: typeof path; + Body: { + groupKey: string; + limit: number; + waitForCompletion: boolean; + }; + Error: ApiError<'dequeue_failed'>; + Success: Task[]; +}>; + +const validate = validateRequest({ + parseBody: (data) => + z + .object({ + groupKey: z.string().min(1), + limit: z.coerce.number().positive(), + waitForCompletion: z.coerce.boolean() + }) + .parse(data) +}); + +export const route: Route = { path, method }; + +export const routeHandler = (scheduler: Scheduler, eventEmitter: EventEmitter): RouteHandler => { + return { + ...route, + validate, + handler: handler(scheduler, eventEmitter) + }; +}; + +const handler = (scheduler: Scheduler, eventEmitter: EventEmitter) => { + return async (req: EndpointRequest, res: EndpointResponse) => { + const { groupKey, limit, waitForCompletion } = req.body; + const waitForCompletionTimeoutMs = 60_000; + const eventId = `task:started:${groupKey}`; + const cleanupAndRespond = (respond: (res: EndpointResponse) => void) => { + if (timeout) { + clearTimeout(timeout); + } + if (onTaskStarted) { + eventEmitter.removeListener(eventId, onTaskStarted); + } + if (!res.writableEnded) { + respond(res); + } + }; + const onTaskStarted = async (_t: Task) => { + const getTasks = await scheduler.dequeue({ groupKey, limit }); + if (getTasks.isErr()) { + cleanupAndRespond((res) => res.status(500).json({ error: { code: 'dequeue_failed', message: getTasks.error.message } })); + } else { + cleanupAndRespond((res) => res.status(200).json(getTasks.value)); + } + }; + const timeout = setTimeout(() => { + cleanupAndRespond((res) => res.status(200).send([])); + }, waitForCompletionTimeoutMs); + + eventEmitter.once(eventId, onTaskStarted); + + const getTasks = await scheduler.dequeue({ groupKey, limit }); + if (getTasks.isErr()) { + cleanupAndRespond((res) => res.status(500).json({ error: { code: 'dequeue_failed', message: getTasks.error.message } })); + return; + } + if (waitForCompletion && getTasks.value.length === 0) { + await new Promise((resolve) => resolve(timeout)); + } else { + cleanupAndRespond((res) => res.status(200).json(getTasks.value)); + } + return; + }; +}; diff --git a/packages/orchestrator/lib/routes/v1/schedule.ts b/packages/orchestrator/lib/routes/v1/postSchedule.ts similarity index 58% rename from packages/orchestrator/lib/routes/v1/schedule.ts rename to packages/orchestrator/lib/routes/v1/postSchedule.ts index c6019e87fb..7b2e3175b5 100644 --- a/packages/orchestrator/lib/routes/v1/schedule.ts +++ b/packages/orchestrator/lib/routes/v1/postSchedule.ts @@ -5,8 +5,12 @@ import type { ApiError, Endpoint } from '@nangohq/types'; import type { EndpointRequest, EndpointResponse, RouteHandler, Route } from '@nangohq/utils'; import { validateRequest } from '@nangohq/utils'; import { jsonSchema } from '../../utils/validation.js'; +import type { TaskType } from '../../types.js'; -type Schedule = Endpoint<{ +const path = '/v1/schedule'; +const method = 'POST'; + +export type PostSchedule = Endpoint<{ Method: typeof method; Path: typeof path; Body: { @@ -22,18 +26,50 @@ type Schedule = Endpoint<{ startedToCompleted: number; heartbeat: number; }; - args: JsonValue; + args: JsonValue & { type: TaskType }; }; Error: ApiError<'schedule_failed'>; Success: { taskId: string }; }>; -const path = '/v1/schedule'; -const method = 'POST'; +const commonSchemaFields = { + name: z.string().min(1), + connection: z.object({ + id: z.number().positive(), + provider_config_key: z.string().min(1), + environment_id: z.number().positive() + }), + input: jsonSchema +}; -const validate = validateRequest({ - parseBody: (data) => - z +export const actionArgsSchema = z.object({ + type: z.literal('action'), + activityLogId: z.number().positive(), + ...commonSchemaFields +}); +export const webhookArgsSchema = z.object({ + type: z.literal('webhook'), + parentSyncName: z.string().min(1), + activityLogId: z.number().positive().nullable(), + ...commonSchemaFields +}); + +const validate = validateRequest({ + parseBody: (data: any) => { + function argsSchema(data: any) { + if ('args' in data && 'type' in data.args) { + switch (data.args.type) { + case 'action': + return actionArgsSchema; + case 'webhook': + return webhookArgsSchema; + default: + throw new Error(`Invalid task type: '${data.args.type}'`); + } + } + throw new Error('Missing task type'); + } + return z .object({ scheduling: z.literal('immediate'), name: z.string().min(1), @@ -47,22 +83,14 @@ const validate = validateRequest({ startedToCompleted: z.number().int().positive(), heartbeat: z.number().int().positive() }), - args: z.object({ - name: z.string().min(1), - connection: z.object({ - id: z.number().positive(), - provider_config_key: z.string().min(1), - environment_id: z.number().positive() - }), - activityLogId: z.number().positive(), - input: z.optional(jsonSchema).default({}) - }) + args: argsSchema(data) }) - .parse(data) + .parse(data); + } }); -const getHandler = (scheduler: Scheduler) => { - return async (req: EndpointRequest, res: EndpointResponse) => { +const handler = (scheduler: Scheduler) => { + return async (req: EndpointRequest, res: EndpointResponse) => { const task = await scheduler.schedule({ scheduling: req.body.scheduling, taskProps: { @@ -83,12 +111,12 @@ const getHandler = (scheduler: Scheduler) => { }; }; -export const route: Route = { path, method }; +export const route: Route = { path, method }; -export const getRouteHandler = (scheduler: Scheduler): RouteHandler => { +export const routeHandler = (scheduler: Scheduler): RouteHandler => { return { ...route, validate, - handler: getHandler(scheduler) + handler: handler(scheduler) }; }; diff --git a/packages/orchestrator/lib/routes/v1/postSearch.ts b/packages/orchestrator/lib/routes/v1/postSearch.ts new file mode 100644 index 0000000000..c6374354c1 --- /dev/null +++ b/packages/orchestrator/lib/routes/v1/postSearch.ts @@ -0,0 +1,56 @@ +import { z } from 'zod'; +import type { Scheduler, Task } from '@nangohq/scheduler'; +import type { ApiError, Endpoint } from '@nangohq/types'; +import type { EndpointRequest, EndpointResponse, RouteHandler, Route } from '@nangohq/utils'; +import { validateRequest } from '@nangohq/utils'; + +const path = '/v1/search'; +const method = 'POST'; + +type PostSearch = Endpoint<{ + Method: typeof method; + Path: typeof path; + Body: { + ids?: string[] | undefined; + groupKey?: string | undefined; + limit?: number | undefined; + }; + Error: ApiError<'search_failed'>; + Success: Task[]; +}>; + +const validate = validateRequest({ + parseBody: (data) => + z + .object({ + groupKey: z.string().min(1).optional(), + limit: z.coerce.number().positive().optional(), + ids: z.array(z.string().uuid()).optional() + }) + .parse(data) +}); + +const handler = (scheduler: Scheduler) => { + return async (req: EndpointRequest, res: EndpointResponse) => { + const { ids, groupKey, limit } = req.body; + const getTasks = await scheduler.search({ + ...(ids ? { ids } : {}), + ...(groupKey ? { groupKey } : {}), + ...(limit ? { limit } : {}) + }); + if (getTasks.isErr()) { + return res.status(500).json({ error: { code: 'search_failed', message: getTasks.error.message } }); + } + return res.status(201).json(getTasks.value); + }; +}; + +export const route: Route = { path, method }; + +export const routeHandler = (scheduler: Scheduler): RouteHandler => { + return { + ...route, + validate, + handler: handler(scheduler) + }; +}; diff --git a/packages/orchestrator/lib/routes/v1/tasks/putTaskId.ts b/packages/orchestrator/lib/routes/v1/tasks/putTaskId.ts new file mode 100644 index 0000000000..bca8d96b3b --- /dev/null +++ b/packages/orchestrator/lib/routes/v1/tasks/putTaskId.ts @@ -0,0 +1,67 @@ +import { z } from 'zod'; +import type { JsonValue } from 'type-fest'; +import type { Scheduler, Task } from '@nangohq/scheduler'; +import type { ApiError, Endpoint } from '@nangohq/types'; +import type { EndpointRequest, EndpointResponse, RouteHandler, Route, Result } from '@nangohq/utils'; +import { validateRequest } from '@nangohq/utils'; +import { jsonSchema } from '../../../utils/validation.js'; + +type PutTask = Endpoint<{ + Method: typeof method; + Path: typeof path; + Params: { + taskId: string; + }; + Body: { + output: JsonValue; + state: 'SUCCEEDED' | 'FAILED' | 'CANCELLED'; + }; + Error: ApiError<'put_task_failed' | 'invalid_state'>; + Success: Task; +}>; + +const path = '/v1/tasks/:taskId'; +const method = 'PUT'; + +const validate = validateRequest({ + parseBody: (data) => z.object({ output: jsonSchema, state: z.enum(['SUCCEEDED', 'FAILED']) }).parse(data), + parseParams: (data) => z.object({ taskId: z.string().uuid() }).parse(data) +}); + +const handler = (scheduler: Scheduler) => { + return async (req: EndpointRequest, res: EndpointResponse) => { + const { taskId } = req.params; + const { state, output } = req.body; + let updated: Result; + switch (state) { + case 'SUCCEEDED': + updated = await scheduler.succeed({ taskId: taskId, output: output }); + break; + case 'FAILED': + updated = await scheduler.fail({ taskId: taskId, error: output }); + break; + case 'CANCELLED': + updated = await scheduler.cancel({ taskId: taskId, reason: output }); + break; + default: + res.status(400).json({ error: { code: 'invalid_state', message: `Invalid state ${state}` } }); + return; + } + if (updated.isErr()) { + res.status(500).json({ error: { code: 'put_task_failed', message: updated.error.message } }); + return; + } + res.status(200).json(updated.value); + return; + }; +}; + +export const route: Route = { path, method }; + +export const routeHandler = (scheduler: Scheduler): RouteHandler => { + return { + ...route, + validate, + handler: handler(scheduler) + }; +}; diff --git a/packages/orchestrator/lib/routes/v1/task/taskId/output.ts b/packages/orchestrator/lib/routes/v1/tasks/taskId/getOutput.ts similarity index 79% rename from packages/orchestrator/lib/routes/v1/task/taskId/output.ts rename to packages/orchestrator/lib/routes/v1/tasks/taskId/getOutput.ts index d5ac093c7d..5f4494f41d 100644 --- a/packages/orchestrator/lib/routes/v1/task/taskId/output.ts +++ b/packages/orchestrator/lib/routes/v1/tasks/taskId/getOutput.ts @@ -5,9 +5,8 @@ import type { ApiError, Endpoint } from '@nangohq/types'; import type { EndpointRequest, EndpointResponse, RouteHandler, Route } from '@nangohq/utils'; import { validateRequest } from '@nangohq/utils'; import type { EventEmitter } from 'node:events'; -import { getEventId } from '../../../../events.js'; -type Output = Endpoint<{ +type GetOutput = Endpoint<{ Method: typeof method; Path: typeof path; Params: { @@ -20,10 +19,10 @@ type Output = Endpoint<{ Success: { state: TaskState; output: JsonValue }; }>; -const path = '/v1/task/:taskId/output'; +const path = '/v1/tasks/:taskId/output'; const method = 'GET'; -const validate = validateRequest({ +const validate = validateRequest({ parseQuery: (data) => z .object({ @@ -37,11 +36,11 @@ const validate = validateRequest({ parseParams: (data) => z.object({ taskId: z.string().uuid() }).parse(data) }); -const getHandler = (scheduler: Scheduler, eventEmitter: EventEmitter) => { - return async (req: EndpointRequest, res: EndpointResponse) => { +const handler = (scheduler: Scheduler, eventEmitter: EventEmitter) => { + return async (req: EndpointRequest, res: EndpointResponse) => { const waitForCompletionTimeoutMs = 120_000; - const eventId = getEventId('completed', req.params.taskId); - const cleanupAndRespond = (respond: (res: EndpointResponse) => void) => { + const eventId = `task:completed:${req.params.taskId}`; + const cleanupAndRespond = (respond: (res: EndpointResponse) => void) => { if (timeout) { clearTimeout(timeout); } @@ -75,12 +74,12 @@ const getHandler = (scheduler: Scheduler, eventEmitter: EventEmitter) => { }; }; -export const route: Route = { path, method }; +export const route: Route = { path, method }; -export const getRouteHandler = (scheduler: Scheduler, eventEmmiter: EventEmitter): RouteHandler => { +export const routeHandler = (scheduler: Scheduler, eventEmmiter: EventEmitter): RouteHandler => { return { ...route, validate, - handler: getHandler(scheduler, eventEmmiter) + handler: handler(scheduler, eventEmmiter) }; }; diff --git a/packages/orchestrator/lib/routes/v1/tasks/taskId/postHeartbeat.ts b/packages/orchestrator/lib/routes/v1/tasks/taskId/postHeartbeat.ts new file mode 100644 index 0000000000..ebaf8f928d --- /dev/null +++ b/packages/orchestrator/lib/routes/v1/tasks/taskId/postHeartbeat.ts @@ -0,0 +1,45 @@ +import { z } from 'zod'; +import type { Scheduler } from '@nangohq/scheduler'; +import type { ApiError, Endpoint } from '@nangohq/types'; +import type { EndpointRequest, EndpointResponse, RouteHandler, Route } from '@nangohq/utils'; +import { validateRequest } from '@nangohq/utils'; + +const path = '/v1/tasks/:taskId/heartbeat'; +const method = 'POST'; + +type PostHeartbeat = Endpoint<{ + Method: typeof method; + Path: typeof path; + Params: { + taskId: string; + }; + Error: ApiError<'post_heartbeat_failed'>; + Success: never; +}>; + +const validate = validateRequest({ + parseParams: (data) => z.object({ taskId: z.string().uuid() }).parse(data) +}); + +const handler = (scheduler: Scheduler) => { + return async (req: EndpointRequest, res: EndpointResponse) => { + const { taskId } = req.params; + const heartbeat = await scheduler.heartbeat({ taskId: taskId }); + if (heartbeat.isErr()) { + res.status(500).json({ error: { code: 'post_heartbeat_failed', message: heartbeat.error.message } }); + return; + } + res.status(204).send(); + return; + }; +}; + +export const route: Route = { path, method }; + +export const routeHandler = (scheduler: Scheduler): RouteHandler => { + return { + ...route, + validate, + handler: handler(scheduler) + }; +}; diff --git a/packages/orchestrator/lib/server.ts b/packages/orchestrator/lib/server.ts index 66adc5d3fd..b5933d2022 100644 --- a/packages/orchestrator/lib/server.ts +++ b/packages/orchestrator/lib/server.ts @@ -1,8 +1,12 @@ import express from 'express'; import type { Express, Request, Response, NextFunction } from 'express'; -import { getRouteHandler as scheduleHandler } from './routes/v1/schedule.js'; -import { handler as healthHandler } from './routes/health.js'; -import { getRouteHandler as outputHandler } from './routes/v1/task/taskId/output.js'; +import { routeHandler as postScheduleHandler } from './routes/v1/postSchedule.js'; +import { routeHandler as postSearchHandler } from './routes/v1/postSearch.js'; +import { routeHandler as postDequeueHandler } from './routes/v1/postDequeue.js'; +import { routeHandler as putTaskHandler } from './routes/v1/tasks/putTaskId.js'; +import { routeHandler as getHealthHandler } from './routes/getHealth.js'; +import { routeHandler as getOutputHandler } from './routes/v1/tasks/taskId/getOutput.js'; +import { routeHandler as postHeartbeatHandler } from './routes/v1/tasks/taskId/postHeartbeat.js'; import { getLogger, createRoute } from '@nangohq/utils'; import type { Scheduler } from '@nangohq/scheduler'; import type { ApiError } from '@nangohq/types'; @@ -33,9 +37,13 @@ export const getServer = (scheduler: Scheduler, eventEmmiter: EventEmitter): Exp //TODO: add auth middleware - createRoute(server, healthHandler); - createRoute(server, scheduleHandler(scheduler)); - createRoute(server, outputHandler(scheduler, eventEmmiter)); + createRoute(server, getHealthHandler); + createRoute(server, postScheduleHandler(scheduler)); + createRoute(server, postSearchHandler(scheduler)); + createRoute(server, putTaskHandler(scheduler)); + createRoute(server, getOutputHandler(scheduler, eventEmmiter)); + createRoute(server, postHeartbeatHandler(scheduler)); + createRoute(server, postDequeueHandler(scheduler, eventEmmiter)); server.use((err: unknown, _req: Request, res: Response, next: NextFunction) => { res.status(500).json({ error: `Internal server error: '${err}'` }); diff --git a/packages/orchestrator/lib/types.ts b/packages/orchestrator/lib/types.ts new file mode 100644 index 0000000000..69ca1e5acd --- /dev/null +++ b/packages/orchestrator/lib/types.ts @@ -0,0 +1,2 @@ +export const taskTypes = ['action', 'webhook', 'sync'] as const; +export type TaskType = (typeof taskTypes)[number]; diff --git a/packages/scheduler/lib/models/tasks.integration.test.ts b/packages/scheduler/lib/models/tasks.integration.test.ts index 73e5f35c53..036b0fd725 100644 --- a/packages/scheduler/lib/models/tasks.integration.test.ts +++ b/packages/scheduler/lib/models/tasks.integration.test.ts @@ -1,6 +1,6 @@ import { expect, describe, it, beforeEach, afterEach } from 'vitest'; import * as tasks from './tasks.js'; -import { taskStates } from './tasks.js'; +import { taskStates } from '../types.js'; import type { TaskState, Task } from '../types.js'; import { getTestDbClient } from '../db/helpers.test.js'; import type { knex } from 'knex'; @@ -128,24 +128,28 @@ describe('Task', () => { expect(expired).toHaveLength(1); expect(expired[0]?.output).toMatchObject({ reason: `heartbeatTimeoutSecs_exceeded` }); }); - it('should list of tasks', async () => { + it('should search tasks', async () => { const t1 = await createTaskWithState(db, 'STARTED'); const t2 = await createTaskWithState(db, 'CREATED'); const t3 = await createTaskWithState(db, 'CREATED'); - const l1 = (await tasks.list(db)).unwrap(); + const l1 = (await tasks.search(db)).unwrap(); expect(l1.length).toBe(3); - const l2 = (await tasks.list(db, { groupKey: t1.groupKey })).unwrap(); + const l2 = (await tasks.search(db, { groupKey: t1.groupKey })).unwrap(); expect(l2.length).toBe(1); expect(l2.map((t) => t.id)).toStrictEqual([t1.id]); - const l3 = (await tasks.list(db, { state: 'CREATED' })).unwrap(); + const l3 = (await tasks.search(db, { state: 'CREATED' })).unwrap(); expect(l3.length).toBe(2); expect(l3.map((t) => t.id)).toStrictEqual([t2.id, t3.id]); - const l4 = (await tasks.list(db, { state: 'CREATED', groupKey: 'unkown' })).unwrap(); + const l4 = (await tasks.search(db, { state: 'CREATED', groupKey: 'unkown' })).unwrap(); expect(l4.length).toBe(0); + + const l5 = (await tasks.search(db, { ids: [t1.id, t2.id] })).unwrap(); + expect(l5.length).toBe(2); + expect(l5.map((t) => t.id)).toStrictEqual([t1.id, t2.id]); }); }); diff --git a/packages/scheduler/lib/models/tasks.ts b/packages/scheduler/lib/models/tasks.ts index 9638724b1f..31dde1adee 100644 --- a/packages/scheduler/lib/models/tasks.ts +++ b/packages/scheduler/lib/models/tasks.ts @@ -2,6 +2,7 @@ import type { JsonValue } from 'type-fest'; import type knex from 'knex'; import type { Result } from '@nangohq/utils'; import { Ok, Err, stringifyError } from '@nangohq/utils'; +import { taskStates } from '../types.js'; import type { TaskState, Task, TaskTerminalState, TaskNonTerminalState } from '../types.js'; import { uuidv7 } from 'uuidv7'; @@ -9,8 +10,6 @@ export const TASKS_TABLE = 'tasks'; export type TaskProps = Omit; -export const taskStates = ['CREATED', 'STARTED', 'SUCCEEDED', 'FAILED', 'EXPIRED', 'CANCELLED'] as const; - interface TaskStateTransition { from: TaskState; to: TaskState; @@ -133,8 +132,11 @@ export async function get(db: knex.Knex, taskId: string): Promise> return Ok(DbTask.from(task)); } -export async function list(db: knex.Knex, params?: { groupKey?: string; state?: TaskState; limit?: number }): Promise> { +export async function search(db: knex.Knex, params?: { ids?: string[]; groupKey?: string; state?: TaskState; limit?: number }): Promise> { const query = db.from(TASKS_TABLE); + if (params?.ids) { + query.whereIn('id', params.ids); + } if (params?.groupKey) { query.where('group_key', params.groupKey); } @@ -142,7 +144,7 @@ export async function list(db: knex.Knex, params?: { groupKey?: string; state?: query.where('state', params.state); } const limit = params?.limit || 100; - const tasks = await query.limit(limit); + const tasks = await query.limit(limit).orderBy('id'); return Ok(tasks.map(DbTask.from)); } diff --git a/packages/scheduler/lib/scheduler.ts b/packages/scheduler/lib/scheduler.ts index 9111ad3fe0..99c0ead8bc 100644 --- a/packages/scheduler/lib/scheduler.ts +++ b/packages/scheduler/lib/scheduler.ts @@ -69,15 +69,16 @@ export class Scheduler { } /** - * List tasks + * Search tasks * @param params + * @param params.ids - Task IDs * @param params.groupKey - Group key * @param params.state - Task state * @example - * const tasks = await scheduler.list({ groupKey: 'test', state: 'CREATED' }); + * const tasks = await scheduler.search({ groupKey: 'test', state: 'CREATED' }); */ - public async list(params?: { groupKey?: string; state?: TaskState }): Promise> { - return tasks.list(this.dbClient.db, params); + public async search(params?: { ids?: string[]; groupKey?: string; state?: TaskState; limit?: number }): Promise> { + return tasks.search(this.dbClient.db, params); } /** @@ -203,11 +204,12 @@ export class Scheduler { /** * Cancel a task * @param cancelBy - Cancel by task ID or schedule ID + * @param reason - Reason for cancellation * @returns Task * @example * const cancelled = await scheduler.cancel({ taskId: '00000000-0000-0000-0000-000000000000' }); */ - public async cancel(cancelBy: { taskId: string; reason: string } | { scheduleId: string; reason: string }): Promise> { + public async cancel(cancelBy: { taskId: string; reason: JsonValue } | { scheduleId: string; reason: JsonValue }): Promise> { if ('scheduleId' in cancelBy) { throw new Error(`Cancelling tasks for schedule '${cancelBy.scheduleId}' not implemented`); } diff --git a/packages/scheduler/lib/types.ts b/packages/scheduler/lib/types.ts index 63b5178b38..16798a3201 100644 --- a/packages/scheduler/lib/types.ts +++ b/packages/scheduler/lib/types.ts @@ -1,6 +1,7 @@ -import type { TaskProps, taskStates } from './models/tasks'; +import type { TaskProps } from './models/tasks'; import type { JsonValue } from 'type-fest'; +export const taskStates = ['CREATED', 'STARTED', 'SUCCEEDED', 'FAILED', 'EXPIRED', 'CANCELLED'] as const; export type TaskState = (typeof taskStates)[number]; export type TaskTerminalState = Exclude; export type TaskNonTerminalState = Exclude; diff --git a/packages/server/Dockerfile b/packages/server/Dockerfile index e085d04b4e..181bcbe753 100644 --- a/packages/server/Dockerfile +++ b/packages/server/Dockerfile @@ -17,6 +17,7 @@ COPY packages/utils/ packages/utils/ COPY packages/logs/ packages/logs/ COPY packages/node-client/ packages/node-client/ COPY packages/records/ packages/records/ +COPY packages/scheduler/ packages/scheduler/ COPY packages/orchestrator/ packages/orchestrator/ RUN npm pkg delete scripts.prepare diff --git a/packages/shared/lib/clients/orchestrator.ts b/packages/shared/lib/clients/orchestrator.ts index 267231d49c..dc01161ad3 100644 --- a/packages/shared/lib/clients/orchestrator.ts +++ b/packages/shared/lib/clients/orchestrator.ts @@ -20,7 +20,7 @@ import type { LogLevel } from '@nangohq/types'; import SyncClient from './sync.client.js'; import type { Client as TemporalClient } from '@temporalio/client'; import { LogActionEnum } from '../models/Activity.js'; -import type { TExecuteReturn, TExecuteActionProps, TExecuteWebhookProps } from '@nangohq/nango-orchestrator'; +import type { ExecuteReturn, ExecuteActionProps, ExecuteWebhookProps } from '@nangohq/nango-orchestrator'; import type { Account } from '../models/Admin.js'; import type { Environment } from '../models/Environment.js'; import type { SyncConfig } from '../models/index.js'; @@ -36,8 +36,8 @@ async function getTemporal(): Promise { } export interface OrchestratorClientInterface { - executeAction(props: TExecuteActionProps): Promise; - executeWebhook(props: TExecuteWebhookProps): Promise; + executeAction(props: ExecuteActionProps): Promise; + executeWebhook(props: ExecuteWebhookProps): Promise; } export class Orchestrator { @@ -114,7 +114,7 @@ export class Orchestrator { if (res.isErr()) { logger.error(`Error: Execution '${executionId}' failed: ${stringifyError(res.error)}`); } else { - logger.info(`Execution '${executionId}' executed successfully with result: ${res.value}`); + logger.info(`Execution '${executionId}' executed successfully with result: ${JSON.stringify(res.value)}`); } }, (error) => { @@ -350,7 +350,7 @@ export class Orchestrator { if (res.isErr()) { logger.error(`Error: Execution '${executionId}' failed: ${stringifyError(res.error)}`); } else { - logger.info(`Execution '${executionId}' executed successfully with result: ${res.value}`); + logger.info(`Execution '${executionId}' executed successfully with result: ${JSON.stringify(res.value)}`); } }, (error) => { diff --git a/packages/utils/lib/environment/parse.ts b/packages/utils/lib/environment/parse.ts index b520c5013f..59b009d5b8 100644 --- a/packages/utils/lib/environment/parse.ts +++ b/packages/utils/lib/environment/parse.ts @@ -113,7 +113,7 @@ export const ENVS = z.object({ SENTRY_DNS: z.string().url().optional(), // Temporal - TEMPORAL_NAMESPACE: z.string().optional(), + TEMPORAL_NAMESPACE: z.string().optional().default('default'), TEMPORAL_ADDRESS: z.string().optional(), TEMPORAL_WORKER_MAX_CONCURRENCY: z.coerce.number().default(500), diff --git a/packages/utils/lib/result.ts b/packages/utils/lib/result.ts index 7a6a14d100..5d4923ca09 100644 --- a/packages/utils/lib/result.ts +++ b/packages/utils/lib/result.ts @@ -8,6 +8,7 @@ export interface Left { isOk(this: Result): this is Right; unwrap(): T; map(fn: (value: T) => U): Result; + mapError(fn: (error: E | string) => U): Result; } export interface Right { @@ -16,6 +17,7 @@ export interface Right { isOk(this: Result): this is Right; unwrap(): T; map(fn: (value: T) => U): Result; + mapError(fn: (error: E | string) => U): Result; } export type Result = Left | Right; @@ -32,6 +34,9 @@ export function Ok(value: T): Result { } catch (error) { return Err(error as E); } + }, + mapError: (_fn: (error: E | string) => U): Result => { + return Ok(value); } }; } @@ -46,6 +51,13 @@ export function Err(error: E | string): Result { isOk: () => false, map: (_fn: (value: T) => U): Result => { return Err(error); + }, + mapError: (fn: (error: E | string) => U): Result => { + try { + return Err(fn(error)); + } catch (error) { + return Err(error as U); + } } }; }