Skip to content

Commit

Permalink
feat(orchestrator): add support for schedules (#2260)
Browse files Browse the repository at this point in the history
Based on #2256. Review this one
first 🙏

This PR is adding model and logic to create schedules in the
orchestrator. In a following PR I will add a scheduler worker_thread
that will create tasks based on exisiting schedules when necessary

## Checklist before requesting a review (skip if just adding/editing
APIs & templates)
- [x] I added tests, otherwise the reason is: 
- [ ] I added observability, otherwise the reason is:
- [ ] I added analytics, otherwise the reason is:

---------

Co-authored-by: Samuel Bodin <[email protected]>
  • Loading branch information
TBonnin and bodinsamuel authored Jun 5, 2024
1 parent 5e75624 commit ad420f8
Show file tree
Hide file tree
Showing 16 changed files with 628 additions and 124 deletions.
37 changes: 31 additions & 6 deletions packages/orchestrator/lib/clients/client.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,34 @@ describe('OrchestratorClient', async () => {
scheduler.stop();
await dbClient.clearDatabase();
});

describe('recurring schedule', () => {
it('should be created', async () => {
const res = await client.recurring({
name: 'Task',
startsAt: new Date(),
frequencyMs: 300_000,
args: {
type: 'sync',
syncId: 'sync-a',
syncName: rndStr(),
syncJobId: 5678,
connection: {
id: 123,
connection_id: 'C',
provider_config_key: 'P',
environment_id: 456
},
debug: false
}
});
expect(res.isOk()).toBe(true);
});
});

describe('heartbeat', () => {
it('should be successful', async () => {
const scheduledTask = await client.schedule({
const scheduledTask = await client.immediate({
name: 'Task',
groupKey: rndStr(),
retry: { count: 0, max: 0 },
Expand Down Expand Up @@ -203,7 +228,7 @@ describe('OrchestratorClient', async () => {
describe('succeed', () => {
it('should support big output', async () => {
const groupKey = rndStr();
const actionA = await client.schedule({
const actionA = await client.immediate({
name: 'Task',
groupKey,
retry: { count: 0, max: 0 },
Expand All @@ -229,7 +254,7 @@ describe('OrchestratorClient', async () => {
describe('search', () => {
it('should returns task by ids', async () => {
const groupKey = rndStr();
const actionA = await client.schedule({
const actionA = await client.immediate({
name: 'Task',
groupKey,
retry: { count: 0, max: 0 },
Expand All @@ -247,7 +272,7 @@ describe('OrchestratorClient', async () => {
input: { foo: 'bar' }
}
});
const actionB = await client.schedule({
const actionB = await client.immediate({
name: 'Task',
groupKey,
retry: { count: 0, max: 0 },
Expand Down Expand Up @@ -278,7 +303,7 @@ describe('OrchestratorClient', async () => {
});
it('should return scheduled tasks', async () => {
const groupKey = rndStr();
const scheduledAction = await client.schedule({
const scheduledAction = await client.immediate({
name: 'Task',
groupKey,
retry: { count: 0, max: 0 },
Expand All @@ -296,7 +321,7 @@ describe('OrchestratorClient', async () => {
input: { foo: 'bar' }
}
});
const scheduledWebhook = await client.schedule({
const scheduledWebhook = await client.immediate({
name: 'Task',
groupKey,
retry: { count: 0, max: 0 },
Expand Down
39 changes: 30 additions & 9 deletions packages/orchestrator/lib/clients/client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { route as postScheduleRoute } from '../routes/v1/postSchedule.js';
import { route as postImmediateRoute } from '../routes/v1/postImmediate.js';
import { route as postRecurringRoute } from '../routes/v1/postRecurring.js';
import { route as postDequeueRoute } from '../routes/v1/postDequeue.js';
import { route as postSearchRoute } from '../routes/v1/postSearch.js';
import { route as getOutputRoute } from '../routes/v1/tasks/taskId/getOutput.js';
Expand All @@ -9,13 +10,14 @@ import { Ok, Err, routeFetch, stringifyError, getLogger } from '@nangohq/utils';
import type { Endpoint } from '@nangohq/types';
import type {
ClientError,
SchedulingProps,
ImmediateProps,
ExecuteActionProps,
ExecuteProps,
ExecuteReturn,
ExecuteWebhookProps,
ExecutePostConnectionProps,
OrchestratorTask
OrchestratorTask,
RecurringProps
} from './types.js';
import { validateTask } from './validate.js';
import type { JsonValue } from 'type-fest';
Expand All @@ -33,10 +35,9 @@ export class OrchestratorClient {
return routeFetch(this.baseUrl, route);
}

public async schedule(props: SchedulingProps): Promise<Result<{ taskId: string }, ClientError>> {
const res = await this.routeFetch(postScheduleRoute)({
public async immediate(props: ImmediateProps): Promise<Result<{ taskId: string }, ClientError>> {
const res = await this.routeFetch(postImmediateRoute)({
body: {
scheduling: 'immediate',
name: props.name,
groupKey: props.groupKey,
retry: props.retry,
Expand All @@ -47,7 +48,27 @@ export class OrchestratorClient {
if ('error' in res) {
return Err({
name: res.error.code,
message: res.error.message || `Error scheduling tasks`,
message: res.error.message || `Error scheduling immediate task`,
payload: JSON.stringify(props)
});
} else {
return Ok(res);
}
}

public async recurring(props: RecurringProps): Promise<Result<{ scheduleId: string }, ClientError>> {
const res = await this.routeFetch(postRecurringRoute)({
body: {
name: props.name,
startsAt: props.startsAt,
frequencyMs: props.frequencyMs,
args: props.args
}
});
if ('error' in res) {
return Err({
name: res.error.code,
message: res.error.message || `Error creating recurring schedule`,
payload: JSON.stringify(props)
});
} else {
Expand All @@ -60,8 +81,8 @@ export class OrchestratorClient {
retry: { count: 0, max: 0 },
timeoutSettingsInSecs: { createdToStarted: 30, startedToCompleted: 30, heartbeat: 60 },
...props
} as SchedulingProps;
const res = await this.schedule(scheduleProps);
} as ImmediateProps;
const res = await this.immediate(scheduleProps);
if (res.isErr()) {
return res;
}
Expand Down
45 changes: 21 additions & 24 deletions packages/orchestrator/lib/clients/processor.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,36 +106,33 @@ async function processN(handler: (task: OrchestratorTask) => Promise<Result<Json
});
processor.start({ tracer });
for (let i = 0; i < n; i++) {
await scheduleTask({ groupKey });
await immediateTask({ groupKey });
}
// Wait so the processor can process all tasks
await new Promise((resolve) => setTimeout(resolve, 1000));
return processor;
}

async function scheduleTask({ groupKey }: { groupKey: string }) {
return scheduler.schedule({
scheduling: 'immediate',
taskProps: {
groupKey,
name: 'Task',
retryMax: 0,
retryCount: 0,
createdToStartedTimeoutSecs: 30,
startedToCompletedTimeoutSecs: 30,
heartbeatTimeoutSecs: 30,
payload: {
type: 'action',
activityLogId: 1234,
actionName: 'Task',
connection: {
id: 1234,
connection_id: 'C',
provider_config_key: 'P',
environment_id: 5678
},
input: { foo: 'bar' }
}
async function immediateTask({ groupKey }: { groupKey: string }) {
return scheduler.immediate({
groupKey,
name: 'Task',
retryMax: 0,
retryCount: 0,
createdToStartedTimeoutSecs: 30,
startedToCompletedTimeoutSecs: 30,
heartbeatTimeoutSecs: 30,
payload: {
type: 'action',
activityLogId: 1234,
actionName: 'Task',
connection: {
id: 1234,
connection_id: 'C',
provider_config_key: 'P',
environment_id: 5678
},
input: { foo: 'bar' }
}
});
}
Expand Down
8 changes: 5 additions & 3 deletions packages/orchestrator/lib/clients/types.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { JsonValue, SetOptional } from 'type-fest';
import type { PostSchedule } from '../routes/v1/postSchedule.js';
import type { PostImmediate } from '../routes/v1/postImmediate.js';
import type { PostRecurring } from '../routes/v1/postRecurring.js';
import type { Result } from '@nangohq/utils';
import type { TaskState } from '@nangohq/scheduler';

export type SchedulingProps = Omit<PostSchedule['Body'], 'scheduling'>;
export type ImmediateProps = PostImmediate['Body'];
export type RecurringProps = PostRecurring['Body'];

interface SyncArgs {
syncId: string;
Expand Down Expand Up @@ -52,7 +54,7 @@ interface PostConnectionArgs {
activityLogId: number;
}

export type ExecuteProps = SetOptional<SchedulingProps, 'retry' | 'timeoutSettingsInSecs'>;
export type ExecuteProps = SetOptional<ImmediateProps, 'retry' | 'timeoutSettingsInSecs'>;
export type ExecuteReturn = Result<JsonValue, ClientError>;
export type ExecuteActionProps = Omit<ExecuteProps, 'args'> & { args: ActionArgs };
export type ExecuteWebhookProps = Omit<ExecuteProps, 'args'> & { args: WebhookArgs };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import { validateRequest } from '@nangohq/utils';
import type { TaskType } from '../../types.js';
import { syncArgsSchema, actionArgsSchema, postConnectionArgsSchema, webhookArgsSchema } from '../../clients/validate.js';

const path = '/v1/schedule';
const path = '/v1/immediate';
const method = 'POST';

export type PostSchedule = Endpoint<{
export type PostImmediate = Endpoint<{
Method: typeof method;
Path: typeof path;
Body: {
scheduling: 'immediate';
name: string;
groupKey: string;
retry: {
Expand All @@ -28,11 +27,11 @@ export type PostSchedule = Endpoint<{
};
args: JsonValue & { type: TaskType };
};
Error: ApiError<'schedule_failed'>;
Error: ApiError<'immediate_failed'>;
Success: { taskId: string };
}>;

const validate = validateRequest<PostSchedule>({
const validate = validateRequest<PostImmediate>({
parseBody: (data: any) => {
function argsSchema(data: any) {
if ('args' in data && 'type' in data.args) {
Expand All @@ -53,7 +52,6 @@ const validate = validateRequest<PostSchedule>({
}
return z
.object({
scheduling: z.literal('immediate'),
name: z.string().min(1),
groupKey: z.string().min(1),
retry: z.object({
Expand All @@ -72,30 +70,27 @@ const validate = validateRequest<PostSchedule>({
});

const handler = (scheduler: Scheduler) => {
return async (req: EndpointRequest<PostSchedule>, res: EndpointResponse<PostSchedule>) => {
const task = await scheduler.schedule({
scheduling: req.body.scheduling,
taskProps: {
name: req.body.name,
payload: req.body.args,
groupKey: req.body.groupKey,
retryMax: req.body.retry.max,
retryCount: req.body.retry.count,
createdToStartedTimeoutSecs: req.body.timeoutSettingsInSecs.createdToStarted,
startedToCompletedTimeoutSecs: req.body.timeoutSettingsInSecs.startedToCompleted,
heartbeatTimeoutSecs: req.body.timeoutSettingsInSecs.heartbeat
}
return async (req: EndpointRequest<PostImmediate>, res: EndpointResponse<PostImmediate>) => {
const task = await scheduler.immediate({
name: req.body.name,
payload: req.body.args,
groupKey: req.body.groupKey,
retryMax: req.body.retry.max,
retryCount: req.body.retry.count,
createdToStartedTimeoutSecs: req.body.timeoutSettingsInSecs.createdToStarted,
startedToCompletedTimeoutSecs: req.body.timeoutSettingsInSecs.startedToCompleted,
heartbeatTimeoutSecs: req.body.timeoutSettingsInSecs.heartbeat
});
if (task.isErr()) {
return res.status(500).json({ error: { code: 'schedule_failed', message: task.error.message } });
return res.status(500).json({ error: { code: 'immediate_failed', message: task.error.message } });
}
return res.status(201).json({ taskId: task.value.id });
};
};

export const route: Route<PostSchedule> = { path, method };
export const route: Route<PostImmediate> = { path, method };

export const routeHandler = (scheduler: Scheduler): RouteHandler<PostSchedule> => {
export const routeHandler = (scheduler: Scheduler): RouteHandler<PostImmediate> => {
return {
...route,
validate,
Expand Down
61 changes: 61 additions & 0 deletions packages/orchestrator/lib/routes/v1/postRecurring.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { z } from 'zod';
import type { JsonValue } from 'type-fest';
import type { Scheduler } from '@nangohq/scheduler';
import type { ApiError, Endpoint } from '@nangohq/types';
import type { EndpointRequest, EndpointResponse, RouteHandler, Route } from '@nangohq/utils';
import { validateRequest } from '@nangohq/utils';
import { syncArgsSchema } from '../../clients/validate.js';

const path = '/v1/recurring';
const method = 'POST';

export type PostRecurring = Endpoint<{
Method: typeof method;
Path: typeof path;
Body: {
name: string;
startsAt: Date;
frequencyMs: number;
args: JsonValue;
};
Error: ApiError<'recurring_failed'>;
Success: { scheduleId: string };
}>;

const validate = validateRequest<PostRecurring>({
parseBody: (data: any) => {
return z
.object({
name: z.string().min(1),
startsAt: z.coerce.date(),
frequencyMs: z.number().int().positive(),
args: syncArgsSchema
})
.parse(data);
}
});

const handler = (scheduler: Scheduler) => {
return async (req: EndpointRequest<PostRecurring>, res: EndpointResponse<PostRecurring>) => {
const schedule = await scheduler.recurring({
name: req.body.name,
payload: req.body.args,
startsAt: req.body.startsAt,
frequencyMs: req.body.frequencyMs
});
if (schedule.isErr()) {
return res.status(500).json({ error: { code: 'recurring_failed', message: schedule.error.message } });
}
return res.status(201).json({ scheduleId: schedule.value.id });
};
};

export const route: Route<PostRecurring> = { path, method };

export const routeHandler = (scheduler: Scheduler): RouteHandler<PostRecurring> => {
return {
...route,
validate,
handler: handler(scheduler)
};
};
Loading

0 comments on commit ad420f8

Please sign in to comment.