Skip to content

Commit

Permalink
Implement Batch polling (#46)
Browse files Browse the repository at this point in the history
* refactor: use batch polling

* fix: reset task in progress

* feat: added logs for when tasks are not getting
polled
fix: poller count calculation

* feat: added logs,
feat: added batchPollingTimeout option
feat: added logs

* feat: added pollInterval property to type

* refactor: favored map over object for runner.
feat: added ability to update single worker config
feat: added sanity check on workers

* refactor: updated docs

* fix: updated docs

* refactor: extracted constants to file
refactor: changed log message

* feat: added keepalive agent
  • Loading branch information
Sudakatux authored Aug 2, 2023
1 parent dd47202 commit 6e4e0ec
Show file tree
Hide file tree
Showing 10 changed files with 685 additions and 224 deletions.
30 changes: 20 additions & 10 deletions src/orkes/OrkesConductorClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,34 @@ import { request as baseRequest } from "./request/request";
import { baseOrkesConductorClient } from "./BaseOrkesConductorClient";
import { FetchFn } from "./types";
import fetch, { Headers, RequestInit, Response } from "node-fetch";
import http from "http";
import https from "https";

const httpAgent = new http.Agent({ keepAlive: true });
const httpsAgent = new https.Agent({ keepAlive: true });

const agent = (_parsedURL: URL) =>
_parsedURL.protocol == "http:" ? httpAgent : httpsAgent;

const nodeFetchWrapper: FetchFn<RequestInit, Response> = async (
input,
options = {}
) => {
const res = await fetch(input.toString(), options as RequestInit);
const res = await fetch(input.toString(), {
...options,
agent,
} as RequestInit);
return res;
};

const fetchCache =
fetchCatchDns<{ headers: Record<string, string> }, Response>(
nodeFetchWrapper,
{
//@ts-ignore
headerFactory: (headers?: HeadersInit) =>
new Headers((headers as Record<string, string>) || {}),
}
);
const fetchCache = fetchCatchDns<{ headers: Record<string, string> }, Response>(
nodeFetchWrapper,
{
//@ts-ignore
headerFactory: (headers?: HeadersInit) =>
new Headers((headers as Record<string, string>) || {}),
}
);

const defaultRequestHandler: ConductorHttpRequest = (
__request,
Expand Down
135 changes: 70 additions & 65 deletions src/task/Poller.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,41 @@
import { ConductorLogger, noopLogger } from "../common";
import {
DEFAULT_POLL_INTERVAL,
DEFAULT_WARN_AT_O,
DEFAULT_CONCURRENCY,
} from "./constants";

interface PollerOptions {
pollInterval?: number;
concurrency: number;
warnAtO?: number;
}

export class Poller {
private concurrentCalls: Array<{
promise: Promise<void>;
stop: () => Promise<boolean>;
}> = [];
private pollFunction: () => Promise<void> = async () => {};
export class Poller<T> {
private timeoutHandler?: NodeJS.Timeout;
private pollFunction: (count: number) => Promise<T[]>;
private performWorkFunction: (work: T) => Promise<void> = async () => {};
private polling = false;
private _tasksInProcess = 0;
private _counterAtO = 0;
private _pollerId: string = "";
options: PollerOptions = {
pollInterval: 1000,
concurrency: 1,
pollInterval: DEFAULT_POLL_INTERVAL,
concurrency: DEFAULT_CONCURRENCY,
warnAtO: DEFAULT_WARN_AT_O,
};
logger: ConductorLogger = noopLogger;

constructor(
pollFunction: () => Promise<void>,
pollerId: string,
pollFunction: (count: number) => Promise<T[]>,
performWorkFunction: (work: T) => Promise<void>,
pollerOptions?: Partial<PollerOptions>,
logger?: ConductorLogger
) {
this._pollerId = pollerId;
this.pollFunction = pollFunction;
this.performWorkFunction = performWorkFunction;
this.options = { ...this.options, ...pollerOptions };
this.logger = logger || noopLogger;
}
Expand All @@ -32,87 +44,80 @@ export class Poller {
return this.polling;
}

get tasksInProcess() {
return this._tasksInProcess;
}

/**
* Starts polling for work
*/
startPolling = () => {
if (this.polling) {
throw new Error("Runner is already started");
}

return this.poll();
this._tasksInProcess = 0;
this.polling = true;
this.poll();
};

/**
* Stops Polling for work
*/
stopPolling = async () => {
await Promise.all(this.concurrentCalls.map((call) => call.stop()));
this.polling = false;
clearTimeout(this.timeoutHandler!);
};

/**
* adds or shuts down concurrent calls based on the concurrency setting
* @param concurrency
*/
private updateConcurrency(concurrency: number) {
if (concurrency > 0 && concurrency !== this.options.concurrency) {
if (concurrency < this.options.concurrency) {
const result = this.concurrentCalls.splice(
0,
this.options.concurrency - concurrency
);
result.forEach((call) => {
call.stop();
this.logger.debug("stopping some spawned calls");
});
} else {
for (let i = 0; i < concurrency - this.options.concurrency; i++) {
this.concurrentCalls.push(this.singlePoll());
this.logger.debug("spawning additional poll calls");
}
}
this.options.concurrency = concurrency;
}
}
private performWork = async (work: T) => {
await this.performWorkFunction(work);
this._tasksInProcess--;
};

updateOptions(options: Partial<PollerOptions>) {
const newOptions = { ...this.options, ...options };
this.updateConcurrency(newOptions.concurrency);
this.options = newOptions;
}

private poll = async () => {
if (!this.polling) {
this.polling = true;
for (let i = 0; i < this.options.concurrency; i++) {
this.concurrentCalls.push(this.singlePoll());
}
}
};

private singlePoll = () => {
let poll = this.polling;
let timeout: NodeJS.Timeout;
const pollingCall = async () => {
while (poll) {
await this.pollFunction();
await new Promise(
(r) =>
poll ? (timeout = setTimeout(() => r(true), this.options.pollInterval)): r(true)
while (this.isPolling) {
try {
// Concurrency could have been updated. Accounting for that
const count = Math.max(
0,
this.options.concurrency - this._tasksInProcess
);

if (count === 0) {
this.logger.debug(
"Max in process reached, Will skip polling for " + this._pollerId
);
this._counterAtO++;
if (this._counterAtO > (this.options.warnAtO ?? 100)) {
this.logger.info(
`Not polling anything because in process tasks is maxed as concurrency level. ${this._pollerId}`
);
}
} else {
this._counterAtO = 0;
const tasksResult: T[] = await this.pollFunction(count);
this._tasksInProcess =
this._tasksInProcess + (tasksResult ?? []).length;

// Don't wait for the tasks to finish only 'listen' to the number of tasks being processes
tasksResult.forEach(this.performWork);
}
} catch (e: any) {
this.logger.error(`Error polling for tasks: ${e.message}`, e);
}
};

return {
promise: pollingCall(),
stop: (): Promise<boolean> =>
new Promise((r) => {
clearTimeout(timeout);
poll = false;
this.logger.debug("stopping single poll call");
r(true);
}),
};
await new Promise((r) =>
this.isPolling
? (this.timeoutHandler = setTimeout(
() => r(true),
this.options.pollInterval
))
: r(true)
);
}
};
}
62 changes: 45 additions & 17 deletions src/task/TaskManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import {
import { ConductorLogger, DefaultLogger } from "../common";
import { ConductorWorker } from "./Worker";
import { ConductorClient } from "../common/open-api";
import {
DEFAULT_POLL_INTERVAL,
DEFAULT_BATCH_POLLING_TIMEOUT,
DEFAULT_CONCURRENCY,
} from "./constants";

export type TaskManagerOptions = TaskRunnerOptions;

Expand All @@ -19,9 +24,10 @@ export interface TaskManagerConfig {

const defaultManagerOptions: Required<TaskManagerOptions> = {
workerID: "",
pollInterval: 1000,
pollInterval: DEFAULT_POLL_INTERVAL,
domain: undefined,
concurrency: 1,
concurrency: DEFAULT_CONCURRENCY,
batchPollingTimeout: DEFAULT_BATCH_POLLING_TIMEOUT,
};

function workerId(options: Partial<TaskManagerOptions>) {
Expand All @@ -32,7 +38,7 @@ function workerId(options: Partial<TaskManagerOptions>) {
* Responsible for initializing and managing the runners that poll and work different task queues.
*/
export class TaskManager {
private tasks: Record<string, Array<TaskRunner>> = {};
private workerRunners: Map<string, TaskRunner> = new Map();
private readonly client: ConductorClient;
private readonly logger: ConductorLogger;
private readonly errorHandler: TaskErrorHandler;
Expand Down Expand Up @@ -68,6 +74,7 @@ export class TaskManager {
return {
...this.options,
concurrency: worker.concurrency ?? this.options.concurrency,
pollInterval: worker.pollInterval ?? this.options.pollInterval,
domain: worker.domain ?? this.options.domain,
};
};
Expand All @@ -76,6 +83,21 @@ export class TaskManager {
return this.polling;
}

updatePollingOptionForWorker = (
workerTaskDefName: string,
options: Partial<TaskManagerOptions>
) => {
const maybeRunner = this.workerRunners.get(workerTaskDefName);

if (maybeRunner != null) {
maybeRunner.updateOptions(options);
} else {
this.logger.info(
`No runner found for worker with taskDefName: ${workerTaskDefName}`
);
}
};

/**
* new options will get merged to existing options
* @param options new options to update polling options
Expand All @@ -86,26 +108,33 @@ export class TaskManager {
...this.workerManagerWorkerOptions(worker),
...options,
};
const runners = this.tasks[worker.taskDefName];
runners.forEach((runner) => {
runner.updateOptions(newOptions);
});
this.updatePollingOptionForWorker(worker.taskDefName, newOptions);
});
this.options.concurrency = options.concurrency ?? this.options.concurrency;
this.options.pollInterval =
options.pollInterval ?? this.options.pollInterval;
};

sanityCheck = () => {
if (this.workers.length === 0) {
throw new Error("No workers supplied to TaskManager");
}
const workerIDs = new Set();
for (const item of this.workers) {
if (workerIDs.has(item.taskDefName)) {
throw new Error(`Duplicate worker taskDefName: ${item.taskDefName}`);
}
workerIDs.add(item.taskDefName);
}
};

/**
* Start polling for tasks
*/
startPolling = () => {
this.sanityCheck();
this.workers.forEach((worker) => {
this.tasks[worker.taskDefName] = [];
const options = this.workerManagerWorkerOptions(worker);
this.logger.debug(
`Starting taskDefName=${worker.taskDefName} concurrency=${options.concurrency} domain=${options.domain}`
);
const runner = new TaskRunner({
worker,
options,
Expand All @@ -114,19 +143,18 @@ export class TaskManager {
onError: this.errorHandler,
});
runner.startPolling();
this.tasks[worker.taskDefName].push(runner);
this.workerRunners.set(worker.taskDefName, runner);
});
this.polling = true;
};
/**
* Stops polling for tasks
*/
stopPolling = async () => {
for (const taskType in this.tasks) {
await Promise.all(
this.tasks[taskType].map((runner) => runner.stopPolling())
);
this.tasks[taskType] = [];
for (const [workerTaskDefName, runner] of this.workerRunners) {
this.logger.debug(`Stopping taskDefName=${workerTaskDefName}`);
await runner.stopPolling();
this.workerRunners.delete(workerTaskDefName);
}
this.polling = false;
};
Expand Down
Loading

0 comments on commit 6e4e0ec

Please sign in to comment.