Skip to content

Commit

Permalink
fix(golem-network): fixed gftp close issue preventing clean shutdown …
Browse files Browse the repository at this point in the history
…of golem-network
  • Loading branch information
grisha87 committed Aug 30, 2024
1 parent 3bbfe56 commit 7d0050e
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 34 deletions.
10 changes: 8 additions & 2 deletions src/golem-network/golem-network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,14 @@ export class GolemNetwork {
this.abortController.abort("Golem Network is disconnecting");
await Promise.allSettled(this.cleanupTasks.map((task) => task()));
this.cleanupTasks = [];
await this.storageProvider.close();
await this.yagna.disconnect();
await this.storageProvider
.close()
.catch((err) => this.logger.warn("Closing storage provider resulted with an error, it will be ignored", err));
await this.yagna
.disconnect()
.catch((err) =>
this.logger.warn("Closing connections with yagna resulted with an error, it will be ignored", err),
);
this.services.proposalCache.flushAll();
this.abortController = new AbortController();
} catch (err) {
Expand Down
87 changes: 55 additions & 32 deletions src/shared/storage/gftp.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { StorageProvider } from "./provider";
import { Logger, defaultLogger, isBrowser, sleep } from "../utils";
import { defaultLogger, isBrowser, Logger } from "../utils";
import path from "path";
import fs from "fs";
import cp from "child_process";
import { GolemInternalError, GolemUserError } from "../error/golem-error";
import { v4 } from "uuid";
import AsyncLock from "async-lock";

export class GftpStorageProvider implements StorageProvider {
private gftpServerProcess?: cp.ChildProcess;
Expand All @@ -16,12 +18,13 @@ export class GftpStorageProvider implements StorageProvider {
private publishedUrls = new Set<string>();

private isInitialized = false;

private reader?: AsyncIterableIterator<string>;
/**
* lock against parallel writing to stdin in gftp process
* @private
*/
private lock = false;
private lock = new AsyncLock();

constructor(logger?: Logger) {
if (isBrowser) {
Expand All @@ -37,7 +40,8 @@ export class GftpStorageProvider implements StorageProvider {
}

await this.startGftpServer();
this.logger.info(`GFTP Version: ${await this.jsonrpc("version")}`);

this.logger.info(`GFTP Version: ${await this.jsonRpc("version")}`);
}

private startGftpServer(): Promise<void> {
Expand All @@ -57,16 +61,18 @@ export class GftpStorageProvider implements StorageProvider {
reject(error);
});

this.gftpServerProcess.on("close", (code, signal) => {
this.logger.info("GFTP server closed", { code, signal });
this.isInitialized = false;
});

this.gftpServerProcess?.stdout?.setEncoding("utf-8");
this.gftpServerProcess?.stderr?.setEncoding("utf-8");

this.reader = this.gftpServerProcess?.stdout?.iterator();
});
}

isInitiated() {
return !!this.gftpServerProcess;
}

private async generateTempFileName(): Promise<string> {
const { randomUUID } = await import("crypto");
const tmp = await import("tmp");
Expand All @@ -76,7 +82,7 @@ export class GftpStorageProvider implements StorageProvider {
}

async receiveFile(path: string): Promise<string> {
const { url } = await this.jsonrpc("receive", { output_file: path });
const { url } = await this.jsonRpc("receive", { output_file: path });
return url;
}

Expand All @@ -92,13 +98,15 @@ export class GftpStorageProvider implements StorageProvider {

async publishData(src: Uint8Array): Promise<string> {
let url: string;

if (Buffer.isBuffer(src)) {
url = await this.uploadBytes(src);
} else {
url = await this.uploadBytes(Buffer.from(src));
}

this.publishedUrls.add(url);

return url;
}

Expand All @@ -110,38 +118,53 @@ export class GftpStorageProvider implements StorageProvider {

private async releaseAll(): Promise<void> {
const urls = Array.from(this.publishedUrls).filter((url) => !!url);
if (!urls.length) {
return;

if (urls.length) {
await this.jsonRpc("close", { urls });
}
await this.jsonrpc("close", { urls });
}

async close() {
await this.releaseAll();
this.gftpServerProcess?.kill();
if (this.isInitialized) {
await this.releaseAll();
this.gftpServerProcess?.kill();
}
}

private async jsonrpc(method: string, params: object = {}) {
if (!this.isInitiated()) await this.init();
while (this.lock) await sleep(100, true);
this.lock = true;
const paramsStr = JSON.stringify(params);
const query = `{"jsonrpc": "2.0", "id": "1", "method": "${method}", "params": ${paramsStr}}\n`;
try {
private async jsonRpc(method: string, params: Record<string, string | number | string[]> = {}) {
return this.lock.acquire("gftp-io", async () => {
if (!this.isInitialized) {
throw new GolemInternalError(
`GFTP was not initialized when calling JSON-RPC ${method} with ${JSON.stringify(params)}`,
);
}

const callId = v4();

const request = {
jsonrpc: "2.0",
id: callId,
method: method,
params: params,
};

const query = `${JSON.stringify(request)}\n`;

this.logger.debug("Sending GFTP command", { request });
this.gftpServerProcess?.stdin?.write(query);

const value = (await this.reader?.next())?.value;
if (!value) throw new GolemInternalError("Unable to get GFTP command result");
if (!value) {
throw new GolemInternalError("Unable to get GFTP command result");
}

const { result } = JSON.parse(value);
if (result === undefined) throw new GolemInternalError(value);
if (result === undefined) {
throw new GolemInternalError(value);
}

return result;
} catch (error) {
throw new GolemInternalError(
`Error while obtaining response to JSONRPC. query: ${query} error: ${JSON.stringify(error)}`,
error,
);
} finally {
this.lock = false;
}
});
}

private async uploadStream(stream: AsyncGenerator<Buffer>): Promise<string> {
Expand All @@ -158,7 +181,7 @@ export class GftpStorageProvider implements StorageProvider {
}
wStream.end();
});
const links = await this.jsonrpc("publish", { files: [fileName.toString()] });
const links = await this.jsonRpc("publish", { files: [fileName.toString()] });
if (links.length !== 1) throw "invalid gftp publish response";
return links[0]?.url;
}
Expand All @@ -172,7 +195,7 @@ export class GftpStorageProvider implements StorageProvider {
}

private async uploadFile(file: string): Promise<string> {
const links = await this.jsonrpc("publish", { files: [file.toString()] });
const links = await this.jsonRpc("publish", { files: [file.toString()] });
return links[0]?.url;
}

Expand Down

0 comments on commit 7d0050e

Please sign in to comment.