From ef3112ea996e1683c5ef5f5771ff54163ca792d6 Mon Sep 17 00:00:00 2001 From: Marcin Gordel Date: Wed, 28 Aug 2024 15:33:19 +0200 Subject: [PATCH] fix(exe-unit): fixed exceeding max listeners when running multiple commands on the same exe unit Added removal of event listeners for abort signal after script execution --- src/activity/exe-script-executor.ts | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index f7db15b26..82450a811 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -1,5 +1,4 @@ import { - anyAbortSignal, createAbortSignalFromTimeout, Logger, mergeUntilFirstComplete, @@ -14,7 +13,7 @@ import { Result, StreamingBatchEvent } from "./results"; import { Activity } from "./activity"; import { getMessageFromApiError } from "../shared/utils/apiErrorMessage"; import { ActivityModule } from "./activity.module"; -import { catchError, map, Observable, takeWhile } from "rxjs"; +import { catchError, finalize, map, Observable, takeWhile } from "rxjs"; /** * Information needed to fetch the results of a script execution @@ -106,17 +105,25 @@ export class ExeScriptExecutor { signalOrTimeout?: number | AbortSignal, maxRetries?: number, ): Observable { - const signal = anyAbortSignal(this.abortSignal, createAbortSignalFromTimeout(signalOrTimeout)); + const abortController = new AbortController(); + const signal = createAbortSignalFromTimeout(signalOrTimeout); + const onAbort = () => abortController.abort(); + if (signal.aborted || this.abortSignal.aborted) { + abortController.abort(signal.reason); + } else { + signal.addEventListener("abort", onAbort); + this.abortSignal.addEventListener("abort", onAbort); + } // observable that emits when the script execution should be aborted const abort$ = new Observable((subscriber) => { - const getError = () => new GolemAbortError("Execution of script has been aborted", signal.reason); + const getError = () => new GolemAbortError("Execution of script has been aborted", abortController.signal.reason); - if (signal.aborted) { + if (abortController.signal.aborted) { subscriber.error(getError()); } - signal.addEventListener("abort", () => { + abortController.signal.addEventListener("abort", () => { subscriber.error(getError()); }); }); @@ -126,7 +133,12 @@ export class ExeScriptExecutor { ? this.streamingBatch(batch.batchId, batch.batchSize) : this.pollingBatch(batch.batchId, maxRetries); - return mergeUntilFirstComplete(abort$, results$); + return mergeUntilFirstComplete(abort$, results$).pipe( + finalize(() => { + this.abortSignal.removeEventListener("abort", onAbort); + signal.removeEventListener("abort", onAbort); + }), + ); } protected async send(script: ExeScriptRequest): Promise {