Skip to content

Commit

Permalink
fix(exe-unit): fixed exceeding max listeners when running multiple co…
Browse files Browse the repository at this point in the history
…mmands on the same exe unit

Added removal of event listeners for abort signal after script execution
  • Loading branch information
mgordel committed Aug 28, 2024
1 parent a3658cf commit ef3112e
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions src/activity/exe-script-executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {
anyAbortSignal,
createAbortSignalFromTimeout,
Logger,
mergeUntilFirstComplete,
@@ -14,7 +13,7 @@ import { Result, StreamingBatchEvent } from "./results";
import { Activity } from "./activity";
import { getMessageFromApiError } from "../shared/utils/apiErrorMessage";
import { ActivityModule } from "./activity.module";
import { catchError, map, Observable, takeWhile } from "rxjs";
import { catchError, finalize, map, Observable, takeWhile } from "rxjs";

/**
* Information needed to fetch the results of a script execution
@@ -106,17 +105,25 @@ export class ExeScriptExecutor {
signalOrTimeout?: number | AbortSignal,
maxRetries?: number,
): Observable<Result> {
const signal = anyAbortSignal(this.abortSignal, createAbortSignalFromTimeout(signalOrTimeout));
const abortController = new AbortController();
const signal = createAbortSignalFromTimeout(signalOrTimeout);
const onAbort = () => abortController.abort();
if (signal.aborted || this.abortSignal.aborted) {
abortController.abort(signal.reason);
} else {
signal.addEventListener("abort", onAbort);
this.abortSignal.addEventListener("abort", onAbort);
}

// observable that emits when the script execution should be aborted
const abort$ = new Observable<never>((subscriber) => {
const getError = () => new GolemAbortError("Execution of script has been aborted", signal.reason);
const getError = () => new GolemAbortError("Execution of script has been aborted", abortController.signal.reason);

if (signal.aborted) {
if (abortController.signal.aborted) {
subscriber.error(getError());
}

signal.addEventListener("abort", () => {
abortController.signal.addEventListener("abort", () => {
subscriber.error(getError());
});
});
@@ -126,7 +133,12 @@ export class ExeScriptExecutor {
? this.streamingBatch(batch.batchId, batch.batchSize)
: this.pollingBatch(batch.batchId, maxRetries);

return mergeUntilFirstComplete(abort$, results$);
return mergeUntilFirstComplete(abort$, results$).pipe(
finalize(() => {
this.abortSignal.removeEventListener("abort", onAbort);
signal.removeEventListener("abort", onAbort);
}),
);
}

protected async send(script: ExeScriptRequest): Promise<string> {

0 comments on commit ef3112e

Please sign in to comment.