Skip to content

Commit

Permalink
feat(orchestrator): implement task processor (#2221)
Browse files Browse the repository at this point in the history
## Describe your changes
This PR is implementing the orchestrator processor which, for now, is
being run by jobs.
The processor is behind a flag
`flag:orchestrator:dryrun:process:global`. It is still in dryrun mode
since no real processing is happening, tasks are either immediately
succeeded or failed. It is the last step before actually processing
webhook/actions

### Notes
- I have added orchestrator endpoints to support dequeueing, set task
state, heartbeat and their respective functions in the orchestrator
client
- A processor dequeues tasks in a infinite loop and process the task via
the `process` functions passed as argument.
- A processor also check if pending tasks have been completed (ie: they
might have expired or be cancelled) and send abort signal.
- In jobs, 2 processor worker_thread are being started. One for action
and one for webhook to reproduce what we have right now with webhooks
being in a separate temporal queue

Tested in staging

## Checklist before requesting a review (skip if just adding/editing
APIs & templates)
- [x] I added tests, otherwise the reason is: 
- [ ] I added observability, otherwise the reason is:
- [ ] I added analytics, otherwise the reason is:
  • Loading branch information
TBonnin authored May 30, 2024
1 parent 923a7fc commit d1ac7a3
Show file tree
Hide file tree
Showing 39 changed files with 1,462 additions and 265 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,6 @@ MAILGUN_API_KEY=

# Redis (optional)
NANGO_REDIS_URL=

# Orchestrator
ORCHESTRATOR_SERVICE_URL="http://localhost:3008"
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/jobs/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ COPY packages/data-ingestion/ packages/data-ingestion/
COPY packages/jobs/ packages/jobs/
COPY packages/logs/ packages/logs/
COPY packages/runner/ packages/runner/
COPY packages/scheduler/ packages/scheduler/
COPY packages/orchestrator/ packages/orchestrator/
COPY package*.json ./

Expand Down
28 changes: 23 additions & 5 deletions packages/jobs/lib/app.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,41 @@
import './tracer.js';
import { Temporal } from './temporal.js';
import { Processor } from './processor/processor.js';
import { server } from './server.js';
import { cronAutoIdleDemo } from './crons/autoIdleDemo.js';
import { deleteOldActivityLogs } from './crons/deleteOldActivities.js';
import { deleteSyncsData } from './crons/deleteSyncsData.js';
import { reconcileTemporalSchedules } from './crons/reconcileTemporalSchedules.js';
import { getLogger, stringifyError } from '@nangohq/utils';
import { JOBS_PORT } from './constants.js';
import { db } from '@nangohq/shared';
import { db, featureFlags } from '@nangohq/shared';
import { envs } from './env.js';

const logger = getLogger('Jobs');

try {
server.listen(JOBS_PORT);
logger.info(`🚀 service ready at http://localhost:${JOBS_PORT}`);
const temporalNs = process.env['TEMPORAL_NAMESPACE'] || 'default';
const port = envs.NANGO_JOBS_PORT;
const temporalNs = envs.TEMPORAL_NAMESPACE;
const orchestratorServiceUrl = envs.ORCHESTRATOR_SERVICE_URL;
server.listen(port);
logger.info(`🚀 service ready at http://localhost:${port}`);
const temporal = new Temporal(temporalNs);
const processor = new Processor(orchestratorServiceUrl);

// This promise never resolve
void temporal.start();

// Start processor
const getFlag = () => featureFlags.isEnabled('orchestrator:dryrun:process', 'global', false, false);
const processorFlagTimer = setInterval(async () => {
const isProcessorEnabled = await getFlag();
if (isProcessorEnabled && processor.isStopped()) {
processor.start();
}
if (!isProcessorEnabled && !processor.isStopped()) {
processor.stop();
}
}, 1000);

db.enableMetrics();

// Register recurring tasks
Expand All @@ -31,6 +47,8 @@ try {
// handle SIGTERM
process.on('SIGTERM', () => {
temporal.stop();
processor.stop();
clearInterval(processorFlagTimer);
server.server.close(() => {
process.exit(0);
});
Expand Down
1 change: 0 additions & 1 deletion packages/jobs/lib/constants.ts

This file was deleted.

3 changes: 3 additions & 0 deletions packages/jobs/lib/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { ENVS, parseEnvs } from '@nangohq/utils';

export const envs = parseEnvs(ENVS.required({ ORCHESTRATOR_SERVICE_URL: true }));
34 changes: 34 additions & 0 deletions packages/jobs/lib/processor/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import type { OrchestratorTask, TaskWebhook, TaskAction } from '@nangohq/nango-orchestrator';
import type { JsonValue } from 'type-fest';
import { Err, Ok } from '@nangohq/utils';
import type { Result } from '@nangohq/utils';

export async function handler(task: OrchestratorTask): Promise<Result<JsonValue>> {
task.abortController.signal.onabort = () => {
abort(task);
};
if (task.isAction()) {
return action(task);
}
if (task.isWebhook()) {
return webhook(task);
}
return Err(`Unreachable`);
}

async function abort(_task: OrchestratorTask): Promise<Result<void>> {
// TODO: Implement abort processing
return Ok(undefined);
}

async function action(task: TaskAction): Promise<Result<JsonValue>> {
// TODO: Implement action processing
// Returning a successful result for now
return Ok({ taskId: task.id, dryrun: true });
}

async function webhook(task: TaskWebhook): Promise<Result<JsonValue>> {
// TODO: Implement action processing
// Returning an error for now
return Err(`Not implemented: ${JSON.stringify({ taskId: task.id })}`);
}
50 changes: 50 additions & 0 deletions packages/jobs/lib/processor/processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { getLogger } from '@nangohq/utils';
import { ProcessorWorker } from './processor.worker.js';

const logger = getLogger('jobs.processor');

export class Processor {
private orchestratorServiceUrl: string;
private workers: ProcessorWorker[];
private stopped: boolean;

constructor(orchestratorServiceUrl: string) {
this.orchestratorServiceUrl = orchestratorServiceUrl;
this.workers = [];
this.stopped = true;
}

isStopped() {
return this.stopped;
}

start() {
logger.info('Starting task processors');
try {
const actionWorker = new ProcessorWorker({
orchestratorUrl: this.orchestratorServiceUrl,
groupKey: 'action',
maxConcurrency: 100
});
actionWorker.start();

const webhookWorker = new ProcessorWorker({
orchestratorUrl: this.orchestratorServiceUrl,
groupKey: 'webhook',
maxConcurrency: 50
});
webhookWorker.start();
this.workers = [actionWorker, webhookWorker];
this.stopped = false;
} catch (e) {
logger.error(e);
}
}

stop() {
if (this.workers) {
this.workers.forEach((worker) => worker.stop());
}
this.stopped = true;
}
}
11 changes: 11 additions & 0 deletions packages/jobs/lib/processor/processor.worker.boot.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { isMainThread, parentPort, workerData } from 'node:worker_threads';
import { getLogger } from '@nangohq/utils';
import { ProcessorChild } from './processor.worker.js';

const logger = getLogger('processor.worker.boot');

if (!isMainThread && parentPort) {
new ProcessorChild(parentPort, workerData);
} else {
logger.error('Processor should not be instantiated in the main thread');
}
94 changes: 94 additions & 0 deletions packages/jobs/lib/processor/processor.worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import * as fs from 'fs';
import type { MessagePort } from 'node:worker_threads';
import { Worker, isMainThread } from 'node:worker_threads';
import { getLogger, stringifyError } from '@nangohq/utils';
import { OrchestratorClient, OrchestratorProcessor } from '@nangohq/nango-orchestrator';
import { handler } from './handler.js';

const logger = getLogger('jobs.processor.worker');

export class ProcessorWorker {
private worker: Worker | null;
constructor({ orchestratorUrl, groupKey, maxConcurrency }: { orchestratorUrl: string; groupKey: string; maxConcurrency: number }) {
if (isMainThread) {
const url = new URL('../../dist/processor/processor.worker.boot.js', import.meta.url);
if (!fs.existsSync(url)) {
throw new Error(`Processor worker boot script not found at ${url}`);
}
this.worker = new Worker(url, { workerData: { orchestratorUrl, groupKey, maxConcurrency } });
this.worker.on('error', (err) => {
logger.error(`ProcessorWorker exited with error: ${stringifyError(err)}`);
});
this.worker.on('exit', (code) => {
if (code !== 0) {
logger.error(`ProcessorWorker exited with exit code: ${code}`);
}
});
} else {
throw new Error('ProcessorWorker should be instantiated in the main thread');
}
}

start(): void {
this.worker?.postMessage('start');
}

stop(): void {
if (this.worker) {
this.worker.postMessage('stop');
this.worker = null;
}
}
}

export class ProcessorChild {
private parent: MessagePort;
private processor: OrchestratorProcessor;
private opts: {
orchestratorUrl: string;
groupKey: string;
maxConcurrency: number;
};

constructor(parent: MessagePort, workerData: { orchestratorUrl: string; groupKey: string; maxConcurrency: number }) {
if (isMainThread) {
throw new Error('Processor should not be instantiated in the main thread');
}
if (!workerData.orchestratorUrl || !workerData.groupKey || workerData.maxConcurrency <= 0) {
throw new Error(
`Missing required options for processor worker. Expecting orchestratorUrl, groupKey, maxConcurrency > 0, got: ${JSON.stringify(workerData)}`
);
}
this.opts = workerData;
this.parent = parent;
this.parent.on('message', async (msg: 'start' | 'stop') => {
switch (msg) {
case 'start':
await this.start();
break;
case 'stop':
this.stop();
break;
}
});
const client = new OrchestratorClient({ baseUrl: this.opts.orchestratorUrl });
this.processor = new OrchestratorProcessor({
handler,
opts: {
orchestratorClient: client,
groupKey: this.opts.groupKey,
maxConcurrency: this.opts.maxConcurrency
}
});
}

async start(): Promise<void> {
logger.info(`Starting Processor: ${JSON.stringify(this.opts)}`);
this.processor.start();
}

stop(): void {
logger.info(`Stopping Processor: ${JSON.stringify(this.opts)}`);
this.processor.stop();
}
}
3 changes: 2 additions & 1 deletion packages/jobs/lib/temporal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import { createRequire } from 'module';
import * as activities from './activities.js';
import { SYNC_TASK_QUEUE, WEBHOOK_TASK_QUEUE } from '@nangohq/shared';
import { isProd, isEnterprise, getLogger } from '@nangohq/utils';
import { envs } from './env.js';

const logger = getLogger('Jobs.Temporal');

const TEMPORAL_WORKER_MAX_CONCURRENCY = parseInt(process.env['TEMPORAL_WORKER_MAX_CONCURRENCY'] || '0') || 500;
const TEMPORAL_WORKER_MAX_CONCURRENCY = envs.TEMPORAL_WORKER_MAX_CONCURRENCY;

export class Temporal {
namespace: string;
Expand Down
2 changes: 1 addition & 1 deletion packages/jobs/nodemon.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../logs/dist", "../../.env"],
"watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../logs/dist", "../orchestrator/dist", "../../.env"],
"ext": "js,ts,json",
"ignore": ["lib/**/*.spec.ts"],
"exec": "tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env",
Expand Down
1 change: 1 addition & 0 deletions packages/jobs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"@types/node": "^20.12.2",
"nodemon": "^3.0.1",
"typescript": "^5.3.3",
"type-fest": "4.14.0",
"vitest": "0.33.0"
}
}
Loading

0 comments on commit d1ac7a3

Please sign in to comment.