diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index 31974e1e7..33f605f72 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -337,8 +337,14 @@ export class GolemNetwork { this.abortController.abort("Golem Network is disconnecting"); await Promise.allSettled(this.cleanupTasks.map((task) => task())); this.cleanupTasks = []; - await this.storageProvider.close(); - await this.yagna.disconnect(); + await this.storageProvider + .close() + .catch((err) => this.logger.warn("Closing storage provider resulted with an error, it will be ignored", err)); + await this.yagna + .disconnect() + .catch((err) => + this.logger.warn("Closing connections with yagna resulted with an error, it will be ignored", err), + ); this.services.proposalCache.flushAll(); this.abortController = new AbortController(); } catch (err) { diff --git a/src/shared/storage/gftp.ts b/src/shared/storage/gftp.ts index b15f98cb9..b0844fe12 100644 --- a/src/shared/storage/gftp.ts +++ b/src/shared/storage/gftp.ts @@ -1,9 +1,11 @@ import { StorageProvider } from "./provider"; -import { Logger, defaultLogger, isBrowser, sleep } from "../utils"; +import { defaultLogger, isBrowser, Logger } from "../utils"; import path from "path"; import fs from "fs"; import cp from "child_process"; import { GolemInternalError, GolemUserError } from "../error/golem-error"; +import { v4 } from "uuid"; +import AsyncLock from "async-lock"; export class GftpStorageProvider implements StorageProvider { private gftpServerProcess?: cp.ChildProcess; @@ -16,12 +18,13 @@ export class GftpStorageProvider implements StorageProvider { private publishedUrls = new Set(); private isInitialized = false; + private reader?: AsyncIterableIterator; /** * lock against parallel writing to stdin in gftp process * @private */ - private lock = false; + private lock = new AsyncLock(); constructor(logger?: Logger) { if (isBrowser) { @@ -37,7 +40,8 @@ export class GftpStorageProvider implements StorageProvider { } await this.startGftpServer(); - this.logger.info(`GFTP Version: ${await this.jsonrpc("version")}`); + + this.logger.info(`GFTP Version: ${await this.jsonRpc("version")}`); } private startGftpServer(): Promise { @@ -57,16 +61,18 @@ export class GftpStorageProvider implements StorageProvider { reject(error); }); + this.gftpServerProcess.on("close", (code, signal) => { + this.logger.info("GFTP server closed", { code, signal }); + this.isInitialized = false; + }); + this.gftpServerProcess?.stdout?.setEncoding("utf-8"); this.gftpServerProcess?.stderr?.setEncoding("utf-8"); + this.reader = this.gftpServerProcess?.stdout?.iterator(); }); } - isInitiated() { - return !!this.gftpServerProcess; - } - private async generateTempFileName(): Promise { const { randomUUID } = await import("crypto"); const tmp = await import("tmp"); @@ -76,7 +82,7 @@ export class GftpStorageProvider implements StorageProvider { } async receiveFile(path: string): Promise { - const { url } = await this.jsonrpc("receive", { output_file: path }); + const { url } = await this.jsonRpc("receive", { output_file: path }); return url; } @@ -92,6 +98,7 @@ export class GftpStorageProvider implements StorageProvider { async publishData(src: Uint8Array): Promise { let url: string; + if (Buffer.isBuffer(src)) { url = await this.uploadBytes(src); } else { @@ -99,6 +106,7 @@ export class GftpStorageProvider implements StorageProvider { } this.publishedUrls.add(url); + return url; } @@ -110,38 +118,53 @@ export class GftpStorageProvider implements StorageProvider { private async releaseAll(): Promise { const urls = Array.from(this.publishedUrls).filter((url) => !!url); - if (!urls.length) { - return; + + if (urls.length) { + await this.jsonRpc("close", { urls }); } - await this.jsonrpc("close", { urls }); } async close() { - await this.releaseAll(); - this.gftpServerProcess?.kill(); + if (this.isInitialized) { + await this.releaseAll(); + this.gftpServerProcess?.kill(); + } } - private async jsonrpc(method: string, params: object = {}) { - if (!this.isInitiated()) await this.init(); - while (this.lock) await sleep(100, true); - this.lock = true; - const paramsStr = JSON.stringify(params); - const query = `{"jsonrpc": "2.0", "id": "1", "method": "${method}", "params": ${paramsStr}}\n`; - try { + private async jsonRpc(method: string, params: Record = {}) { + return this.lock.acquire("gftp-io", async () => { + if (!this.isInitialized) { + throw new GolemInternalError( + `GFTP was not initialized when calling JSON-RPC ${method} with ${JSON.stringify(params)}`, + ); + } + + const callId = v4(); + + const request = { + jsonrpc: "2.0", + id: callId, + method: method, + params: params, + }; + + const query = `${JSON.stringify(request)}\n`; + + this.logger.debug("Sending GFTP command", { request }); this.gftpServerProcess?.stdin?.write(query); + const value = (await this.reader?.next())?.value; - if (!value) throw new GolemInternalError("Unable to get GFTP command result"); + if (!value) { + throw new GolemInternalError("Unable to get GFTP command result"); + } + const { result } = JSON.parse(value); - if (result === undefined) throw new GolemInternalError(value); + if (result === undefined) { + throw new GolemInternalError(value); + } + return result; - } catch (error) { - throw new GolemInternalError( - `Error while obtaining response to JSONRPC. query: ${query} error: ${JSON.stringify(error)}`, - error, - ); - } finally { - this.lock = false; - } + }); } private async uploadStream(stream: AsyncGenerator): Promise { @@ -158,7 +181,7 @@ export class GftpStorageProvider implements StorageProvider { } wStream.end(); }); - const links = await this.jsonrpc("publish", { files: [fileName.toString()] }); + const links = await this.jsonRpc("publish", { files: [fileName.toString()] }); if (links.length !== 1) throw "invalid gftp publish response"; return links[0]?.url; } @@ -172,7 +195,7 @@ export class GftpStorageProvider implements StorageProvider { } private async uploadFile(file: string): Promise { - const links = await this.jsonrpc("publish", { files: [file.toString()] }); + const links = await this.jsonRpc("publish", { files: [file.toString()] }); return links[0]?.url; }