diff --git a/src/core/executor.ts b/src/core/executor.ts index 534c428..d95c39c 100644 --- a/src/core/executor.ts +++ b/src/core/executor.ts @@ -6,13 +6,13 @@ import { StartWorkflowRequest, SkipTaskRequest, WorkflowRun, + WorkflowStatus, } from "../common/open-api"; import { TaskResultStatus } from "./types"; -import { errorMapper,tryCatchReThrow } from "./helpers"; +import { errorMapper, tryCatchReThrow } from "./helpers"; const RETRY_TIME_IN_MILLISECONDS = 10000; - export class WorkflowExecutor { public readonly _client: ConductorClient; @@ -52,10 +52,16 @@ export class WorkflowExecutor { name: string, version: number, requestId: string, - waitUntilTaskRef: string = '', + waitUntilTaskRef: string = "" ): Promise { return tryCatchReThrow(() => - this._client.workflowResource.executeWorkflow(workflowRequest, name, version, requestId, waitUntilTaskRef) + this._client.workflowResource.executeWorkflow( + workflowRequest, + name, + version, + requestId, + waitUntilTaskRef + ) ); } @@ -111,7 +117,7 @@ export class WorkflowExecutor { workflowInstanceId: string, includeOutput: boolean, includeVariables: boolean - ) { + ): Promise { return tryCatchReThrow(() => this._client.workflowResource.getWorkflowStatusSummary( workflowInstanceId, @@ -141,7 +147,7 @@ export class WorkflowExecutor { public reRun( workflowInstanceId: string, rerunWorkflowRequest: Partial = {} - ) { + ): Promise { return tryCatchReThrow(() => this._client.workflowResource.rerun( workflowInstanceId, diff --git a/src/task/TaskManager.ts b/src/task/TaskManager.ts index 697db12..805f0bf 100644 --- a/src/task/TaskManager.ts +++ b/src/task/TaskManager.ts @@ -1,8 +1,6 @@ import os from "os"; import { TaskRunner, - TaskRunnerOptions, - TaskErrorHandler, noopErrorHandler, } from "./TaskRunner"; import { ConductorLogger, DefaultLogger } from "../common"; @@ -13,6 +11,7 @@ import { DEFAULT_BATCH_POLLING_TIMEOUT, DEFAULT_CONCURRENCY, } from "./constants"; +import { TaskErrorHandler, TaskRunnerOptions } from "./types"; export type TaskManagerOptions = TaskRunnerOptions; diff --git a/src/task/TaskRunner.ts b/src/task/TaskRunner.ts index 178c04d..c5fc53b 100644 --- a/src/task/TaskRunner.ts +++ b/src/task/TaskRunner.ts @@ -7,28 +7,12 @@ import { DEFAULT_BATCH_POLLING_TIMEOUT, DEFAULT_CONCURRENCY, } from "./constants"; +import { TaskErrorHandler, TaskRunnerOptions, RunnerArgs } from "./types"; +import { optionEquals } from "./helpers"; const DEFAULT_ERROR_MESSAGE = "An unknown error occurred"; const MAX_RETRIES = 3; -export type TaskErrorHandler = (error: Error, task?: Task) => void; - -export interface TaskRunnerOptions { - workerID: string; - domain: string | undefined; - pollInterval?: number; - concurrency?: number; - batchPollingTimeout?: number; -} -export interface RunnerArgs { - worker: ConductorWorker; - taskResource: TaskResourceService; - options: TaskRunnerOptions; - logger?: ConductorLogger; - onError?: TaskErrorHandler; - concurrency?: number; -} - //eslint-disable-next-line export const noopErrorHandler: TaskErrorHandler = (__error: Error) => {}; @@ -103,13 +87,18 @@ export class TaskRunner { updateOptions(options: Partial) { const newOptions = { ...this.options, ...options }; - this.poller.updateOptions({ - concurrency: newOptions.concurrency, - pollInterval: newOptions.pollInterval, - }); - this.logger.info( - `TaskWorker ${this.worker.taskDefName} configuration updated with concurrency of ${this.poller.options.concurrency} and poll interval of ${this.poller.options.pollInterval}` - ); + const isOptionsUpdated = !optionEquals(this.options, newOptions); + + if (isOptionsUpdated) { + this.poller.updateOptions({ + concurrency: newOptions.concurrency, + pollInterval: newOptions.pollInterval, + }); + this.logger.info( + `TaskWorker ${this.worker.taskDefName} configuration updated with concurrency of ${this.poller.options.concurrency} and poll interval of ${this.poller.options.pollInterval}` + ); + } + this.options = newOptions; } diff --git a/src/task/__tests__/TaskRunner.test.ts b/src/task/__tests__/TaskRunner.test.ts index ec4301c..5feb11b 100644 --- a/src/task/__tests__/TaskRunner.test.ts +++ b/src/task/__tests__/TaskRunner.test.ts @@ -1,7 +1,8 @@ import { jest, test, expect } from "@jest/globals"; import type { Mocked } from "jest-mock"; -import { RunnerArgs, TaskRunner } from "../TaskRunner"; +import { TaskRunner } from "../TaskRunner"; +import { RunnerArgs } from "../types"; import { mockLogger } from "./mockLogger"; import { TaskResourceService } from "../../common/open-api"; diff --git a/src/task/__tests__/helpers.test.ts b/src/task/__tests__/helpers.test.ts new file mode 100644 index 0000000..2744210 --- /dev/null +++ b/src/task/__tests__/helpers.test.ts @@ -0,0 +1,55 @@ +import { expect, describe, it } from "@jest/globals"; +import { optionEquals } from "../helpers"; +import { TaskRunnerOptions } from "../types"; + +describe("helpers", () => { + it("Should return true if both options are equals", () => { + const someOptions: TaskRunnerOptions = { + workerID: "some-worker-id", + domain: "mydomain", + pollInterval: 1000, + concurrency: 1, + batchPollingTimeout: 1000, + }; + const otherOptions: TaskRunnerOptions = { + ...someOptions, + }; + expect(optionEquals(someOptions, otherOptions)).toBeTruthy(); + + expect(optionEquals(otherOptions, someOptions)).toBeTruthy(); + }); + + it("Should return true if both options are equals", () => { + const someOptions: TaskRunnerOptions = { + workerID: "some-worker-id", + domain: "mydomain", + pollInterval: 1000, + concurrency: 1, + batchPollingTimeout: 1000, + }; + const otherOptions: TaskRunnerOptions = { + ...someOptions, + batchPollingTimeout: 2000, + }; + expect(optionEquals(someOptions, otherOptions)).not.toBeTruthy(); + + expect(optionEquals(otherOptions, someOptions)).not.toBeTruthy(); + }); + it("Should return false if options are only equal in some properties", () => { + const someOptions: TaskRunnerOptions = { + workerID: "some-worker-id", + domain: "mydomain", + pollInterval: 1000, + concurrency: 1, + batchPollingTimeout: 1000, + }; + + const someOptionsPrime: TaskRunnerOptions = { + workerID: "some-worker-id", + domain: "mydomain", + }; + + expect(optionEquals(someOptions, someOptionsPrime)).not.toBeTruthy(); + expect(optionEquals(someOptionsPrime, someOptions)).not.toBeTruthy(); + }); +}); diff --git a/src/task/helpers.ts b/src/task/helpers.ts new file mode 100644 index 0000000..431e2bf --- /dev/null +++ b/src/task/helpers.ts @@ -0,0 +1,22 @@ +import { TaskRunnerOptions } from "./types"; +type OptionEntries = Array< + [keyof TaskRunnerOptions, string | number | undefined] + >; + + +/** + * Compares if the new options are really new + * @param oldOptions + * @param newOptions + */ +export const optionEquals = ( + oldOptions: Partial, + newOptions: Partial +) => { + const newOptionEntries = Object.entries(newOptions) as OptionEntries; + const oldOptionsEntries = Object.entries(oldOptions) as OptionEntries; + + return newOptionEntries.length === oldOptionsEntries.length && newOptionEntries.every( + ([key, value]) => (oldOptions[key] as unknown) === value + ); +}; diff --git a/src/task/index.ts b/src/task/index.ts index 8a17277..443d590 100644 --- a/src/task/index.ts +++ b/src/task/index.ts @@ -1,3 +1,4 @@ -export * from "./TaskRunner" -export * from "./TaskManager" -export * from "./Worker" +export * from "./TaskRunner"; +export * from "./TaskManager"; +export * from "./Worker"; +export * from "./types"; diff --git a/src/task/types.ts b/src/task/types.ts new file mode 100644 index 0000000..845a2c6 --- /dev/null +++ b/src/task/types.ts @@ -0,0 +1,20 @@ +import type { ConductorLogger } from "../common"; +import type { ConductorWorker } from "./Worker"; +import type { Task, TaskResourceService } from "../common/open-api"; + +export type TaskErrorHandler = (error: Error, task?: Task) => void; +export interface TaskRunnerOptions { + workerID: string; + domain: string | undefined; + pollInterval?: number; + concurrency?: number; + batchPollingTimeout?: number; +} +export interface RunnerArgs { + worker: ConductorWorker; + taskResource: TaskResourceService; + options: TaskRunnerOptions; + logger?: ConductorLogger; + onError?: TaskErrorHandler; + concurrency?: number; +}