From 0a8e96da8de85b31f8dae56396d0c42c036b7ad0 Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Thu, 30 May 2024 11:56:34 -0400 Subject: [PATCH] address PR review comments --- .env.example | 3 ++ .../lib/processor/{process.ts => handler.ts} | 2 +- .../lib/processor/processor.worker.boot.ts | 2 +- .../jobs/lib/processor/processor.worker.ts | 6 +-- packages/jobs/nodemon.json | 2 +- .../lib/clients/client.integration.test.ts | 2 +- packages/orchestrator/lib/clients/client.ts | 20 ++++---- .../lib/clients/processor.integration.test.ts | 9 ++-- .../orchestrator/lib/clients/processor.ts | 46 ++++++++++--------- packages/orchestrator/lib/clients/types.ts | 2 +- packages/orchestrator/lib/clients/validate.ts | 2 +- .../lib/routes/{health.ts => getHealth.ts} | 2 +- .../routes/v1/{dequeue.ts => postDequeue.ts} | 36 +++++++-------- .../v1/{schedule.ts => postSchedule.ts} | 16 +++---- .../routes/v1/{search.ts => postSearch.ts} | 10 ++-- .../v1/tasks/{taskId.ts => putTaskId.ts} | 10 ++-- .../tasks/taskId/{output.ts => getOutput.ts} | 10 ++-- .../taskId/{heartbeat.ts => postHeartbeat.ts} | 10 ++-- packages/orchestrator/lib/server.ts | 14 +++--- 19 files changed, 104 insertions(+), 100 deletions(-) rename packages/jobs/lib/processor/{process.ts => handler.ts} (94%) rename packages/orchestrator/lib/routes/{health.ts => getHealth.ts} (88%) rename packages/orchestrator/lib/routes/v1/{dequeue.ts => postDequeue.ts} (70%) rename packages/orchestrator/lib/routes/v1/{schedule.ts => postSchedule.ts} (93%) rename packages/orchestrator/lib/routes/v1/{search.ts => postSearch.ts} (85%) rename packages/orchestrator/lib/routes/v1/tasks/{taskId.ts => putTaskId.ts} (89%) rename packages/orchestrator/lib/routes/v1/tasks/taskId/{output.ts => getOutput.ts} (89%) rename packages/orchestrator/lib/routes/v1/tasks/taskId/{heartbeat.ts => postHeartbeat.ts} (81%) diff --git a/.env.example b/.env.example index 3034cb4eb2..b31d88bd77 100644 --- a/.env.example +++ b/.env.example @@ -98,3 +98,6 @@ MAILGUN_API_KEY= # Redis (optional) NANGO_REDIS_URL= + +# Orchestrator +ORCHESTRATOR_SERVICE_URL="http://localhost:3008" diff --git a/packages/jobs/lib/processor/process.ts b/packages/jobs/lib/processor/handler.ts similarity index 94% rename from packages/jobs/lib/processor/process.ts rename to packages/jobs/lib/processor/handler.ts index ec4a2511ab..a7e8462c8e 100644 --- a/packages/jobs/lib/processor/process.ts +++ b/packages/jobs/lib/processor/handler.ts @@ -3,7 +3,7 @@ import type { JsonValue } from 'type-fest'; import { Err, Ok } from '@nangohq/utils'; import type { Result } from '@nangohq/utils'; -export async function process(task: OrchestratorTask): Promise> { +export async function handler(task: OrchestratorTask): Promise> { task.abortController.signal.onabort = () => { abort(task); }; diff --git a/packages/jobs/lib/processor/processor.worker.boot.ts b/packages/jobs/lib/processor/processor.worker.boot.ts index c8206997b1..ae18b7ba2c 100644 --- a/packages/jobs/lib/processor/processor.worker.boot.ts +++ b/packages/jobs/lib/processor/processor.worker.boot.ts @@ -2,7 +2,7 @@ import { isMainThread, parentPort, workerData } from 'node:worker_threads'; import { getLogger } from '@nangohq/utils'; import { ProcessorChild } from './processor.worker.js'; -const logger = getLogger('Scheduler.monitor'); +const logger = getLogger('processor.worker.boot'); if (!isMainThread && parentPort) { new ProcessorChild(parentPort, workerData); diff --git a/packages/jobs/lib/processor/processor.worker.ts b/packages/jobs/lib/processor/processor.worker.ts index 98590bb745..71b7a85bd2 100644 --- a/packages/jobs/lib/processor/processor.worker.ts +++ b/packages/jobs/lib/processor/processor.worker.ts @@ -3,7 +3,7 @@ import type { MessagePort } from 'node:worker_threads'; import { Worker, isMainThread } from 'node:worker_threads'; import { getLogger, stringifyError } from '@nangohq/utils'; import { OrchestratorClient, OrchestratorProcessor } from '@nangohq/nango-orchestrator'; -import { process } from './process.js'; +import { handler } from './handler.js'; const logger = getLogger('jobs.processor.worker'); @@ -73,7 +73,7 @@ export class ProcessorChild { }); const client = new OrchestratorClient({ baseUrl: this.opts.orchestratorUrl }); this.processor = new OrchestratorProcessor({ - process, + handler, opts: { orchestratorClient: client, groupKey: this.opts.groupKey, @@ -84,7 +84,7 @@ export class ProcessorChild { async start(): Promise { logger.info(`Starting Processor: ${JSON.stringify(this.opts)}`); - await this.processor.start(); + this.processor.start(); } stop(): void { diff --git a/packages/jobs/nodemon.json b/packages/jobs/nodemon.json index 146f312628..55169b7959 100644 --- a/packages/jobs/nodemon.json +++ b/packages/jobs/nodemon.json @@ -1,5 +1,5 @@ { - "watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../logs/dist", "../../.env"], + "watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../logs/dist", "../orchestrator/dist", "../../.env"], "ext": "js,ts,json", "ignore": ["lib/**/*.spec.ts"], "exec": "tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env", diff --git a/packages/orchestrator/lib/clients/client.integration.test.ts b/packages/orchestrator/lib/clients/client.integration.test.ts index 0400a80a3d..b1b9d4138b 100644 --- a/packages/orchestrator/lib/clients/client.integration.test.ts +++ b/packages/orchestrator/lib/clients/client.integration.test.ts @@ -280,7 +280,7 @@ describe('OrchestratorClient', async () => { input: { foo: 'bar' } } }); - const res = await client.dequeue({ groupKey, limit: 2 }); + const res = await client.dequeue({ groupKey, limit: 2, waitForCompletion: false }); expect(res.unwrap().length).toBe(2); expect(res.unwrap()[0]?.isAction()).toBe(true); expect(res.unwrap()[1]?.isWebhook()).toBe(true); diff --git a/packages/orchestrator/lib/clients/client.ts b/packages/orchestrator/lib/clients/client.ts index 250e101a26..038a8943a0 100644 --- a/packages/orchestrator/lib/clients/client.ts +++ b/packages/orchestrator/lib/clients/client.ts @@ -1,9 +1,9 @@ -import { postRoute as postScheduleRoute } from '../routes/v1/schedule.js'; -import { getRoute as getDequeueRoute } from '../routes/v1/dequeue.js'; -import { postRoute as postSearchRoute } from '../routes/v1/search.js'; -import { getRoute as getOutputRoute } from '../routes/v1/tasks/taskId/output.js'; -import { putRoute as putTaskRoute } from '../routes/v1/tasks/taskId.js'; -import { postRoute as postHeartbeatRoute } from '../routes/v1/tasks/taskId/heartbeat.js'; +import { route as postScheduleRoute } from '../routes/v1/postSchedule.js'; +import { route as postDequeueRoute } from '../routes/v1/postDequeue.js'; +import { route as postSearchRoute } from '../routes/v1/postSearch.js'; +import { route as getOutputRoute } from '../routes/v1/tasks/taskId/getOutput.js'; +import { route as putTaskRoute } from '../routes/v1/tasks/putTaskId.js'; +import { route as postHeartbeatRoute } from '../routes/v1/tasks/taskId/postHeartbeat.js'; import type { Result, Route } from '@nangohq/utils'; import { Ok, Err, routeFetch, stringifyError } from '@nangohq/utils'; import type { Endpoint } from '@nangohq/types'; @@ -155,14 +155,14 @@ export class OrchestratorClient { public async dequeue({ groupKey, limit, - waitForCompletion = true + waitForCompletion }: { groupKey: string; limit: number; - waitForCompletion?: boolean; + waitForCompletion: boolean; }): Promise> { - const res = await this.routeFetch(getDequeueRoute)({ - query: { + const res = await this.routeFetch(postDequeueRoute)({ + body: { groupKey, limit, waitForCompletion diff --git a/packages/orchestrator/lib/clients/processor.integration.test.ts b/packages/orchestrator/lib/clients/processor.integration.test.ts index 6c7ae5ce71..07b00423f4 100644 --- a/packages/orchestrator/lib/clients/processor.integration.test.ts +++ b/packages/orchestrator/lib/clients/processor.integration.test.ts @@ -98,12 +98,15 @@ describe('OrchestratorProcessor', async () => { }); }); -async function processN(process: (task: TaskAction | TaskWebhook) => Promise>, groupKey: string, n: number) { - const processor = new OrchestratorProcessor({ process, opts: { orchestratorClient, groupKey, maxConcurrency: 3, checkForTerminatedInterval: 100 } }); +async function processN(handler: (task: TaskAction | TaskWebhook) => Promise>, groupKey: string, n: number) { + const processor = new OrchestratorProcessor({ + handler, + opts: { orchestratorClient, groupKey, maxConcurrency: 3, checkForTerminatedInterval: 100 } + }); for (let i = 0; i < n; i++) { await schedule({ groupKey }); } - await processor.start(); + processor.start(); // Wait so the processor can process all tasks await new Promise((resolve) => setTimeout(resolve, 1000)); return processor; diff --git a/packages/orchestrator/lib/clients/processor.ts b/packages/orchestrator/lib/clients/processor.ts index c64374780d..16be8fb3ad 100644 --- a/packages/orchestrator/lib/clients/processor.ts +++ b/packages/orchestrator/lib/clients/processor.ts @@ -7,7 +7,7 @@ import type { JsonValue } from 'type-fest'; const logger = getLogger('orchestrator.clients.processor'); export class OrchestratorProcessor { - private process: (task: OrchestratorTask) => Promise>; + private handler: (task: OrchestratorTask) => Promise>; private groupKey: string; private orchestratorClient: OrchestratorClient; private queue: Queue; @@ -17,14 +17,14 @@ export class OrchestratorProcessor { private checkForTerminatedInterval: number; constructor({ - process, + handler, opts }: { - process: (task: OrchestratorTask) => Promise>; + handler: (task: OrchestratorTask) => Promise>; opts: { orchestratorClient: OrchestratorClient; groupKey: string; maxConcurrency: number; checkForTerminatedInterval?: number }; }) { this.stopped = true; - this.process = process; + this.handler = handler; this.groupKey = opts.groupKey; this.orchestratorClient = opts.orchestratorClient; this.queue = new Queue(opts.maxConcurrency); @@ -32,7 +32,7 @@ export class OrchestratorProcessor { this.checkForTerminatedInterval = opts.checkForTerminatedInterval || 1000; } - public async start() { + public start() { this.stopped = false; this.terminatedTimer = setInterval(async () => { await this.checkForTerminatedTasks(); @@ -48,20 +48,21 @@ export class OrchestratorProcessor { } private async checkForTerminatedTasks() { - if (!this.stopped && this.abortControllers.size > 0) { - const ids = Array.from(this.abortControllers.keys()); - const search = await this.orchestratorClient.search({ ids }); - if (search.isErr()) { - return Err(search.error); - } - for (const task of search.value) { - // if task is already in a terminal state, invoke the abort signal - if (['FAILED', 'EXPIRED', 'CANCELLED', 'SUCCEEDED'].includes(task.state)) { - const abortController = this.abortControllers.get(task.id); - if (abortController) { - abortController.abort(); - this.abortControllers.delete(task.id); - } + if (this.stopped || this.abortControllers.size <= 0) { + return; + } + const ids = Array.from(this.abortControllers.keys()); + const search = await this.orchestratorClient.search({ ids }); + if (search.isErr()) { + return Err(search.error); + } + for (const task of search.value) { + // if task is already in a terminal state, invoke the abort signal + if (['FAILED', 'EXPIRED', 'CANCELLED', 'SUCCEEDED'].includes(task.state)) { + const abortController = this.abortControllers.get(task.id); + if (abortController) { + abortController.abort(); + this.abortControllers.delete(task.id); } } } @@ -71,9 +72,10 @@ export class OrchestratorProcessor { private async processingLoop() { while (!this.stopped) { if (this.queue.available() > 0) { - const tasks = await this.orchestratorClient.dequeue({ groupKey: this.groupKey, limit: this.queue.available() * 2 }); // fetch more than available to keep the queue full + const tasks = await this.orchestratorClient.dequeue({ groupKey: this.groupKey, limit: this.queue.available() * 2, waitForCompletion: true }); // fetch more than available to keep the queue full if (tasks.isErr()) { - return Err(tasks.error); + logger.error(`failed to dequeue tasks: ${stringifyError(tasks.error)}`); + continue; } for (const task of tasks.value) { await this.processTask(task); @@ -87,7 +89,7 @@ export class OrchestratorProcessor { this.abortControllers.set(task.id, task.abortController); this.queue.run(async () => { try { - const res = await this.process(task); + const res = await this.handler(task); if (res.isErr()) { this.orchestratorClient.failed({ taskId: task.id, error: res.error }); } else { diff --git a/packages/orchestrator/lib/clients/types.ts b/packages/orchestrator/lib/clients/types.ts index 0da298158a..a94051a89e 100644 --- a/packages/orchestrator/lib/clients/types.ts +++ b/packages/orchestrator/lib/clients/types.ts @@ -1,5 +1,5 @@ import type { JsonValue, SetOptional } from 'type-fest'; -import type { PostSchedule } from '../routes/v1/schedule.js'; +import type { PostSchedule } from '../routes/v1/postSchedule.js'; import type { Result } from '@nangohq/utils'; import type { TaskState } from '@nangohq/scheduler'; diff --git a/packages/orchestrator/lib/clients/validate.ts b/packages/orchestrator/lib/clients/validate.ts index cc7cb5aca8..c3a4e70932 100644 --- a/packages/orchestrator/lib/clients/validate.ts +++ b/packages/orchestrator/lib/clients/validate.ts @@ -2,7 +2,7 @@ import { taskStates } from '@nangohq/scheduler'; import type { Task } from '@nangohq/scheduler'; import { TaskAction, TaskWebhook } from './types.js'; import { z } from 'zod'; -import { actionArgsSchema, webhookArgsSchema } from '../routes/v1/schedule.js'; +import { actionArgsSchema, webhookArgsSchema } from '../routes/v1/postSchedule.js'; import { Err, Ok } from '@nangohq/utils'; import type { Result } from '@nangohq/utils'; diff --git a/packages/orchestrator/lib/routes/health.ts b/packages/orchestrator/lib/routes/getHealth.ts similarity index 88% rename from packages/orchestrator/lib/routes/health.ts rename to packages/orchestrator/lib/routes/getHealth.ts index 8c02bfedd1..20c3c51365 100644 --- a/packages/orchestrator/lib/routes/health.ts +++ b/packages/orchestrator/lib/routes/getHealth.ts @@ -10,7 +10,7 @@ type Health = Endpoint<{ const path = '/health'; const method = 'GET'; -export const getHandler: RouteHandler = { +export const routeHandler: RouteHandler = { path, method, validate: (_req, _res, next) => next(), // No extra validation needed diff --git a/packages/orchestrator/lib/routes/v1/dequeue.ts b/packages/orchestrator/lib/routes/v1/postDequeue.ts similarity index 70% rename from packages/orchestrator/lib/routes/v1/dequeue.ts rename to packages/orchestrator/lib/routes/v1/postDequeue.ts index 853692eac2..8629bf81ee 100644 --- a/packages/orchestrator/lib/routes/v1/dequeue.ts +++ b/packages/orchestrator/lib/routes/v1/postDequeue.ts @@ -6,51 +6,47 @@ import { validateRequest } from '@nangohq/utils'; import type EventEmitter from 'node:events'; const path = '/v1/dequeue'; -const method = 'GET'; +const method = 'POST'; -type GetDequeue = Endpoint<{ +type PostDequeue = Endpoint<{ Method: typeof method; Path: typeof path; - Querystring: { + Body: { groupKey: string; limit: number; - waitForCompletion?: boolean; + waitForCompletion: boolean; }; Error: ApiError<'dequeue_failed'>; Success: Task[]; }>; -const validate = validateRequest({ - parseQuery: (data) => +const validate = validateRequest({ + parseBody: (data) => z .object({ groupKey: z.string().min(1), limit: z.coerce.number().positive(), - waitForCompletion: z - .string() - .optional() - .default('false') - .transform((val) => val === 'true') + waitForCompletion: z.coerce.boolean() }) .parse(data) }); -export const getRoute: Route = { path, method }; +export const route: Route = { path, method }; -export const getRouteHandler = (scheduler: Scheduler, eventEmitter: EventEmitter): RouteHandler => { +export const routeHandler = (scheduler: Scheduler, eventEmitter: EventEmitter): RouteHandler => { return { - ...getRoute, + ...route, validate, - handler: getHandler(scheduler, eventEmitter) + handler: handler(scheduler, eventEmitter) }; }; -const getHandler = (scheduler: Scheduler, eventEmitter: EventEmitter) => { - return async (req: EndpointRequest, res: EndpointResponse) => { - const { groupKey, limit } = req.query; +const handler = (scheduler: Scheduler, eventEmitter: EventEmitter) => { + return async (req: EndpointRequest, res: EndpointResponse) => { + const { groupKey, limit, waitForCompletion } = req.body; const waitForCompletionTimeoutMs = 60_000; const eventId = `task:started:${groupKey}`; - const cleanupAndRespond = (respond: (res: EndpointResponse) => void) => { + const cleanupAndRespond = (respond: (res: EndpointResponse) => void) => { if (timeout) { clearTimeout(timeout); } @@ -80,7 +76,7 @@ const getHandler = (scheduler: Scheduler, eventEmitter: EventEmitter) => { cleanupAndRespond((res) => res.status(500).json({ error: { code: 'dequeue_failed', message: getTasks.error.message } })); return; } - if (req.query.waitForCompletion && getTasks.value.length === 0) { + if (waitForCompletion && getTasks.value.length === 0) { await new Promise((resolve) => resolve(timeout)); } else { cleanupAndRespond((res) => res.status(200).json(getTasks.value)); diff --git a/packages/orchestrator/lib/routes/v1/schedule.ts b/packages/orchestrator/lib/routes/v1/postSchedule.ts similarity index 93% rename from packages/orchestrator/lib/routes/v1/schedule.ts rename to packages/orchestrator/lib/routes/v1/postSchedule.ts index a1a4628ee2..7b2e3175b5 100644 --- a/packages/orchestrator/lib/routes/v1/schedule.ts +++ b/packages/orchestrator/lib/routes/v1/postSchedule.ts @@ -7,6 +7,9 @@ import { validateRequest } from '@nangohq/utils'; import { jsonSchema } from '../../utils/validation.js'; import type { TaskType } from '../../types.js'; +const path = '/v1/schedule'; +const method = 'POST'; + export type PostSchedule = Endpoint<{ Method: typeof method; Path: typeof path; @@ -29,9 +32,6 @@ export type PostSchedule = Endpoint<{ Success: { taskId: string }; }>; -const path = '/v1/schedule'; -const method = 'POST'; - const commonSchemaFields = { name: z.string().min(1), connection: z.object({ @@ -89,7 +89,7 @@ const validate = validateRequest({ } }); -const postHandler = (scheduler: Scheduler) => { +const handler = (scheduler: Scheduler) => { return async (req: EndpointRequest, res: EndpointResponse) => { const task = await scheduler.schedule({ scheduling: req.body.scheduling, @@ -111,12 +111,12 @@ const postHandler = (scheduler: Scheduler) => { }; }; -export const postRoute: Route = { path, method }; +export const route: Route = { path, method }; -export const postRouteHandler = (scheduler: Scheduler): RouteHandler => { +export const routeHandler = (scheduler: Scheduler): RouteHandler => { return { - ...postRoute, + ...route, validate, - handler: postHandler(scheduler) + handler: handler(scheduler) }; }; diff --git a/packages/orchestrator/lib/routes/v1/search.ts b/packages/orchestrator/lib/routes/v1/postSearch.ts similarity index 85% rename from packages/orchestrator/lib/routes/v1/search.ts rename to packages/orchestrator/lib/routes/v1/postSearch.ts index af8faaa71d..c6374354c1 100644 --- a/packages/orchestrator/lib/routes/v1/search.ts +++ b/packages/orchestrator/lib/routes/v1/postSearch.ts @@ -30,7 +30,7 @@ const validate = validateRequest({ .parse(data) }); -const getHandler = (scheduler: Scheduler) => { +const handler = (scheduler: Scheduler) => { return async (req: EndpointRequest, res: EndpointResponse) => { const { ids, groupKey, limit } = req.body; const getTasks = await scheduler.search({ @@ -45,12 +45,12 @@ const getHandler = (scheduler: Scheduler) => { }; }; -export const postRoute: Route = { path, method }; +export const route: Route = { path, method }; -export const postRouteHandler = (scheduler: Scheduler): RouteHandler => { +export const routeHandler = (scheduler: Scheduler): RouteHandler => { return { - ...postRoute, + ...route, validate, - handler: getHandler(scheduler) + handler: handler(scheduler) }; }; diff --git a/packages/orchestrator/lib/routes/v1/tasks/taskId.ts b/packages/orchestrator/lib/routes/v1/tasks/putTaskId.ts similarity index 89% rename from packages/orchestrator/lib/routes/v1/tasks/taskId.ts rename to packages/orchestrator/lib/routes/v1/tasks/putTaskId.ts index df255eeb49..bca8d96b3b 100644 --- a/packages/orchestrator/lib/routes/v1/tasks/taskId.ts +++ b/packages/orchestrator/lib/routes/v1/tasks/putTaskId.ts @@ -28,7 +28,7 @@ const validate = validateRequest({ parseParams: (data) => z.object({ taskId: z.string().uuid() }).parse(data) }); -const putHandler = (scheduler: Scheduler) => { +const handler = (scheduler: Scheduler) => { return async (req: EndpointRequest, res: EndpointResponse) => { const { taskId } = req.params; const { state, output } = req.body; @@ -56,12 +56,12 @@ const putHandler = (scheduler: Scheduler) => { }; }; -export const putRoute: Route = { path, method }; +export const route: Route = { path, method }; -export const putRouteHandler = (scheduler: Scheduler): RouteHandler => { +export const routeHandler = (scheduler: Scheduler): RouteHandler => { return { - ...putRoute, + ...route, validate, - handler: putHandler(scheduler) + handler: handler(scheduler) }; }; diff --git a/packages/orchestrator/lib/routes/v1/tasks/taskId/output.ts b/packages/orchestrator/lib/routes/v1/tasks/taskId/getOutput.ts similarity index 89% rename from packages/orchestrator/lib/routes/v1/tasks/taskId/output.ts rename to packages/orchestrator/lib/routes/v1/tasks/taskId/getOutput.ts index 19840b6b75..5f4494f41d 100644 --- a/packages/orchestrator/lib/routes/v1/tasks/taskId/output.ts +++ b/packages/orchestrator/lib/routes/v1/tasks/taskId/getOutput.ts @@ -36,7 +36,7 @@ const validate = validateRequest({ parseParams: (data) => z.object({ taskId: z.string().uuid() }).parse(data) }); -const getHandler = (scheduler: Scheduler, eventEmitter: EventEmitter) => { +const handler = (scheduler: Scheduler, eventEmitter: EventEmitter) => { return async (req: EndpointRequest, res: EndpointResponse) => { const waitForCompletionTimeoutMs = 120_000; const eventId = `task:completed:${req.params.taskId}`; @@ -74,12 +74,12 @@ const getHandler = (scheduler: Scheduler, eventEmitter: EventEmitter) => { }; }; -export const getRoute: Route = { path, method }; +export const route: Route = { path, method }; -export const getRouteHandler = (scheduler: Scheduler, eventEmmiter: EventEmitter): RouteHandler => { +export const routeHandler = (scheduler: Scheduler, eventEmmiter: EventEmitter): RouteHandler => { return { - ...getRoute, + ...route, validate, - handler: getHandler(scheduler, eventEmmiter) + handler: handler(scheduler, eventEmmiter) }; }; diff --git a/packages/orchestrator/lib/routes/v1/tasks/taskId/heartbeat.ts b/packages/orchestrator/lib/routes/v1/tasks/taskId/postHeartbeat.ts similarity index 81% rename from packages/orchestrator/lib/routes/v1/tasks/taskId/heartbeat.ts rename to packages/orchestrator/lib/routes/v1/tasks/taskId/postHeartbeat.ts index 02acf385d6..ebaf8f928d 100644 --- a/packages/orchestrator/lib/routes/v1/tasks/taskId/heartbeat.ts +++ b/packages/orchestrator/lib/routes/v1/tasks/taskId/postHeartbeat.ts @@ -21,7 +21,7 @@ const validate = validateRequest({ parseParams: (data) => z.object({ taskId: z.string().uuid() }).parse(data) }); -const putHandler = (scheduler: Scheduler) => { +const handler = (scheduler: Scheduler) => { return async (req: EndpointRequest, res: EndpointResponse) => { const { taskId } = req.params; const heartbeat = await scheduler.heartbeat({ taskId: taskId }); @@ -34,12 +34,12 @@ const putHandler = (scheduler: Scheduler) => { }; }; -export const postRoute: Route = { path, method }; +export const route: Route = { path, method }; -export const postRouteHandler = (scheduler: Scheduler): RouteHandler => { +export const routeHandler = (scheduler: Scheduler): RouteHandler => { return { - ...postRoute, + ...route, validate, - handler: putHandler(scheduler) + handler: handler(scheduler) }; }; diff --git a/packages/orchestrator/lib/server.ts b/packages/orchestrator/lib/server.ts index 753a9cfe38..ab6c8a24fb 100644 --- a/packages/orchestrator/lib/server.ts +++ b/packages/orchestrator/lib/server.ts @@ -1,12 +1,12 @@ import express from 'express'; import type { Express, Request, Response, NextFunction } from 'express'; -import { postRouteHandler as postScheduleHandler } from './routes/v1/schedule.js'; -import { postRouteHandler as postSearchHandler } from './routes/v1/search.js'; -import { getRouteHandler as getDequeueHandler } from './routes/v1/dequeue.js'; -import { putRouteHandler as putTaskHandler } from './routes/v1/tasks/taskId.js'; -import { getHandler as getHealthHandler } from './routes/health.js'; -import { getRouteHandler as getOutputHandler } from './routes/v1/tasks/taskId/output.js'; -import { postRouteHandler as postHeartbeatHandler } from './routes/v1/tasks/taskId/heartbeat.js'; +import { routeHandler as postScheduleHandler } from './routes/v1/postSchedule.js'; +import { routeHandler as postSearchHandler } from './routes/v1/postSearch.js'; +import { routeHandler as getDequeueHandler } from './routes/v1/postDequeue.js'; +import { routeHandler as putTaskHandler } from './routes/v1/tasks/putTaskId.js'; +import { routeHandler as getHealthHandler } from './routes/getHealth.js'; +import { routeHandler as getOutputHandler } from './routes/v1/tasks/taskId/getOutput.js'; +import { routeHandler as postHeartbeatHandler } from './routes/v1/tasks/taskId/postHeartbeat.js'; import { getLogger, createRoute } from '@nangohq/utils'; import type { Scheduler } from '@nangohq/scheduler'; import type { ApiError } from '@nangohq/types';