feat(orchestrator): implement task processor #2221

Merged
merged 3 commits into from
May 30, 2024
3 changes: 3 additions & 0 deletions .env.example
@@ -99,3 +99,6 @@ MAILGUN_API_KEY=

# Redis (optional)
NANGO_REDIS_URL=

# Orchestrator
ORCHESTRATOR_SERVICE_URL="http://localhost:3008"
13 changes: 13 additions & 0 deletions package-lock.json

1 change: 1 addition & 0 deletions packages/jobs/Dockerfile
@@ -16,6 +16,7 @@ COPY packages/data-ingestion/ packages/data-ingestion/
COPY packages/jobs/ packages/jobs/
COPY packages/logs/ packages/logs/
COPY packages/runner/ packages/runner/
COPY packages/scheduler/ packages/scheduler/
COPY packages/orchestrator/ packages/orchestrator/
COPY package*.json ./

28 changes: 23 additions & 5 deletions packages/jobs/lib/app.ts
@@ -1,25 +1,41 @@
import './tracer.js';
import { Temporal } from './temporal.js';
import { Processor } from './processor/processor.js';
import { server } from './server.js';
import { cronAutoIdleDemo } from './crons/autoIdleDemo.js';
import { deleteOldActivityLogs } from './crons/deleteOldActivities.js';
import { deleteSyncsData } from './crons/deleteSyncsData.js';
import { reconcileTemporalSchedules } from './crons/reconcileTemporalSchedules.js';
import { getLogger, stringifyError } from '@nangohq/utils';
import { JOBS_PORT } from './constants.js';
import { db } from '@nangohq/shared';
import { db, featureFlags } from '@nangohq/shared';
import { envs } from './env.js';

const logger = getLogger('Jobs');

try {
server.listen(JOBS_PORT);
logger.info(`🚀 service ready at http://localhost:${JOBS_PORT}`);
const temporalNs = process.env['TEMPORAL_NAMESPACE'] || 'default';
const port = envs.NANGO_JOBS_PORT;
const temporalNs = envs.TEMPORAL_NAMESPACE;
const orchestratorServiceUrl = envs.ORCHESTRATOR_SERVICE_URL;
server.listen(port);
logger.info(`🚀 service ready at http://localhost:${port}`);
const temporal = new Temporal(temporalNs);
const processor = new Processor(orchestratorServiceUrl);

// This promise never resolves
void temporal.start();

// Start processor
const getFlag = () => featureFlags.isEnabled('orchestrator:dryrun:process', 'global', false, false);
const processorFlagTimer = setInterval(async () => {
const isProcessorEnabled = await getFlag();
if (isProcessorEnabled && processor.isStopped()) {
processor.start();
}
if (!isProcessorEnabled && !processor.isStopped()) {
processor.stop();
}
}, 1000);

Collaborator Author

Using setInterval because of the flag, so the processor can be enabled or disabled without restarting jobs. Once stable, this will become a simple processor.start().
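
The flag-polling approach described in this comment can be sketched as a small reconcile step. This is a minimal sketch under stated assumptions, not the PR's code: `Toggleable`, `reconcile`, and `watchFlag` are hypothetical names, and the `.catch` is an added assumption so that a failed flag lookup does not surface as an unhandled rejection inside the timer callback.

```typescript
// Hypothetical stand-in for the Processor class in this PR.
interface Toggleable {
    isStopped(): boolean;
    start(): void;
    stop(): void;
}

// Bring the target in line with the desired flag state.
// Returns the action taken so callers can log or test it.
function reconcile(enabled: boolean, target: Toggleable): 'started' | 'stopped' | 'noop' {
    if (enabled && target.isStopped()) {
        target.start();
        return 'started';
    }
    if (!enabled && !target.isStopped()) {
        target.stop();
        return 'stopped';
    }
    return 'noop';
}

// Poll the flag on an interval and reconcile after each lookup.
// Lookup errors are logged instead of escaping the timer callback.
function watchFlag(getFlag: () => Promise<boolean>, target: Toggleable, intervalMs = 1000): ReturnType<typeof setInterval> {
    return setInterval(() => {
        getFlag()
            .then((enabled) => reconcile(enabled, target))
            .catch((err: unknown) => console.error('flag check failed', err));
    }, intervalMs);
}
```

The timer returned by `watchFlag` would be passed to `clearInterval` on shutdown, in the same way the SIGTERM handler in this diff clears `processorFlagTimer`.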

db.enableMetrics();

// Register recurring tasks
@@ -31,6 +47,8 @@ try {
// handle SIGTERM
process.on('SIGTERM', () => {
temporal.stop();
processor.stop();
clearInterval(processorFlagTimer);
server.server.close(() => {
process.exit(0);
});
1 change: 0 additions & 1 deletion packages/jobs/lib/constants.ts

This file was deleted.

3 changes: 3 additions & 0 deletions packages/jobs/lib/env.ts
@@ -0,0 +1,3 @@
import { ENVS, parseEnvs } from '@nangohq/utils';

export const envs = parseEnvs(ENVS.required({ ORCHESTRATOR_SERVICE_URL: true }));
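
For readers unfamiliar with the pattern, the idea of validating required environment variables once at startup can be sketched as follows. This is an illustration only: `readJobsEnv` and `JobsEnv` are hypothetical names, and nothing here reflects how `parseEnvs` or `ENVS` from `@nangohq/utils` are actually implemented. The default of 500 mirrors the fallback in the `temporal.ts` line that this PR replaces.

```typescript
// Hypothetical helper; the PR itself relies on parseEnvs/ENVS from @nangohq/utils.
interface JobsEnv {
    ORCHESTRATOR_SERVICE_URL: string;
    TEMPORAL_WORKER_MAX_CONCURRENCY: number;
}

function readJobsEnv(source: Record<string, string | undefined>): JobsEnv {
    const url = source['ORCHESTRATOR_SERVICE_URL'];
    if (!url) {
        throw new Error('ORCHESTRATOR_SERVICE_URL is required');
    }
    // The URL constructor throws a TypeError on malformed input.
    new URL(url);

    // Fall back to 500 when the variable is unset or empty.
    const raw = source['TEMPORAL_WORKER_MAX_CONCURRENCY'];
    const maxConcurrency = raw === undefined || raw === '' ? 500 : Number.parseInt(raw, 10);
    if (!Number.isInteger(maxConcurrency) || maxConcurrency <= 0) {
        throw new Error(`TEMPORAL_WORKER_MAX_CONCURRENCY must be a positive integer, got: ${raw}`);
    }
    return { ORCHESTRATOR_SERVICE_URL: url, TEMPORAL_WORKER_MAX_CONCURRENCY: maxConcurrency };
}
```

Unlike the old `parseInt(...) || 500` expression, this sketch rejects an explicit `0` or a non-numeric value instead of silently falling back, which is a design choice rather than something the PR specifies.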
34 changes: 34 additions & 0 deletions packages/jobs/lib/processor/handler.ts
@@ -0,0 +1,34 @@
import type { OrchestratorTask, TaskWebhook, TaskAction } from '@nangohq/nango-orchestrator';
import type { JsonValue } from 'type-fest';
import { Err, Ok } from '@nangohq/utils';
import type { Result } from '@nangohq/utils';

export async function handler(task: OrchestratorTask): Promise<Result<JsonValue>> {
task.abortController.signal.onabort = () => {
abort(task);
};
if (task.isAction()) {
return action(task);
}
if (task.isWebhook()) {
return webhook(task);
}
return Err(`Unreachable`);
}

async function abort(_task: OrchestratorTask): Promise<Result<void>> {
// TODO: Implement abort processing
return Ok(undefined);
}

async function action(task: TaskAction): Promise<Result<JsonValue>> {
// TODO: Implement action processing
// Returning a successful result for now
return Ok({ taskId: task.id, dryrun: true });
}

async function webhook(task: TaskWebhook): Promise<Result<JsonValue>> {
// TODO: Implement webhook processing
// Returning an error for now
return Err(`Not implemented: ${JSON.stringify({ taskId: task.id })}`);
}
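
The dispatch in `handler.ts` ends with a runtime `Err('Unreachable')`. One way to have the compiler check that branch is a discriminated union with a `never` assertion. The sketch below is an assumption-laden illustration: `Result`, `Ok`, and `Err` are minimal local stand-ins for the helpers from `@nangohq/utils`, and the `kind` field on `Task` is hypothetical (the PR uses `isAction()` and `isWebhook()` on `OrchestratorTask`).

```typescript
// Minimal local stand-ins (assumptions) for Result/Ok/Err from @nangohq/utils.
type Result<T> = { ok: true; value: T } | { ok: false; error: Error };
const Ok = <T>(value: T): Result<T> => ({ ok: true, value });
const Err = (message: string): Result<never> => ({ ok: false, error: new Error(message) });

// Hypothetical task shape using a `kind` discriminant.
type Task = { kind: 'action'; id: string } | { kind: 'webhook'; id: string };

function handle(task: Task): Result<{ taskId: string; dryrun: boolean }> {
    switch (task.kind) {
        case 'action':
            // Same dry-run payload as the action() stub in the diff.
            return Ok({ taskId: task.id, dryrun: true });
        case 'webhook':
            // Same error as the webhook() stub in the diff.
            return Err(`Not implemented: ${JSON.stringify({ taskId: task.id })}`);
        default: {
            // Fails to compile if a new task kind is added but not handled.
            const unreachable: never = task;
            return Err(`Unreachable: ${JSON.stringify(unreachable)}`);
        }
    }
}
```

With this shape, adding a third task kind to `Task` produces a type error at the `never` assignment until a matching `case` is written.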
50 changes: 50 additions & 0 deletions packages/jobs/lib/processor/processor.ts
@@ -0,0 +1,50 @@
import { getLogger } from '@nangohq/utils';
Collaborator

If you are looking for a name that is not a variant of "process": this looks like a cluster of workers, not really a processor.

Collaborator Author TBonnin, May 30, 2024

I was trying to avoid worker 😀 which also has a lot of meanings

import { ProcessorWorker } from './processor.worker.js';

const logger = getLogger('jobs.processor');

export class Processor {
private orchestratorServiceUrl: string;
private workers: ProcessorWorker[];
private stopped: boolean;

constructor(orchestratorServiceUrl: string) {
this.orchestratorServiceUrl = orchestratorServiceUrl;
this.workers = [];
this.stopped = true;
}

isStopped() {
return this.stopped;
}

start() {
logger.info('Starting task processors');
try {
const actionWorker = new ProcessorWorker({
orchestratorUrl: this.orchestratorServiceUrl,
groupKey: 'action',
maxConcurrency: 100
});
actionWorker.start();

const webhookWorker = new ProcessorWorker({
orchestratorUrl: this.orchestratorServiceUrl,
groupKey: 'webhook',
maxConcurrency: 50
});
webhookWorker.start();
this.workers = [actionWorker, webhookWorker];
this.stopped = false;
} catch (e) {
logger.error(e);
}
}

stop() {
if (this.workers) {
this.workers.forEach((worker) => worker.stop());
}
this.stopped = true;
}
}
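
In `Processor.start()` above, if the second worker fails to construct, the first one has already been started while `stopped` stays `true`. A possible way to handle that case is to roll back any workers created so far. The following is a minimal sketch, not the PR's implementation: `PoolWorker` and `WorkerPool` are hypothetical names, and the factories stand in for the `new ProcessorWorker({...})` calls.

```typescript
// Hypothetical stand-in for ProcessorWorker.
interface PoolWorker {
    start(): void;
    stop(): void;
}

class WorkerPool {
    private factories: (() => PoolWorker)[];
    private workers: PoolWorker[] = [];

    constructor(factories: (() => PoolWorker)[]) {
        this.factories = factories;
    }

    isStopped(): boolean {
        return this.workers.length === 0;
    }

    // Starts every worker, or none: on failure, workers created so far are stopped.
    start(): boolean {
        if (!this.isStopped()) {
            return true;
        }
        const created: PoolWorker[] = [];
        try {
            for (const create of this.factories) {
                const worker = create();
                created.push(worker);
                worker.start();
            }
            this.workers = created;
            return true;
        } catch (err) {
            for (const worker of created) worker.stop();
            console.error('failed to start workers', err);
            return false;
        }
    }

    stop(): void {
        for (const worker of this.workers) worker.stop();
        this.workers = [];
    }
}
```

Deriving `isStopped()` from the worker list, rather than keeping a separate boolean, avoids the two getting out of sync.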
11 changes: 11 additions & 0 deletions packages/jobs/lib/processor/processor.worker.boot.ts
@@ -0,0 +1,11 @@
import { isMainThread, parentPort, workerData } from 'node:worker_threads';
import { getLogger } from '@nangohq/utils';
import { ProcessorChild } from './processor.worker.js';

const logger = getLogger('processor.worker.boot');

if (!isMainThread && parentPort) {
new ProcessorChild(parentPort, workerData);
} else {
logger.error('Processor should not be instantiated in the main thread');
}
94 changes: 94 additions & 0 deletions packages/jobs/lib/processor/processor.worker.ts
@@ -0,0 +1,94 @@
import * as fs from 'fs';
import type { MessagePort } from 'node:worker_threads';
import { Worker, isMainThread } from 'node:worker_threads';
import { getLogger, stringifyError } from '@nangohq/utils';
import { OrchestratorClient, OrchestratorProcessor } from '@nangohq/nango-orchestrator';
import { handler } from './handler.js';

const logger = getLogger('jobs.processor.worker');

export class ProcessorWorker {
private worker: Worker | null;
constructor({ orchestratorUrl, groupKey, maxConcurrency }: { orchestratorUrl: string; groupKey: string; maxConcurrency: number }) {
if (isMainThread) {
const url = new URL('../../dist/processor/processor.worker.boot.js', import.meta.url);
if (!fs.existsSync(url)) {
throw new Error(`Processor worker boot script not found at ${url}`);
}
this.worker = new Worker(url, { workerData: { orchestratorUrl, groupKey, maxConcurrency } });
this.worker.on('error', (err) => {
logger.error(`ProcessorWorker exited with error: ${stringifyError(err)}`);
});
this.worker.on('exit', (code) => {
if (code !== 0) {
logger.error(`ProcessorWorker exited with exit code: ${code}`);
}
});
} else {
throw new Error('ProcessorWorker should be instantiated in the main thread');
}
}

start(): void {
this.worker?.postMessage('start');
}

stop(): void {
if (this.worker) {
this.worker.postMessage('stop');
this.worker = null;
}
}
}

export class ProcessorChild {
private parent: MessagePort;
private processor: OrchestratorProcessor;
private opts: {
orchestratorUrl: string;
groupKey: string;
maxConcurrency: number;
};

constructor(parent: MessagePort, workerData: { orchestratorUrl: string; groupKey: string; maxConcurrency: number }) {
if (isMainThread) {
throw new Error('Processor should not be instantiated in the main thread');
}
if (!workerData.orchestratorUrl || !workerData.groupKey || workerData.maxConcurrency <= 0) {
throw new Error(
`Missing required options for processor worker. Expecting orchestratorUrl, groupKey, maxConcurrency > 0, got: ${JSON.stringify(workerData)}`
);
}
this.opts = workerData;
this.parent = parent;
this.parent.on('message', async (msg: 'start' | 'stop') => {
switch (msg) {
case 'start':
await this.start();
break;
case 'stop':
this.stop();
break;
}
});
const client = new OrchestratorClient({ baseUrl: this.opts.orchestratorUrl });
this.processor = new OrchestratorProcessor({
handler,
opts: {
orchestratorClient: client,
groupKey: this.opts.groupKey,
maxConcurrency: this.opts.maxConcurrency
}
});
}

async start(): Promise<void> {
logger.info(`Starting Processor: ${JSON.stringify(this.opts)}`);
this.processor.start();
}

stop(): void {
logger.info(`Stopping Processor: ${JSON.stringify(this.opts)}`);
this.processor.stop();
}
}
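
Two details of `ProcessorChild` may be worth a closer look: the `switch` silently ignores any message other than `'start'` or `'stop'`, and the `maxConcurrency <= 0` check evaluates to `false` for `undefined` or `NaN`, so a missing value would pass validation. A stricter version could look like the sketch below. The names `ControlMessage`, `ChildOptions`, `isControlMessage`, and `validateOptions` are hypothetical and not part of the PR.

```typescript
// Messages the parent thread is expected to send.
type ControlMessage = 'start' | 'stop';

// Hypothetical name for the workerData shape used in the diff.
interface ChildOptions {
    orchestratorUrl: string;
    groupKey: string;
    maxConcurrency: number;
}

// Type guard so unexpected messages can be logged or rejected explicitly.
function isControlMessage(msg: unknown): msg is ControlMessage {
    return msg === 'start' || msg === 'stop';
}

// Validates untyped workerData and returns a typed copy, or throws.
function validateOptions(data: unknown): ChildOptions {
    const d = (data ?? {}) as Partial<ChildOptions>;
    if (typeof d.orchestratorUrl !== 'string' || d.orchestratorUrl === '') {
        throw new Error('orchestratorUrl is required');
    }
    if (typeof d.groupKey !== 'string' || d.groupKey === '') {
        throw new Error('groupKey is required');
    }
    // Number.isInteger rejects undefined and NaN, which a plain `<= 0` check lets through.
    if (typeof d.maxConcurrency !== 'number' || !Number.isInteger(d.maxConcurrency) || d.maxConcurrency <= 0) {
        throw new Error('maxConcurrency must be a positive integer');
    }
    return { orchestratorUrl: d.orchestratorUrl, groupKey: d.groupKey, maxConcurrency: d.maxConcurrency };
}
```

In the message listener, `isControlMessage(msg)` would run before the `switch`, with a log line for anything that does not match.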
3 changes: 2 additions & 1 deletion packages/jobs/lib/temporal.ts
@@ -5,10 +5,11 @@ import { createRequire } from 'module';
import * as activities from './activities.js';
import { SYNC_TASK_QUEUE, WEBHOOK_TASK_QUEUE } from '@nangohq/shared';
import { isProd, isEnterprise, getLogger } from '@nangohq/utils';
import { envs } from './env.js';

const logger = getLogger('Jobs.Temporal');

const TEMPORAL_WORKER_MAX_CONCURRENCY = parseInt(process.env['TEMPORAL_WORKER_MAX_CONCURRENCY'] || '0') || 500;
const TEMPORAL_WORKER_MAX_CONCURRENCY = envs.TEMPORAL_WORKER_MAX_CONCURRENCY;

export class Temporal {
namespace: string;
2 changes: 1 addition & 1 deletion packages/jobs/nodemon.json
@@ -1,5 +1,5 @@
{
"watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../logs/dist", "../../.env"],
"watch": ["lib", "../shared/dist", "../utils/dist", "../records/dist", "../data-ingestion/dist", "../logs/dist", "../orchestrator/dist", "../../.env"],
"ext": "js,ts,json",
"ignore": ["lib/**/*.spec.ts"],
"exec": "tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env",
1 change: 1 addition & 0 deletions packages/jobs/package.json
@@ -47,6 +47,7 @@
"@types/node": "^20.12.2",
"nodemon": "^3.0.1",
"typescript": "^5.3.3",
"type-fest": "4.14.0",
"vitest": "0.33.0"
}
}