diff --git a/examples/basic/run-and-stream.ts b/examples/basic/run-and-stream.ts index a426b34ed..a573c9643 100644 --- a/examples/basic/run-and-stream.ts +++ b/examples/basic/run-and-stream.ts @@ -46,8 +46,8 @@ const order: MarketOrderSpec = { echo -n 'Hello from stderr yet again' >&2 `, ); - remoteProcess.stdout.on("data", (data) => console.log("stdout>", data)); - remoteProcess.stderr.on("data", (data) => console.error("stderr>", data)); + remoteProcess.stdout.subscribe((data) => console.log("stdout>", data)); + remoteProcess.stderr.subscribe((data) => console.error("stderr>", data)); await remoteProcess.waitForExit(); await rental.stopAndFinalize(); diff --git a/package-lock.json b/package-lock.json index c619c03fa..2d3c0cafe 100644 --- a/package-lock.json +++ b/package-lock.json @@ -71,7 +71,6 @@ "rollup-plugin-polyfill-node": "^0.13.0", "rollup-plugin-visualizer": "^5.12.0", "semantic-release": "^23.0.2", - "stream-browserify": "^3.0.0", "supertest": "^6.3.4", "ts-jest": "^29.1.2", "ts-loader": "^9.5.1", @@ -17385,15 +17384,6 @@ "node": ">= 0.8" } }, - "node_modules/stream-browserify": { - "version": "3.0.0", - "dev": true, - "license": "MIT", - "dependencies": { - "inherits": "~2.0.4", - "readable-stream": "^3.5.0" - } - }, "node_modules/stream-combiner2": { "version": "1.1.1", "dev": true, diff --git a/package.json b/package.json index 944d91524..ced63ba28 100644 --- a/package.json +++ b/package.json @@ -120,7 +120,6 @@ "rollup-plugin-polyfill-node": "^0.13.0", "rollup-plugin-visualizer": "^5.12.0", "semantic-release": "^23.0.2", - "stream-browserify": "^3.0.0", "supertest": "^6.3.4", "ts-jest": "^29.1.2", "ts-loader": "^9.5.1", diff --git a/rollup.config.mjs b/rollup.config.mjs index 9e46e821e..b44f023f2 100644 --- a/rollup.config.mjs +++ b/rollup.config.mjs @@ -31,9 +31,6 @@ export default [ plugins: [ deleteExistingBundles("dist"), ignore(["tmp"]), - alias({ - entries: [{ find: "stream", replacement: "stream-browserify" }], - }), nodeResolve({ browser: true, preferBuiltins: true }), commonjs(), nodePolyfills(), diff --git a/src/activity/exe-script-executor.test.ts b/src/activity/exe-script-executor.test.ts index be3663820..46ea51182 100644 --- a/src/activity/exe-script-executor.test.ts +++ b/src/activity/exe-script-executor.test.ts @@ -4,10 +4,10 @@ import { Capture, Deploy, DownloadFile, Run, Script, Start, Terminate, UploadFil import { buildExeScriptSuccessResult } from "../../tests/utils/helpers"; import { GolemWorkError, WorkErrorCode } from "./exe-unit"; import { Logger, sleep } from "../shared/utils"; -import { GolemAbortError, GolemError } from "../shared/error/golem-error"; +import { GolemAbortError } from "../shared/error/golem-error"; import { ExeScriptExecutor } from "./exe-script-executor"; import { StorageProvider } from "../shared/storage"; -import { from, of, throwError } from "rxjs"; +import { from, lastValueFrom, of, throwError, toArray } from "rxjs"; import { Result, StreamingBatchEvent } from "./results"; import resetAllMocks = jest.resetAllMocks; import { ActivityModule } from "./activity.module"; @@ -50,8 +50,9 @@ describe("ExeScriptExecutor", () => { }), ]); - const streamResult = await executor.execute(new Deploy().toExeScriptRequest()); - const { value: result } = await streamResult[Symbol.asyncIterator]().next(); + const executionMetadata = await executor.execute(new Deploy().toExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata); + const result = await lastValueFrom(result$); expect(result.result).toEqual("Ok"); }); @@ -78,9 +79,10 @@ describe("ExeScriptExecutor", () => { const expectedRunStdOuts = ["test", "test", "stdout_test_command_run_1", "stdout_test_command_run_2", "test"]; await script.before(); - const results = await executor.execute(script.getExeScriptRequest()); - - for await (const result of results) { + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata); + const results = await lastValueFrom(result$.pipe(toArray())); + for (const result of results) { expect(result.result).toEqual("Ok"); expect(result.stdout).toEqual(expectedRunStdOuts.shift()); } @@ -102,7 +104,7 @@ describe("ExeScriptExecutor", () => { const command6 = new Terminate(); const script = Script.create([command1, command2, command3, command4, command5, command6]); - when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).thenResolve([ + when(mockActivityModule.getBatchResults(_, _, _, _)).thenResolve([ buildExeScriptSuccessResult("test"), buildExeScriptSuccessResult("test"), buildExeScriptSuccessResult("stdout_test_command_run_1"), @@ -120,18 +122,28 @@ describe("ExeScriptExecutor", () => { "test", ]; await script.before(); - const results = await executor.execute(script.getExeScriptRequest()); + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata); let resultCount = 0; - return new Promise((res) => { - results.on("data", (result) => { - expect(result.result).toEqual("Ok"); - expect(result.stdout).toEqual(expectedRunStdOuts.shift()); - ++resultCount; - }); - results.on("end", async () => { - await script.after([]); - expect(resultCount).toEqual(6); - return res(); + // each result 2 assertions and 1 in "complete" + expect.assertions(12 + 1); + return new Promise((res, rej) => { + result$.subscribe({ + next: (result) => { + expect(result.result).toEqual("Ok"); + expect(result.stdout).toEqual(expectedRunStdOuts.shift()); + ++resultCount; + }, + complete: async () => { + try { + await script.after([]); + expect(resultCount).toEqual(6); + return res(); + } catch (err) { + rej(err); + } + }, + error: (error) => rej(error), }); }); }); @@ -219,9 +231,11 @@ describe("ExeScriptExecutor", () => { ]; when(mockActivityModule.observeStreamingBatchEvents(_, _)).thenReturn(from(mockedEvents)); await script.before(); - const results = await executor.execute(script.getExeScriptRequest(), true); + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata, true); let expectedStdout; - for await (const result of results) { + const results = await lastValueFrom(result$.pipe(toArray())); + for (const result of results) { expect(result).toHaveProperty("index"); if (result.index === 2 && result.stdout) expectedStdout = result.stdout; } @@ -247,19 +261,28 @@ describe("ExeScriptExecutor", () => { const command6 = new Terminate(); const script = Script.create([command1, command2, command3, command4, command5, command6]); await script.before(); - const results = await executor.execute(script.getExeScriptRequest(), undefined, undefined); + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata, undefined, undefined); ac.abort(); - return new Promise((res) => { - results.on("error", (error) => { - expect(error).toEqual(new GolemAbortError(`Execution of script has been aborted`)); - return res(); + + expect.assertions(1); + return new Promise((res, rej) => { + result$.subscribe({ + complete: () => rej("Shouldn't have completed"), + error: (error) => { + try { + expect(error).toEqual(new GolemAbortError(`Execution of script has been aborted`)); + return res(); + } catch (err) { + rej(err); + } + }, }); - results.on("data", () => null); }); }); it("should cancel executor while streaming batch", async () => { - when(mockActivityModule.observeStreamingBatchEvents(_, _)).thenReturn(of()); + when(mockActivityModule.observeStreamingBatchEvents(_, _)).thenReturn(of()); const ac = new AbortController(); const executor = new ExeScriptExecutor( instance(mockActivity), @@ -279,14 +302,23 @@ describe("ExeScriptExecutor", () => { const command4 = new Terminate(); const script = Script.create([command1, command2, command3, command4]); await script.before(); - const results = await executor.execute(script.getExeScriptRequest(), true, undefined); + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata, true, undefined); ac.abort(); - return new Promise((res) => { - results.on("error", (error) => { - expect(error).toEqual(new GolemAbortError(`Execution of script has been aborted`)); - return res(); + + expect.assertions(1); + return new Promise((res, rej) => { + result$.subscribe({ + complete: () => rej("Shouldn't have completed"), + error: (error) => { + try { + expect(error).toEqual(new GolemAbortError(`Execution of script has been aborted`)); + return res(); + } catch (err) { + rej(err); + } + }, }); - results.on("data", () => null); }); }); }); @@ -306,20 +338,28 @@ describe("ExeScriptExecutor", () => { const error = new Error("Some undefined error"); when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).thenReject(error); - const results = await executor.execute(script.getExeScriptRequest(), false, 200, 0); - - return new Promise((res) => { - results.on("error", (error: GolemWorkError) => { - expect(error).toBeInstanceOf(GolemWorkError); - expect(error.code).toEqual(WorkErrorCode.ActivityResultsFetchingFailed); - expect(error.getActivity()).toBeDefined(); - expect(error.getAgreement()).toBeDefined(); - expect(error.getProvider()?.name).toEqual("test-provider-name"); - expect(error.previous?.toString()).toEqual("Error: Some undefined error"); - expect(error.toString()).toEqual("Error: Unable to get activity results. Error: Some undefined error"); - return res(); + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata, false, 200, 0); + + expect.assertions(7); + return new Promise((res, rej) => { + result$.subscribe({ + complete: () => rej("Shouldn't have completed"), + error: (error) => { + try { + expect(error).toBeInstanceOf(GolemWorkError); + expect(error.code).toEqual(WorkErrorCode.ActivityResultsFetchingFailed); + expect(error.getActivity()).toBeDefined(); + expect(error.getAgreement()).toBeDefined(); + expect(error.getProvider()?.name).toEqual("test-provider-name"); + expect(error.previous?.toString()).toEqual("Error: Some undefined error"); + expect(error.toString()).toEqual("Error: Unable to get activity results. Error: Some undefined error"); + return res(); + } catch (err) { + rej(err); + } + }, }); - results.on("data", () => null); }); }); @@ -344,19 +384,27 @@ describe("ExeScriptExecutor", () => { }; when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).thenReject(error); - const results = await executor.execute(script.getExeScriptRequest(), false, 1_000, 3); - - return new Promise((res) => { - results.on("error", (error: GolemWorkError) => { - expect(error).toBeInstanceOf(GolemWorkError); - expect(error.code).toEqual(WorkErrorCode.ActivityResultsFetchingFailed); - expect(error.getActivity()).toBeDefined(); - expect(error.getAgreement()).toBeDefined(); - expect(error.getProvider()?.name).toEqual("test-provider-name"); - expect(error.previous?.toString()).toEqual("Error: non-retryable error"); - return res(); + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata, false, 1_000, 3); + + expect.assertions(6); + return new Promise((res, rej) => { + result$.subscribe({ + complete: () => rej("Shouldn't have completed"), + error: (error) => { + try { + expect(error).toBeInstanceOf(GolemWorkError); + expect(error.code).toEqual(WorkErrorCode.ActivityResultsFetchingFailed); + expect(error.getActivity()).toBeDefined(); + expect(error.getAgreement()).toBeDefined(); + expect(error.getProvider()?.name).toEqual("test-provider-name"); + expect(error.previous?.toString()).toEqual("Error: non-retryable error"); + return res(); + } catch (err) { + rej(err); + } + }, }); - results.on("data", () => null); }); }); @@ -389,9 +437,10 @@ describe("ExeScriptExecutor", () => { .thenReject(error) .thenResolve([testResult]); - const results = await executor.execute(script.getExeScriptRequest(), false, undefined, 10); - - for await (const result of results) { + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata, false, undefined, 10); + const results = await lastValueFrom(result$.pipe(toArray())); + for (const result of results) { expect(result).toEqual(testResult); } verify(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).times(3); @@ -410,26 +459,34 @@ describe("ExeScriptExecutor", () => { const error = { message: "GSB error: endpoint address not found. Terminated.", status: 500, - toString: () => "Error: GSB error: endpoint address not found. Terminated.", + toString: () => "GSB error: endpoint address not found. Terminated.", }; when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).thenReject(error); - const results = await executor.execute(script.getExeScriptRequest(), false, undefined, 1); - - return new Promise((res) => { - results.on("error", (error: GolemWorkError) => { - expect(error).toBeInstanceOf(GolemWorkError); - expect(error.code).toEqual(WorkErrorCode.ActivityResultsFetchingFailed); - expect(error.getActivity()).toBeDefined(); - expect(error.getAgreement()).toBeDefined(); - expect(error.getProvider()?.name).toEqual("test-provider-name"); - expect(error.previous?.message).toEqual("GSB error: endpoint address not found. Terminated."); - expect(error.toString()).toEqual( - "Error: Unable to get activity results. Error: GSB error: endpoint address not found. Terminated.", - ); - return res(); + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata, false, undefined, 1); + + expect.assertions(7); + return new Promise((res, rej) => { + result$.subscribe({ + complete: () => rej("Shouldn't have completed"), + error: (error) => { + try { + expect(error).toBeInstanceOf(GolemWorkError); + expect(error.code).toEqual(WorkErrorCode.ActivityResultsFetchingFailed); + expect(error.getActivity()).toBeDefined(); + expect(error.getAgreement()).toBeDefined(); + expect(error.getProvider()?.name).toEqual("test-provider-name"); + expect(error.previous?.message).toEqual("GSB error: endpoint address not found. Terminated."); + expect(error.toString()).toEqual( + "Error: Unable to get activity results. GSB error: endpoint address not found. Terminated.", + ); + return res(); + } catch (err) { + rej(err); + } + }, }); - results.on("data", () => null); }); }); @@ -445,16 +502,26 @@ describe("ExeScriptExecutor", () => { const command4 = new Run("test_command2"); const command5 = new Run("test_command3"); const script = Script.create([command1, command2, command3, command4, command5]); - const results = await executor.execute(script.getExeScriptRequest(), false, 1); + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata, false, 1); + + // wait for execute timeout to fire await sleep(10, true); - return new Promise((res) => { - results.on("error", (error: GolemWorkError) => { - expect(error).toBeInstanceOf(GolemAbortError); - expect(error.toString()).toEqual("Error: Execution of script has been aborted"); - return res(); + + expect.assertions(2); + return new Promise((res, rej) => { + result$.subscribe({ + complete: () => rej("Shouldn't have completed"), + error: (error) => { + try { + expect(error).toBeInstanceOf(GolemAbortError); + expect(error.toString()).toEqual("Error: Execution of script has been aborted"); + return res(); + } catch (err) { + rej(err); + } + }, }); - // results.on("end", () => rej()); - results.on("data", () => null); }); }); @@ -478,15 +545,26 @@ describe("ExeScriptExecutor", () => { const command4 = new Terminate(); const script = Script.create([command1, command2, command3, command4]); await script.before(); - const results = await executor.execute(script.getExeScriptRequest(), true, 800); + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata, true, 800); + + // wait for ExeScriptExecutor abort signal to fire + await sleep(10, true); + + expect.assertions(2); return new Promise((res, rej) => { - results.on("error", (error: GolemError) => { - expect(error).toBeInstanceOf(GolemAbortError); - expect(error.toString()).toEqual("Error: Execution of script has been aborted"); - return res(); + result$.subscribe({ + complete: () => rej("Shouldn't have completed"), + error: (error) => { + try { + expect(error).toBeInstanceOf(GolemAbortError); + expect(error.toString()).toEqual("Error: Execution of script has been aborted"); + return res(); + } catch (err) { + rej(err); + } + }, }); - results.on("end", () => rej()); - results.on("data", () => null); }); }); @@ -510,18 +588,28 @@ describe("ExeScriptExecutor", () => { throwError(() => mockedEventSourceErrorMessage), ); await script.before(); - const results = await executor.execute(script.getExeScriptRequest(), true); - return new Promise((res) => { - results.on("error", (error: GolemWorkError) => { - expect(error).toBeInstanceOf(GolemWorkError); - expect(error.code).toEqual(WorkErrorCode.ActivityResultsFetchingFailed); - expect(error.getActivity()).toBeDefined(); - expect(error.getAgreement()).toBeDefined(); - expect(error.getProvider()?.name).toEqual("test-provider-name"); - expect(error.toString()).toEqual('Error: Unable to get activity results. ["Some undefined error"]'); - return res(); + const executionMetadata = await executor.execute(script.getExeScriptRequest()); + const result$ = executor.getResultsObservable(executionMetadata, true); + + expect.assertions(7); + return new Promise((res, rej) => { + result$.subscribe({ + complete: () => rej("Shouldn't have completed"), + error: (error) => { + try { + expect(error).toBeInstanceOf(GolemWorkError); + expect(error.code).toEqual(WorkErrorCode.ActivityResultsFetchingFailed); + expect(error.getActivity()).toBeDefined(); + expect(error.getAgreement()).toBeDefined(); + expect(error.getProvider()?.name).toEqual("test-provider-name"); + expect(error.previous?.toString()).toEqual("Some undefined error"); + expect(error.toString()).toEqual("Error: Unable to get activity results. Some undefined error"); + return res(); + } catch (err) { + rej(err); + } + }, }); - results.on("data", () => null); }); }); }); diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index c3f037a46..266fadf96 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -1,15 +1,28 @@ -import { createAbortSignalFromTimeout, Logger } from "../shared/utils"; +import { + anyAbortSignal, + createAbortSignalFromTimeout, + Logger, + mergeUntilFirstComplete, + runOnNextEventLoopIteration, +} from "../shared/utils"; import { ExecutionConfig } from "./config"; -import { Readable } from "stream"; import { GolemWorkError, WorkErrorCode } from "./exe-unit"; import { withTimeout } from "../shared/utils/timeout"; import { GolemAbortError } from "../shared/error/golem-error"; import retry from "async-retry"; import { Result, StreamingBatchEvent } from "./results"; -import sleep from "../shared/utils/sleep"; import { Activity } from "./activity"; import { getMessageFromApiError } from "../shared/utils/apiErrorMessage"; import { ActivityModule } from "./activity.module"; +import { catchError, map, Observable, takeWhile } from "rxjs"; + +/** + * Information needed to fetch the results of a script execution + */ +export interface ScriptExecutionMetadata { + batchId: string; + batchSize: number; +} export interface ExeScriptRequest { text: string; @@ -41,39 +54,19 @@ export class ExeScriptExecutor { } /** - * Execute script - * - * @param script - exe script request - * @param stream - define type of getting results from execution (polling or streaming) - * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the execution - * @param maxRetries - maximum number of retries retrieving results when an error occurs, default: 10 + * Executes the provided script and returns the batch id and batch size that can be used + * to fetch it's results + * @param script + * @returns script execution metadata - batch id and batch size that can be used to fetch results using `getResultsObservable` */ - public async execute( - script: ExeScriptRequest, - stream?: boolean, - signalOrTimeout?: number | AbortSignal, - maxRetries?: number, - ): Promise { - let batchId: string, batchSize: number; - const abortController = new AbortController(); - // abort execution in case of cancellation by global signal or by local signal (from parameter) - this.abortSignal.addEventListener("abort", () => abortController.abort(this.abortSignal.reason)); - if (signalOrTimeout) { - const abortSignal = createAbortSignalFromTimeout(signalOrTimeout); - abortSignal.addEventListener("abort", () => abortController.abort(abortSignal.reason)); - } - + public async execute(script: ExeScriptRequest): Promise { try { - abortController.signal.throwIfAborted(); - batchId = await this.send(script); - batchSize = JSON.parse(script.text).length; + this.abortSignal.throwIfAborted(); + const batchId = await this.send(script); + const batchSize = JSON.parse(script.text).length; - abortController.signal.throwIfAborted(); this.logger.debug(`Script sent.`, { batchId }); - - return stream - ? this.streamingBatch(batchId, batchSize, abortController.signal) - : this.pollingBatch(batchId, abortController.signal, maxRetries); + return { batchId, batchSize }; } catch (error) { const message = getMessageFromApiError(error); @@ -81,7 +74,7 @@ export class ExeScriptExecutor { reason: message, }); - if (abortController.signal.aborted) { + if (this.abortSignal.aborted) { throw new GolemAbortError("Executions of script has been aborted", this.abortSignal.reason); } throw new GolemWorkError( @@ -95,14 +88,51 @@ export class ExeScriptExecutor { } } + /** + * Given a batch id and batch size collect the results from yagna. You can choose to either + * stream them as they go or poll for them. When a timeout is reached (by either the timeout provided + * as an argument here or in the constructor) the observable will emit an error. + * + * + * @param batch - batch id and batch size + * @param stream - define type of getting results from execution (polling or streaming) + * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the execution + * @param maxRetries - maximum number of retries retrieving results when an error occurs, default: 10 + */ + public getResultsObservable( + batch: ScriptExecutionMetadata, + stream?: boolean, + signalOrTimeout?: number | AbortSignal, + maxRetries?: number, + ): Observable { + const signal = anyAbortSignal(this.abortSignal, createAbortSignalFromTimeout(signalOrTimeout)); + + // observable that emits when the script execution should be aborted + const abort$ = new Observable((subscriber) => { + const getError = () => new GolemAbortError("Execution of script has been aborted", signal.reason); + + if (signal.aborted) { + subscriber.error(getError()); + } + signal.addEventListener("abort", () => { + subscriber.error(getError()); + }); + }); + + // get an observable that will emit results of a batch execution + const results$ = stream + ? this.streamingBatch(batch.batchId, batch.batchSize) + : this.pollingBatch(batch.batchId, maxRetries); + + return mergeUntilFirstComplete(abort$, results$); + } + protected async send(script: ExeScriptRequest): Promise { return withTimeout(this.activityModule.executeScript(this.activity, script), 10_000); } - private async pollingBatch(batchId: string, abortSignal: AbortSignal, maxRetries?: number): Promise { - this.logger.debug("Starting to poll for batch results"); - - let isBatchFinished = false; + private pollingBatch(batchId: string, maxRetries?: number): Observable { + let isCompleted = false; let lastIndex: number; const { id: activityId, agreement } = this.activity; @@ -110,139 +140,96 @@ export class ExeScriptExecutor { const { activityExeBatchResultPollIntervalSeconds, activityExeBatchResultMaxRetries } = this.options; const { logger, activity, activityModule } = this; - return new Readable({ - objectMode: true, - async read() { - const abortError = new GolemAbortError("Execution of script has been aborted", abortSignal.reason); - abortSignal.addEventListener("abort", () => this.destroy(abortError)); - while (!isBatchFinished && !abortSignal.aborted) { - logger.debug("Polling for batch script execution result"); - - try { - const results = await retry( - async (bail, attempt) => { - logger.debug(`Trying to poll for batch execution results from yagna. Attempt: ${attempt}`); - try { - if (abortSignal.aborted) { - return bail(abortError); - } - return await activityModule.getBatchResults( - activity, - batchId, - undefined, - activityExeBatchResultPollIntervalSeconds, - ); - } catch (error) { - logger.debug(`Failed to fetch activity results. Attempt: ${attempt}. ${error}`); - if (RETRYABLE_ERROR_STATUS_CODES.includes(error?.status)) { - throw error; - } else { - bail(error); - } - } - }, - { - retries: maxRetries ?? activityExeBatchResultMaxRetries, - maxTimeout: 15_000, - }, - ); - - const newResults = results && results.slice(lastIndex + 1); - - logger.debug(`Received batch execution results`, { results: newResults, activityId }); - - if (Array.isArray(newResults) && newResults.length) { - newResults.forEach((result) => { - this.push(result); - isBatchFinished = result.isBatchFinished || false; - lastIndex = result.index; - }); - } - } catch (error) { - if (abortSignal.aborted) { - logger.warn(abortError.message, { activityId: activity.id, batchId, reason: abortSignal.reason }); - return this.destroy(abortError); - } - logger.error(`Processing script execution results failed`, error); - - return this.destroy( - error instanceof GolemWorkError - ? error - : new GolemWorkError( - `Unable to get activity results. ${error}`, - WorkErrorCode.ActivityResultsFetchingFailed, - agreement, - activity, - activity.provider, - error, - ), - ); - } + return new Observable((subscriber) => { + const pollForResults = async (): Promise => { + if (isCompleted) { + subscriber.complete(); + return; } - if (abortSignal.aborted) { - return this.destroy(abortError); + logger.debug("Polling for batch script execution result"); + + await retry( + async (bail, attempt) => { + logger.debug(`Trying to poll for batch execution results from yagna. Attempt: ${attempt}`); + try { + if (isCompleted) { + bail(new Error("Batch is finished")); + } + const results = await activityModule.getBatchResults( + activity, + batchId, + undefined, + activityExeBatchResultPollIntervalSeconds, + ); + + const newResults = results && results.slice(lastIndex + 1); + + logger.debug(`Received batch execution results`, { results: newResults, activityId }); + + if (Array.isArray(newResults) && newResults.length) { + newResults.forEach((result) => { + subscriber.next(result); + isCompleted ||= !!result.isBatchFinished; + lastIndex = result.index; + }); + } + } catch (error) { + logger.debug(`Failed to fetch activity results. Attempt: ${attempt}. ${error}`); + if (RETRYABLE_ERROR_STATUS_CODES.includes(error?.status)) { + throw error; + } else { + bail(error); + } + } + }, + { + retries: maxRetries ?? activityExeBatchResultMaxRetries, + maxTimeout: 15_000, + }, + ); + return runOnNextEventLoopIteration(pollForResults); + }; + + pollForResults().catch((error) => { + logger.error(`Polling for batch results failed`, error); + subscriber.error(error); + }); + return () => { + isCompleted = true; + }; + }).pipe( + catchError((error) => { + if (error instanceof GolemWorkError) { + throw error; } - this.push(null); - }, - }); + throw new GolemWorkError( + `Unable to get activity results. ${error}`, + WorkErrorCode.ActivityResultsFetchingFailed, + agreement, + activity, + activity.provider, + error, + ); + }), + ); } - private async streamingBatch(batchId: string, batchSize: number, abortSignal: AbortSignal): Promise { - const errors: object[] = []; - const results: Result[] = []; - - const source = this.activityModule.observeStreamingBatchEvents(this.activity, batchId).subscribe({ - next: (resultEvents) => results.push(this.parseEventToResult(resultEvents, batchSize)), - error: (err) => errors.push(err.data?.message ?? err), - complete: () => this.logger.debug("Finished reading batch results"), - }); - - let isBatchFinished = false; - - const { logger, activity } = this; - - return new Readable({ - objectMode: true, - async read() { - const abortError = new GolemAbortError("Execution of script has been aborted", abortSignal.reason); - abortSignal.addEventListener("abort", () => this.destroy(abortError)); - while (!isBatchFinished && !abortSignal.aborted) { - let error: Error | undefined; - - if (abortSignal.aborted) { - error = abortError; - } - - if (errors.length) { - error = new GolemWorkError( - `Unable to get activity results. ${JSON.stringify(errors)}`, - WorkErrorCode.ActivityResultsFetchingFailed, - activity.agreement, - activity, - activity.provider, - ); - } - if (error) { - source.unsubscribe(); - return this.destroy(error); - } - - if (results.length) { - const result = results.shift(); - this.push(result); - isBatchFinished = result?.isBatchFinished || false; - } - await sleep(500, true); - } - if (abortSignal.aborted) { - logger.warn(abortError.message, { activityId: activity.id, batchId, reason: abortSignal.reason }); - return this.destroy(abortError); - } else { - this.push(null); - } - source.unsubscribe(); - }, - }); + private streamingBatch(batchId: string, batchSize: number): Observable { + return this.activityModule.observeStreamingBatchEvents(this.activity, batchId).pipe( + map((resultEvents) => this.parseEventToResult(resultEvents, batchSize)), + takeWhile((result) => !result.isBatchFinished, true), + // transform to domain error + catchError((error) => { + throw new GolemWorkError( + `Unable to get activity results. ${error}`, + WorkErrorCode.ActivityResultsFetchingFailed, + this.activity.agreement, + this.activity, + this.activity.provider, + error, + ); + }), + ); } private parseEventToResult(event: StreamingBatchEvent, batchSize: number): Result { diff --git a/src/activity/exe-unit/batch.spec.ts b/src/activity/exe-unit/batch.spec.ts index cb70af1fa..bb7f4419c 100644 --- a/src/activity/exe-unit/batch.spec.ts +++ b/src/activity/exe-unit/batch.spec.ts @@ -13,6 +13,7 @@ import { import { Agreement } from "../../market/agreement"; import { ExeScriptExecutor } from "../exe-script-executor"; +import { lastValueFrom, of, toArray } from "rxjs"; const mockLogger = imock(); const mockYagna = mock(YagnaApi); @@ -27,6 +28,9 @@ describe("Batch", () => { beforeEach(() => { reset(mockLogger); reset(mockYagna); + reset(mockActivity); + reset(mockAgreement); + reset(mockExecutor); const providerInfo = { id: "provider-id", @@ -37,6 +41,7 @@ describe("Batch", () => { when(mockAgreement.provider).thenReturn(providerInfo); when(mockActivity.provider).thenReturn(providerInfo); when(mockActivity.agreement).thenReturn(instance(mockAgreement)); + when(mockExecutor.getResultsObservable(anything())).thenReturn(of()); activity = instance(mockActivity); @@ -118,7 +123,7 @@ describe("Batch", () => { }); it("should work", async () => { - when(mockExecutor.execute(anything())).thenResolve( + when(mockExecutor.getResultsObservable(anything())).thenReturn( buildExecutorResults([ buildExeScriptSuccessResult("Hello World"), buildExeScriptSuccessResult("Hello World 2"), @@ -134,7 +139,7 @@ describe("Batch", () => { it("should initialize script with script.before()", async () => { const spy = jest.spyOn(batch["script"], "before"); - when(mockExecutor.execute(anything())).thenResolve(buildExecutorResults([])); + when(mockExecutor.getResultsObservable(anything())).thenReturn(buildExecutorResults([])); await batch.end(); expect(spy).toHaveBeenCalled(); @@ -142,7 +147,7 @@ describe("Batch", () => { it("should call script.after() on success", async () => { const spy = jest.spyOn(batch["script"], "after"); - when(mockExecutor.execute(anything())).thenResolve(buildExecutorResults([])); + when(mockExecutor.getResultsObservable(anything())).thenReturn(buildExecutorResults([])); await batch.end(); expect(spy).toHaveBeenCalled(); @@ -151,7 +156,7 @@ describe("Batch", () => { it("should call script.after() on failure", async () => { const spy = jest.spyOn(batch["script"], "after"); - when(mockExecutor.execute(anything())).thenResolve( + when(mockExecutor.getResultsObservable(anything())).thenReturn( buildExecutorResults(undefined, undefined, new Error("FAILURE")), ); @@ -171,7 +176,7 @@ describe("Batch", () => { it("should call script.after() on execute error", async () => { const spy = jest.spyOn(batch["script"], "after"); - when(mockExecutor.execute(anything())).thenReject(new Error("ERROR")); + when(mockExecutor.execute(anything())).thenThrow(new Error("ERROR")); await expect(batch.end()).rejects.toStrictEqual( new GolemWorkError( @@ -188,7 +193,7 @@ describe("Batch", () => { }); it("should throw error on result stream error", async () => { - when(mockExecutor.execute(anything())).thenResolve( + when(mockExecutor.getResultsObservable(anything())).thenReturn( buildExecutorResults(undefined, undefined, new Error("FAILURE")), ); await expect(batch.end()).rejects.toStrictEqual( @@ -210,7 +215,7 @@ describe("Batch", () => { }); it("should work", async () => { - when(mockExecutor.execute(anything())).thenResolve( + when(mockExecutor.getResultsObservable(anything())).thenReturn( buildExecutorResults([ buildExeScriptSuccessResult("Hello World"), buildExeScriptSuccessResult("Hello World 2"), @@ -220,8 +225,9 @@ describe("Batch", () => { const results: Result[] = []; const stream = await batch.endStream(); + const allResults = await lastValueFrom(stream.pipe(toArray())); - for await (const result of stream) { + for (const result of allResults) { results.push(result); } @@ -241,26 +247,22 @@ describe("Batch", () => { it("should call script.after() on success", async () => { const spy = jest.spyOn(batch["script"], "after"); - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const r of await batch.endStream()) { - /* empty */ - } + const result$ = await batch.endStream(); + // lastValueFrom errors if the stream is empty, so we need to provide a default value + await lastValueFrom(result$, { defaultValue: undefined }); expect(spy).toHaveBeenCalled(); }); it("should call script.after() on result stream error", async () => { const spy = jest.spyOn(batch["script"], "after"); - when(mockExecutor.execute(anything())).thenResolve( + when(mockExecutor.getResultsObservable(anything())).thenReturn( buildExecutorResults(undefined, [buildExeScriptErrorResult("FAILURE", "FAILURE")]), ); const stream = await batch.endStream(); try { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const r of stream) { - /* empty */ - } + await lastValueFrom(stream); fail("Expected to throw"); } catch (e) { /* empty */ @@ -271,14 +273,9 @@ describe("Batch", () => { it("should call script.after() on execute error", async () => { const spy = jest.spyOn(batch["script"], "after"); - when(mockExecutor.execute(anything())).thenReject(new Error("ERROR")); - - await expect(async () => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const r of await batch.endStream()) { - /* empty */ - } - }).rejects.toMatchError( + when(mockExecutor.execute(anything())).thenThrow(new Error("ERROR")); + + await expect(batch.endStream()).rejects.toMatchError( new GolemWorkError( "Unable to execute script Error: ERROR", WorkErrorCode.ScriptExecutionFailed, @@ -291,22 +288,5 @@ describe("Batch", () => { expect(spy).toHaveBeenCalled(); }); - - it("should destroy the stream on result stream error", async () => { - when(mockExecutor.execute(anything())).thenResolve( - buildExecutorResults(undefined, [buildExeScriptErrorResult("FAILURE", "FAILURE")]), - ); - const stream = await batch.endStream(); - try { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const r of stream) { - /* empty */ - } - fail("Expected to throw"); - } catch (e) { - /* empty */ - } - expect(stream.destroyed).toBe(true); - }); }); }); diff --git a/src/activity/exe-unit/batch.ts b/src/activity/exe-unit/batch.ts index d99801ff5..b39ea8c8c 100644 --- a/src/activity/exe-unit/batch.ts +++ b/src/activity/exe-unit/batch.ts @@ -2,10 +2,10 @@ import { DownloadFile, Run, Script, Transfer, UploadData, UploadFile } from "../ import { Result } from "../index"; import { StorageProvider } from "../../shared/storage"; import { Logger } from "../../shared/utils"; -import { pipeline, Readable, Transform } from "stream"; import { GolemWorkError, WorkErrorCode } from "./error"; -import { ExeScriptExecutor } from "../exe-script-executor"; +import { ExeScriptExecutor, ScriptExecutionMetadata } from "../exe-script-executor"; +import { Observable, finalize, map, tap } from "rxjs"; export class Batch { private readonly script: Script; @@ -79,42 +79,42 @@ export class Batch { const script = this.script.getExeScriptRequest(); this.logger.debug(`Sending exec script request to the exe-unit on provider:`, { script }); - const results = await this.executor.execute(script); + const executionMetadata = await this.executor.execute(script); + const result$ = this.executor.getResultsObservable(executionMetadata); return new Promise((resolve, reject) => { this.logger.debug("Reading the results of the batch script"); - results.on("data", (res) => { - this.logger.debug(`Received data for batch script execution`, { res }); - - allResults.push(res); - }); - - results.on("end", () => { - this.logger.debug("End of batch script execution"); - this.script - .after(allResults) - .then((results) => resolve(results)) - .catch((error) => reject(error)); - }); - - results.on("error", (error) => { - const golemError = - error instanceof GolemWorkError - ? error - : new GolemWorkError( - `Unable to execute script ${error}`, - WorkErrorCode.ScriptExecutionFailed, - this.executor.activity.agreement, - this.executor.activity, - this.executor.activity.agreement.provider, - error, - ); - this.logger.debug("Error in batch script execution"); - this.script - .after(allResults) - .then(() => reject(golemError)) - .catch(() => reject(golemError)); // Return original error, as it might be more important. + result$.subscribe({ + next: (res) => { + this.logger.debug(`Received data for batch script execution`, { res }); + allResults.push(res); + }, + complete: () => { + this.logger.debug("End of batch script execution"); + this.script + .after(allResults) + .then((results) => resolve(results)) + .catch((error) => reject(error)); + }, + error: (error) => { + const golemError = + error instanceof GolemWorkError + ? error + : new GolemWorkError( + `Unable to execute script ${error}`, + WorkErrorCode.ScriptExecutionFailed, + this.executor.activity.agreement, + this.executor.activity, + this.executor.activity.agreement.provider, + error, + ); + this.logger.debug("Error in batch script execution"); + this.script + .after(allResults) + .then(() => reject(golemError)) + .catch(() => reject(golemError)); // Return original error, as it might be more important. + }, }); }); } catch (error) { @@ -136,12 +136,12 @@ export class Batch { } } - async endStream(): Promise { + async endStream(): Promise> { const script = this.script; await script.before(); - let results: Readable; + let executionMetadata: ScriptExecutionMetadata; try { - results = await this.executor.execute(this.script.getExeScriptRequest()); + executionMetadata = await this.executor.execute(this.script.getExeScriptRequest()); } catch (error) { // the original error is more important than the one from after() await script.after([]); @@ -159,32 +159,26 @@ export class Batch { } const decodedResults: Result[] = []; const { activity } = this.executor; - const errorResultHandler = new Transform({ - objectMode: true, - transform(chunk, encoding, callback) { - const error = - chunk?.result === "Error" - ? new GolemWorkError( - `${chunk?.message}. Stdout: ${chunk?.stdout?.trim()}. Stderr: ${chunk?.stderr?.trim()}`, - WorkErrorCode.ScriptExecutionFailed, - activity.agreement, - activity, - activity.provider, - ) - : null; - if (error) { - script.after(decodedResults).catch(); - this.destroy(error); - } else { - decodedResults.push(chunk); - // FIXME: This is broken, chunk result didn't go through after() at this point yet, it might be incomplete. - callback(null, chunk); + const result$ = this.executor.getResultsObservable(executionMetadata); + return result$.pipe( + map((chunk) => { + if (chunk.result !== "Error") { + return chunk; } - }, - }); - const resultsWithErrorHandling = pipeline(results, errorResultHandler, () => { - script.after(decodedResults).catch(); - }); - return resultsWithErrorHandling; + throw new GolemWorkError( + `${chunk?.message}. Stdout: ${String(chunk?.stdout).trim()}. Stderr: ${String(chunk?.stderr).trim()}`, + WorkErrorCode.ScriptExecutionFailed, + activity.agreement, + activity, + activity.provider, + ); + }), + tap((chunk) => { + decodedResults.push(chunk); + }), + finalize(() => + script.after(decodedResults).catch((error) => this.logger.error("Failed to cleanup script", { error })), + ), + ); } } diff --git a/src/activity/exe-unit/exe-unit.test.ts b/src/activity/exe-unit/exe-unit.test.ts index 5ea3b11de..3d1391d7f 100644 --- a/src/activity/exe-unit/exe-unit.test.ts +++ b/src/activity/exe-unit/exe-unit.test.ts @@ -58,7 +58,7 @@ describe("ExeUnit", () => { describe("Executing", () => { it("should execute run command with a single parameter", async () => { - when(mockExecutor.execute(_, _, _, _)).thenResolve( + when(mockExecutor.getResultsObservable(_, _, _, _)).thenReturn( buildExecutorResults([ { index: 0, @@ -78,7 +78,7 @@ describe("ExeUnit", () => { }); it("should execute run command with multiple parameters", async () => { - when(mockExecutor.execute(_, _, _, _)).thenResolve( + when(mockExecutor.getResultsObservable(_, _, _, _)).thenReturn( buildExecutorResults([ { index: 0, @@ -102,7 +102,7 @@ describe("ExeUnit", () => { it("should execute upload file command", async () => { const worker = async (exe: ExeUnit) => exe.uploadFile("./file.txt", "/golem/file.txt"); - when(mockExecutor.execute(_, _, _, _)).thenResolve( + when(mockExecutor.getResultsObservable(_, _, _, _)).thenReturn( buildExecutorResults([ { index: 0, @@ -126,7 +126,7 @@ describe("ExeUnit", () => { it("should execute upload json command", async () => { const worker = async (exe: ExeUnit) => exe.uploadJson({ test: true }, "/golem/file.txt"); - when(mockExecutor.execute(_, _, _, _)).thenResolve( + when(mockExecutor.getResultsObservable(_, _, _, _)).thenReturn( buildExecutorResults([ { index: 0, @@ -153,7 +153,7 @@ describe("ExeUnit", () => { it("should execute download file command", async () => { const worker = async (exe: ExeUnit) => exe.downloadFile("/golem/file.txt", "./file.txt"); - when(mockExecutor.execute(_, _, _, _)).thenResolve( + when(mockExecutor.getResultsObservable(_, _, _, _)).thenReturn( buildExecutorResults([ { index: 0, @@ -181,7 +181,7 @@ describe("ExeUnit", () => { const jsonStr = JSON.stringify(json); const encoded = new TextEncoder().encode(jsonStr); - when(mockExecutor.execute(_, _, _, _)).thenResolve( + when(mockExecutor.getResultsObservable(_, _, _, _)).thenReturn( buildExecutorResults([ { index: 0, @@ -214,7 +214,7 @@ describe("ExeUnit", () => { when(mockStorageProvider.receiveData(anything())).thenResolve(data.toString()); - when(mockExecutor.execute(_, _, _, _)).thenResolve( + when(mockExecutor.getResultsObservable(_, _, _, _)).thenReturn( buildExecutorResults([ { index: 0, @@ -251,7 +251,7 @@ describe("ExeUnit", () => { describe("Exec and stream", () => { it("should execute runAndStream command", async () => { const exe = new ExeUnit(instance(mockActivity), instance(mockActivityModule)); - when(mockExecutor.execute(_, _, _, _)).thenResolve( + when(mockExecutor.getResultsObservable(_, _, _, _)).thenReturn( buildExecutorResults([ { index: 0, @@ -272,7 +272,7 @@ describe("ExeUnit", () => { it("should execute transfer command", async () => { const exe = new ExeUnit(instance(mockActivity), instance(mockActivityModule)); - when(mockExecutor.execute(_, _, _, _)).thenResolve( + when(mockExecutor.getResultsObservable(_, _, _, _)).thenReturn( buildExecutorResults([ { index: 0, @@ -311,7 +311,7 @@ describe("ExeUnit", () => { { stdout: "ok_download_file" }, ]; - when(mockExecutor.execute(_)).thenResolve( + when(mockExecutor.getResultsObservable(_)).thenReturn( buildExecutorResults(expectedStdout.map((e) => buildExeScriptSuccessResult(e.stdout))), ); @@ -343,20 +343,22 @@ describe("ExeUnit", () => { { stdout: "ok_download_file" }, ]; - when(mockExecutor.execute(_)).thenResolve( + when(mockExecutor.getResultsObservable(_)).thenReturn( buildExecutorResults(expectedStdout.map((e) => buildExeScriptSuccessResult(e.stdout))), ); const results = await worker(exe); - await new Promise((res, rej) => { - results?.on("data", (result) => { - try { - expect(result.stdout).toEqual(expectedStdout?.shift()?.stdout); - } catch (e) { - rej(e); - } + await new Promise((res, rej) => { + results.subscribe({ + next: (result) => { + try { + expect(result.stdout).toEqual(expectedStdout?.shift()?.stdout); + } catch (e) { + rej(e); + } + }, + complete: () => res(), }); - results?.on("end", res); }); verify(mockStorageProvider.publishFile("./file.txt")).once(); @@ -428,7 +430,7 @@ describe("ExeUnit", () => { const eventDate = new Date().toISOString(); - when(mockExecutor.execute(_, _, _, _)).thenResolve( + when(mockExecutor.getResultsObservable(_, _, _, _)).thenReturn( buildExecutorResults([ { index: 0, @@ -470,7 +472,7 @@ describe("ExeUnit", () => { const worker = async (exe: ExeUnit) => exe.beginBatch().run("invalid_shell_command").end(); const exe = new ExeUnit(instance(mockActivity), instance(mockActivityModule)); - when(mockExecutor.execute(_)).thenResolve( + when(mockExecutor.getResultsObservable(_)).thenReturn( buildExecutorResults(undefined, [buildExeScriptErrorResult("error", "Some error occurred")]), ); const [result] = await worker(exe); @@ -483,18 +485,20 @@ describe("ExeUnit", () => { const worker = async (exe: ExeUnit) => exe.beginBatch().run("invalid_shell_command").endStream(); const exe = new ExeUnit(instance(mockActivity), instance(mockActivityModule)); - when(mockExecutor.execute(_)).thenResolve( + when(mockExecutor.getResultsObservable(_)).thenReturn( buildExecutorResults(undefined, [buildExeScriptErrorResult("error", "Some error occurred", "test_result")]), ); const results = await worker(exe); await new Promise((res) => - results.once("error", (error: GolemModuleError) => { - expect(error.message).toEqual("Some error occurred. Stdout: test_result. Stderr: error"); - expect(error).toBeInstanceOf(GolemWorkError); - expect(error.code).toEqual(WorkErrorCode.ScriptExecutionFailed); - res(true); + results.subscribe({ + error: (error: GolemModuleError) => { + expect(error.message).toEqual("Some error occurred. Stdout: test_result. Stderr: error"); + expect(error).toBeInstanceOf(GolemWorkError); + expect(error.code).toEqual(WorkErrorCode.ScriptExecutionFailed); + res(true); + }, }), ); }); diff --git a/src/activity/exe-unit/exe-unit.ts b/src/activity/exe-unit/exe-unit.ts index 203cc941a..223b8c76c 100644 --- a/src/activity/exe-unit/exe-unit.ts +++ b/src/activity/exe-unit/exe-unit.ts @@ -22,6 +22,7 @@ import { GolemAbortError, GolemConfigError, GolemTimeoutError } from "../../shar import { Agreement, ProviderInfo } from "../../market/agreement"; import { TcpProxy } from "../../network/tcpProxy"; import { ExecutionOptions, ExeScriptExecutor } from "../exe-script-executor"; +import { lastValueFrom, tap, toArray } from "rxjs"; export type LifecycleFunction = (exe: ExeUnit) => Promise; @@ -156,14 +157,20 @@ export class ExeUnit { private async deployActivity() { try { - const result = await this.executor.execute( + const executionMetadata = await this.executor.execute( new Script([new Deploy(this.networkNode?.getNetworkConfig?.()), new Start()]).getExeScriptRequest(), ); - for await (const res of result) { - if (res.result === "Error") { - throw new Error(res.message); - } - } + const result$ = this.executor.getResultsObservable(executionMetadata); + // if any result is an error, throw an error + await lastValueFrom( + result$.pipe( + tap((result) => { + if (result.result === "Error") { + throw new Error(String(result.message)); + } + }), + ), + ); } catch (error) { throw new GolemWorkError( `Unable to deploy activity. ${error}`, @@ -244,22 +251,15 @@ export class ExeUnit { const script = new Script([run]); // In this case, the script consists only of one run command, // so we skip the execution of script.before and script.after - const streamOfActivityResults = await this.executor - .execute(script.getExeScriptRequest(), true, options?.signalOrTimeout, options?.maxRetries) - .catch((e) => { - throw new GolemWorkError( - `Script execution failed for command: ${JSON.stringify(run.toJson())}. ${ - e?.response?.data?.message || e?.message || e - }`, - WorkErrorCode.ScriptExecutionFailed, - this.activity.agreement, - this.activity, - this.activity.provider, - e, - ); - }); + const executionMetadata = await this.executor.execute(script.getExeScriptRequest()); + const activityResult$ = this.executor.getResultsObservable( + executionMetadata, + true, + options?.signalOrTimeout, + options?.maxRetries, + ); - return new RemoteProcess(this.activityModule, streamOfActivityResults, this.activity, this.logger); + return new RemoteProcess(this.activityModule, activityResult$, this.activity, this.logger); } /** @@ -392,16 +392,16 @@ export class ExeUnit { await sleep(100, true); // Send script. - const results = await this.executor.execute( - script.getExeScriptRequest(), + const executionMetadata = await this.executor.execute(script.getExeScriptRequest()); + const result$ = this.executor.getResultsObservable( + executionMetadata, false, options?.signalOrTimeout, options?.maxRetries, ); // Process result. - let allResults: Result[] = []; - for await (const result of results) allResults.push(result); + let allResults: Result[] = await lastValueFrom(result$.pipe(toArray())); allResults = await script.after(allResults); // Handle errors. diff --git a/src/activity/exe-unit/process.spec.ts b/src/activity/exe-unit/process.spec.ts index 73b745d8f..046903078 100644 --- a/src/activity/exe-unit/process.spec.ts +++ b/src/activity/exe-unit/process.spec.ts @@ -8,6 +8,7 @@ import { buildExeScriptErrorResult, buildExeScriptSuccessResult, } from "../../../tests/utils/helpers"; +import { lastValueFrom, toArray } from "rxjs"; const mockYagna = mock(YagnaApi); const mockAgreement = mock(Agreement); @@ -46,7 +47,8 @@ describe("RemoteProcess", () => { activity, instance(mockLogger), ); - for await (const stdout of remoteProcess.stdout) { + const allStdout = await lastValueFrom(remoteProcess.stdout.pipe(toArray())); + for (const stdout of allStdout) { expect(stdout).toEqual("Output"); } }); @@ -59,7 +61,8 @@ describe("RemoteProcess", () => { activity, instance(mockLogger), ); - for await (const stderr of remoteProcess.stderr) { + const allStderr = await lastValueFrom(remoteProcess.stderr.pipe(toArray())); + for (const stderr of allStderr) { expect(stderr).toEqual("Error"); } }); diff --git a/src/activity/exe-unit/process.ts b/src/activity/exe-unit/process.ts index 4fa7b50a4..44c69d0a0 100644 --- a/src/activity/exe-unit/process.ts +++ b/src/activity/exe-unit/process.ts @@ -1,8 +1,8 @@ -import { Readable, Transform } from "stream"; import { Activity, ActivityModule, Result } from "../index"; import { GolemWorkError, WorkErrorCode } from "./error"; import { GolemTimeoutError } from "../../shared/error/golem-error"; import { Logger } from "../../shared/utils"; +import { Observable, Subject, Subscription, finalize } from "rxjs"; const DEFAULTS = { exitWaitingTimeout: 20_000, @@ -15,27 +15,39 @@ export class RemoteProcess { /** * Stream connected to stdout from provider process */ - readonly stdout: Readable; + readonly stdout: Subject = new Subject(); /** * Stream connected to stderr from provider process */ - readonly stderr: Readable; + readonly stderr: Subject = new Subject(); private lastResult?: Result; private streamError?: Error; + private subscription: Subscription; + constructor( private readonly activityModule: ActivityModule, - private streamOfActivityResults: Readable, + activityResult$: Observable, private activity: Activity, private readonly logger: Logger, ) { - this.streamOfActivityResults.on("data", (data) => (this.lastResult = data)); - this.streamOfActivityResults.on("error", (error) => (this.streamError = error)); - const { stdout, stderr } = this.transformResultsStream(); - this.stdout = stdout; - this.stderr = stderr; + this.subscription = activityResult$ + .pipe( + finalize(() => { + this.stdout.complete(); + this.stderr.complete(); + }), + ) + .subscribe({ + next: (result) => { + this.lastResult = result; + if (result.stdout) this.stdout.next(result.stdout); + if (result.stderr) this.stderr.next(result.stderr); + }, + error: (error) => (this.streamError = error), + }); } /** @@ -80,22 +92,7 @@ export class RemoteProcess { .catch((err) => this.logger.error(`Error when destroying activity`, err)); } }; - if (this.streamOfActivityResults.closed) return end(); - this.streamOfActivityResults.on("close", end); + this.subscription.add(() => end()); }); } - - private transformResultsStream(): { stdout: Readable; stderr: Readable } { - const transform = (std: string) => - new Transform({ - objectMode: true, - transform(chunk, encoding, callback) { - callback(null, chunk?.[std]); - }, - }); - return { - stdout: this.streamOfActivityResults.pipe(transform("stdout")), - stderr: this.streamOfActivityResults.pipe(transform("stderr")), - }; - } } diff --git a/src/shared/utils/index.ts b/src/shared/utils/index.ts index ed6b3533e..63471f224 100644 --- a/src/shared/utils/index.ts +++ b/src/shared/utils/index.ts @@ -9,3 +9,4 @@ export * as EnvUtils from "./env"; export { YagnaApi, YagnaOptions } from "../yagna/yagnaApi"; export * from "./abortSignal"; export * from "./eventLoop"; +export * from "./rxjs"; diff --git a/src/shared/utils/rxjs.ts b/src/shared/utils/rxjs.ts new file mode 100644 index 000000000..b2962c9de --- /dev/null +++ b/src/shared/utils/rxjs.ts @@ -0,0 +1,23 @@ +import { Observable, Subject, finalize, mergeWith, takeUntil } from "rxjs"; + +/** + * Merges two observables until the first one completes (or errors). + * The difference between this and `merge` is that this will complete when the first observable completes, + * while `merge` would only complete when _all_ observables complete. + */ +export function mergeUntilFirstComplete( + observable1: Observable, + observable2: Observable, +): Observable { + const completionSubject = new Subject(); + const observable1WithCompletion = observable1.pipe( + takeUntil(completionSubject), + finalize(() => completionSubject.next()), + ); + const observable2WithCompletion = observable2.pipe( + takeUntil(completionSubject), + finalize(() => completionSubject.next()), + ); + + return observable1WithCompletion.pipe(mergeWith(observable2WithCompletion)); +} diff --git a/tests/utils/helpers.ts b/tests/utils/helpers.ts index 45deab00b..2cf24e028 100644 --- a/tests/utils/helpers.ts +++ b/tests/utils/helpers.ts @@ -1,4 +1,4 @@ -import { Readable } from "stream"; +import { map, of, throwError } from "rxjs"; import { Result, ResultData } from "../../src/activity/results"; /** @@ -44,18 +44,8 @@ export const simulateLongPoll = (response: T, pollingTimeMs: number = 10) => * Helper function that makes preparing activity result returned by Activity.execute function */ export const buildExecutorResults = (successResults?: ResultData[], errorResults?: ResultData[], error?: Error) => { - return new Readable({ - objectMode: true, - async read() { - if (error) { - this.emit("error", error); - } - const results = successResults?.length - ? successResults.shift() - : errorResults?.length - ? errorResults.shift() - : null; - this.push(results); - }, - }); + if (error) { + return throwError(() => error); + } + return of(...(successResults ?? []), ...(errorResults ?? [])).pipe(map((resultData) => new Result(resultData))); };