Skip to content

Commit

Permalink
address PR review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
TBonnin committed May 30, 2024
1 parent e57716e commit 0a8e96d
Show file tree
Hide file tree
Showing 19 changed files with 104 additions and 100 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,6 @@ MAILGUN_API_KEY=

# Redis (optional)
NANGO_REDIS_URL=

# Orchestrator
ORCHESTRATOR_SERVICE_URL="http://localhost:3008"
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { JsonValue } from 'type-fest';
import { Err, Ok } from '@nangohq/utils';
import type { Result } from '@nangohq/utils';

export async function process(task: OrchestratorTask): Promise<Result<JsonValue>> {
export async function handler(task: OrchestratorTask): Promise<Result<JsonValue>> {
task.abortController.signal.onabort = () => {
abort(task);
};
Expand Down
2 changes: 1 addition & 1 deletion packages/jobs/lib/processor/processor.worker.boot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { isMainThread, parentPort, workerData } from 'node:worker_threads';
import { getLogger } from '@nangohq/utils';
import { ProcessorChild } from './processor.worker.js';

const logger = getLogger('Scheduler.monitor');
const logger = getLogger('processor.worker.boot');

if (!isMainThread && parentPort) {
new ProcessorChild(parentPort, workerData);
Expand Down
6 changes: 3 additions & 3 deletions packages/jobs/lib/processor/processor.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { MessagePort } from 'node:worker_threads';
import { Worker, isMainThread } from 'node:worker_threads';
import { getLogger, stringifyError } from '@nangohq/utils';
import { OrchestratorClient, OrchestratorProcessor } from '@nangohq/nango-orchestrator';
import { process } from './process.js';
import { handler } from './handler.js';

const logger = getLogger('jobs.processor.worker');

Expand Down Expand Up @@ -73,7 +73,7 @@ export class ProcessorChild {
});
const client = new OrchestratorClient({ baseUrl: this.opts.orchestratorUrl });
this.processor = new OrchestratorProcessor({
process,
handler,
opts: {
orchestratorClient: client,
groupKey: this.opts.groupKey,
Expand All @@ -84,7 +84,7 @@ export class ProcessorChild {

async start(): Promise<void> {
logger.info(`Starting Processor: ${JSON.stringify(this.opts)}`);
await this.processor.start();
this.processor.start();
}

stop(): void {
Expand Down
2 changes: 1 addition & 1 deletion packages/jobs/nodemon.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../logs/dist", "../../.env"],
"watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../logs/dist", "../orchestrator/dist", "../../.env"],
"ext": "js,ts,json",
"ignore": ["lib/**/*.spec.ts"],
"exec": "tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ describe('OrchestratorClient', async () => {
input: { foo: 'bar' }
}
});
const res = await client.dequeue({ groupKey, limit: 2 });
const res = await client.dequeue({ groupKey, limit: 2, waitForCompletion: false });
expect(res.unwrap().length).toBe(2);
expect(res.unwrap()[0]?.isAction()).toBe(true);
expect(res.unwrap()[1]?.isWebhook()).toBe(true);
Expand Down
20 changes: 10 additions & 10 deletions packages/orchestrator/lib/clients/client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { postRoute as postScheduleRoute } from '../routes/v1/schedule.js';
import { getRoute as getDequeueRoute } from '../routes/v1/dequeue.js';
import { postRoute as postSearchRoute } from '../routes/v1/search.js';
import { getRoute as getOutputRoute } from '../routes/v1/tasks/taskId/output.js';
import { putRoute as putTaskRoute } from '../routes/v1/tasks/taskId.js';
import { postRoute as postHeartbeatRoute } from '../routes/v1/tasks/taskId/heartbeat.js';
import { route as postScheduleRoute } from '../routes/v1/postSchedule.js';
import { route as postDequeueRoute } from '../routes/v1/postDequeue.js';
import { route as postSearchRoute } from '../routes/v1/postSearch.js';
import { route as getOutputRoute } from '../routes/v1/tasks/taskId/getOutput.js';
import { route as putTaskRoute } from '../routes/v1/tasks/putTaskId.js';
import { route as postHeartbeatRoute } from '../routes/v1/tasks/taskId/postHeartbeat.js';
import type { Result, Route } from '@nangohq/utils';
import { Ok, Err, routeFetch, stringifyError } from '@nangohq/utils';
import type { Endpoint } from '@nangohq/types';
Expand Down Expand Up @@ -155,14 +155,14 @@ export class OrchestratorClient {
public async dequeue({
groupKey,
limit,
waitForCompletion = true
waitForCompletion
}: {
groupKey: string;
limit: number;
waitForCompletion?: boolean;
waitForCompletion: boolean;
}): Promise<Result<(TaskWebhook | TaskAction)[], ClientError>> {
const res = await this.routeFetch(getDequeueRoute)({
query: {
const res = await this.routeFetch(postDequeueRoute)({
body: {
groupKey,
limit,
waitForCompletion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,15 @@ describe('OrchestratorProcessor', async () => {
});
});

async function processN(process: (task: TaskAction | TaskWebhook) => Promise<Result<JsonValue>>, groupKey: string, n: number) {
const processor = new OrchestratorProcessor({ process, opts: { orchestratorClient, groupKey, maxConcurrency: 3, checkForTerminatedInterval: 100 } });
async function processN(handler: (task: TaskAction | TaskWebhook) => Promise<Result<JsonValue>>, groupKey: string, n: number) {
const processor = new OrchestratorProcessor({
handler,
opts: { orchestratorClient, groupKey, maxConcurrency: 3, checkForTerminatedInterval: 100 }
});
for (let i = 0; i < n; i++) {
await schedule({ groupKey });
}
await processor.start();
processor.start();
// Wait so the processor can process all tasks
await new Promise((resolve) => setTimeout(resolve, 1000));
return processor;
Expand Down
46 changes: 24 additions & 22 deletions packages/orchestrator/lib/clients/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { JsonValue } from 'type-fest';
const logger = getLogger('orchestrator.clients.processor');

export class OrchestratorProcessor {
private process: (task: OrchestratorTask) => Promise<Result<JsonValue>>;
private handler: (task: OrchestratorTask) => Promise<Result<JsonValue>>;
private groupKey: string;
private orchestratorClient: OrchestratorClient;
private queue: Queue;
Expand All @@ -17,22 +17,22 @@ export class OrchestratorProcessor {
private checkForTerminatedInterval: number;

constructor({
process,
handler,
opts
}: {
process: (task: OrchestratorTask) => Promise<Result<JsonValue>>;
handler: (task: OrchestratorTask) => Promise<Result<JsonValue>>;
opts: { orchestratorClient: OrchestratorClient; groupKey: string; maxConcurrency: number; checkForTerminatedInterval?: number };
}) {
this.stopped = true;
this.process = process;
this.handler = handler;
this.groupKey = opts.groupKey;
this.orchestratorClient = opts.orchestratorClient;
this.queue = new Queue(opts.maxConcurrency);
this.abortControllers = new Map();
this.checkForTerminatedInterval = opts.checkForTerminatedInterval || 1000;
}

public async start() {
public start() {
this.stopped = false;
this.terminatedTimer = setInterval(async () => {
await this.checkForTerminatedTasks();
Expand All @@ -48,20 +48,21 @@ export class OrchestratorProcessor {
}

private async checkForTerminatedTasks() {
if (!this.stopped && this.abortControllers.size > 0) {
const ids = Array.from(this.abortControllers.keys());
const search = await this.orchestratorClient.search({ ids });
if (search.isErr()) {
return Err(search.error);
}
for (const task of search.value) {
// if task is already in a terminal state, invoke the abort signal
if (['FAILED', 'EXPIRED', 'CANCELLED', 'SUCCEEDED'].includes(task.state)) {
const abortController = this.abortControllers.get(task.id);
if (abortController) {
abortController.abort();
this.abortControllers.delete(task.id);
}
if (this.stopped || this.abortControllers.size <= 0) {
return;
}
const ids = Array.from(this.abortControllers.keys());
const search = await this.orchestratorClient.search({ ids });
if (search.isErr()) {
return Err(search.error);
}
for (const task of search.value) {
// if task is already in a terminal state, invoke the abort signal
if (['FAILED', 'EXPIRED', 'CANCELLED', 'SUCCEEDED'].includes(task.state)) {
const abortController = this.abortControllers.get(task.id);
if (abortController) {
abortController.abort();
this.abortControllers.delete(task.id);
}
}
}
Expand All @@ -71,9 +72,10 @@ export class OrchestratorProcessor {
private async processingLoop() {
while (!this.stopped) {
if (this.queue.available() > 0) {
const tasks = await this.orchestratorClient.dequeue({ groupKey: this.groupKey, limit: this.queue.available() * 2 }); // fetch more than available to keep the queue full
const tasks = await this.orchestratorClient.dequeue({ groupKey: this.groupKey, limit: this.queue.available() * 2, waitForCompletion: true }); // fetch more than available to keep the queue full
if (tasks.isErr()) {
return Err(tasks.error);
logger.error(`failed to dequeue tasks: ${stringifyError(tasks.error)}`);
continue;
}
for (const task of tasks.value) {
await this.processTask(task);
Expand All @@ -87,7 +89,7 @@ export class OrchestratorProcessor {
this.abortControllers.set(task.id, task.abortController);
this.queue.run(async () => {
try {
const res = await this.process(task);
const res = await this.handler(task);
if (res.isErr()) {
this.orchestratorClient.failed({ taskId: task.id, error: res.error });
} else {
Expand Down
2 changes: 1 addition & 1 deletion packages/orchestrator/lib/clients/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { JsonValue, SetOptional } from 'type-fest';
import type { PostSchedule } from '../routes/v1/schedule.js';
import type { PostSchedule } from '../routes/v1/postSchedule.js';
import type { Result } from '@nangohq/utils';
import type { TaskState } from '@nangohq/scheduler';

Expand Down
2 changes: 1 addition & 1 deletion packages/orchestrator/lib/clients/validate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { taskStates } from '@nangohq/scheduler';
import type { Task } from '@nangohq/scheduler';
import { TaskAction, TaskWebhook } from './types.js';
import { z } from 'zod';
import { actionArgsSchema, webhookArgsSchema } from '../routes/v1/schedule.js';
import { actionArgsSchema, webhookArgsSchema } from '../routes/v1/postSchedule.js';
import { Err, Ok } from '@nangohq/utils';
import type { Result } from '@nangohq/utils';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Health = Endpoint<{
const path = '/health';
const method = 'GET';

export const getHandler: RouteHandler<Health> = {
export const routeHandler: RouteHandler<Health> = {
path,
method,
validate: (_req, _res, next) => next(), // No extra validation needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,47 @@ import { validateRequest } from '@nangohq/utils';
import type EventEmitter from 'node:events';

const path = '/v1/dequeue';
const method = 'GET';
const method = 'POST';

type GetDequeue = Endpoint<{
type PostDequeue = Endpoint<{
Method: typeof method;
Path: typeof path;
Querystring: {
Body: {
groupKey: string;
limit: number;
waitForCompletion?: boolean;
waitForCompletion: boolean;
};
Error: ApiError<'dequeue_failed'>;
Success: Task[];
}>;

const validate = validateRequest<GetDequeue>({
parseQuery: (data) =>
const validate = validateRequest<PostDequeue>({
parseBody: (data) =>
z
.object({
groupKey: z.string().min(1),
limit: z.coerce.number().positive(),
waitForCompletion: z
.string()
.optional()
.default('false')
.transform((val) => val === 'true')
waitForCompletion: z.coerce.boolean()
})
.parse(data)
});

export const getRoute: Route<GetDequeue> = { path, method };
export const route: Route<PostDequeue> = { path, method };

export const getRouteHandler = (scheduler: Scheduler, eventEmitter: EventEmitter): RouteHandler<GetDequeue> => {
export const routeHandler = (scheduler: Scheduler, eventEmitter: EventEmitter): RouteHandler<PostDequeue> => {
return {
...getRoute,
...route,
validate,
handler: getHandler(scheduler, eventEmitter)
handler: handler(scheduler, eventEmitter)
};
};

const getHandler = (scheduler: Scheduler, eventEmitter: EventEmitter) => {
return async (req: EndpointRequest<GetDequeue>, res: EndpointResponse<GetDequeue>) => {
const { groupKey, limit } = req.query;
const handler = (scheduler: Scheduler, eventEmitter: EventEmitter) => {
return async (req: EndpointRequest<PostDequeue>, res: EndpointResponse<PostDequeue>) => {
const { groupKey, limit, waitForCompletion } = req.body;
const waitForCompletionTimeoutMs = 60_000;
const eventId = `task:started:${groupKey}`;
const cleanupAndRespond = (respond: (res: EndpointResponse<GetDequeue>) => void) => {
const cleanupAndRespond = (respond: (res: EndpointResponse<PostDequeue>) => void) => {
if (timeout) {
clearTimeout(timeout);
}
Expand Down Expand Up @@ -80,7 +76,7 @@ const getHandler = (scheduler: Scheduler, eventEmitter: EventEmitter) => {
cleanupAndRespond((res) => res.status(500).json({ error: { code: 'dequeue_failed', message: getTasks.error.message } }));
return;
}
if (req.query.waitForCompletion && getTasks.value.length === 0) {
if (waitForCompletion && getTasks.value.length === 0) {
await new Promise((resolve) => resolve(timeout));
} else {
cleanupAndRespond((res) => res.status(200).json(getTasks.value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import { validateRequest } from '@nangohq/utils';
import { jsonSchema } from '../../utils/validation.js';
import type { TaskType } from '../../types.js';

const path = '/v1/schedule';
const method = 'POST';

export type PostSchedule = Endpoint<{
Method: typeof method;
Path: typeof path;
Expand All @@ -29,9 +32,6 @@ export type PostSchedule = Endpoint<{
Success: { taskId: string };
}>;

const path = '/v1/schedule';
const method = 'POST';

const commonSchemaFields = {
name: z.string().min(1),
connection: z.object({
Expand Down Expand Up @@ -89,7 +89,7 @@ const validate = validateRequest<PostSchedule>({
}
});

const postHandler = (scheduler: Scheduler) => {
const handler = (scheduler: Scheduler) => {
return async (req: EndpointRequest<PostSchedule>, res: EndpointResponse<PostSchedule>) => {
const task = await scheduler.schedule({
scheduling: req.body.scheduling,
Expand All @@ -111,12 +111,12 @@ const postHandler = (scheduler: Scheduler) => {
};
};

export const postRoute: Route<PostSchedule> = { path, method };
export const route: Route<PostSchedule> = { path, method };

export const postRouteHandler = (scheduler: Scheduler): RouteHandler<PostSchedule> => {
export const routeHandler = (scheduler: Scheduler): RouteHandler<PostSchedule> => {
return {
...postRoute,
...route,
validate,
handler: postHandler(scheduler)
handler: handler(scheduler)
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const validate = validateRequest<PostSearch>({
.parse(data)
});

const getHandler = (scheduler: Scheduler) => {
const handler = (scheduler: Scheduler) => {
return async (req: EndpointRequest<PostSearch>, res: EndpointResponse<PostSearch>) => {
const { ids, groupKey, limit } = req.body;
const getTasks = await scheduler.search({
Expand All @@ -45,12 +45,12 @@ const getHandler = (scheduler: Scheduler) => {
};
};

export const postRoute: Route<PostSearch> = { path, method };
export const route: Route<PostSearch> = { path, method };

export const postRouteHandler = (scheduler: Scheduler): RouteHandler<PostSearch> => {
export const routeHandler = (scheduler: Scheduler): RouteHandler<PostSearch> => {
return {
...postRoute,
...route,
validate,
handler: getHandler(scheduler)
handler: handler(scheduler)
};
};
Loading

0 comments on commit 0a8e96d

Please sign in to comment.