From 083a0eb04a4b92ed4727240227ae92726f89368b Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 9 Oct 2024 16:30:55 +0200 Subject: [PATCH 1/7] feat: change the default storage provider from `gftp` to `ws` Added implementation for `uploadFile` and `receiveFile` to websocket storage provider. Changed the default provider to `ws`. You can still use the old `GftpStorageProvider` by setting `storageProvider: "gftp"` --- package-lock.json | 30 ++- package.json | 2 +- src/golem-network/golem-network.test.ts | 4 +- src/golem-network/golem-network.ts | 27 ++- ...rverAdapter.ts => StorageServerAdapter.ts} | 14 +- src/shared/storage/default.ts | 18 +- src/shared/storage/gftp.ts | 5 + src/shared/storage/index.ts | 2 +- .../{ws-browser.test.ts => ws.test.ts} | 172 +++++++++++++++++- src/shared/storage/{ws-browser.ts => ws.ts} | 98 ++++++++-- 10 files changed, 302 insertions(+), 70 deletions(-) rename src/shared/storage/{GftpServerAdapter.ts => StorageServerAdapter.ts} (80%) rename src/shared/storage/{ws-browser.test.ts => ws.test.ts} (70%) rename src/shared/storage/{ws-browser.ts => ws.ts} (64%) diff --git a/package-lock.json b/package-lock.json index a5b8c017e..7eea482f4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,6 @@ ], "dependencies": { "@golem-sdk/pino-logger": "^1.1.0", - "@rollup/rollup-win32-x64-msvc": "^4", "async-lock": "^1.4.1", "async-retry": "^1.3.3", "axios": "^1.6.7", @@ -50,7 +49,7 @@ "@types/eventsource": "^1.1.15", "@types/express": "^4.17.21", "@types/jest": "^29.5.12", - "@types/node": "^20.11.20", + "@types/node": "^18.19.55", "@types/semver": "^7.5.8", "@types/supertest": "^6.0.2", "@types/tmp": "^0.2.6", @@ -117,6 +116,15 @@ "node": ">=18.0.0" } }, + "examples/node_modules/@types/node": { + "version": "20.16.11", + "resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.11.tgz", + "integrity": "sha512-y+cTCACu92FyA5fgQSAI8A1H429g7aSK2HsO7K4XYUWc4dY5IUz55JSDIYT6/VsOLfGy8vmvQYC2hfb0iF16Uw==", + "dev": true, + "dependencies": { + "undici-types": "~6.19.2" + } + }, "node_modules/@aashutoshrathi/word-wrap": { "version": "1.2.6", "dev": true, @@ -3830,13 +3838,20 @@ "license": "MIT" }, "node_modules/@types/node": { - "version": "20.12.2", + "version": "18.19.55", + "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.55.tgz", + "integrity": "sha512-zzw5Vw52205Zr/nmErSEkN5FLqXPuKX/k5d1D7RKHATGqU7y6YfX9QxZraUzUrFGqH6XzOzG196BC35ltJC4Cw==", "dev": true, - "license": "MIT", "dependencies": { "undici-types": "~5.26.4" } }, + "node_modules/@types/node/node_modules/undici-types": { + "version": "5.26.5", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", + "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==", + "dev": true + }, "node_modules/@types/normalize-package-data": { "version": "2.4.4", "dev": true, @@ -18449,9 +18464,10 @@ } }, "node_modules/undici-types": { - "version": "5.26.5", - "dev": true, - "license": "MIT" + "version": "6.19.8", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz", + "integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==", + "dev": true }, "node_modules/unicode-emoji-modifier-base": { "version": "1.0.0", diff --git a/package.json b/package.json index cdaaf7888..d46b82e00 100644 --- a/package.json +++ b/package.json @@ -100,7 +100,7 @@ "@types/eventsource": "^1.1.15", "@types/express": "^4.17.21", "@types/jest": "^29.5.12", - "@types/node": "^20.11.20", + "@types/node": "^18.19.55", "@types/semver": "^7.5.8", "@types/supertest": "^6.0.2", "@types/tmp": "^0.2.6", diff --git a/src/golem-network/golem-network.test.ts b/src/golem-network/golem-network.test.ts index 09f6715fb..cdb9656f0 100644 --- a/src/golem-network/golem-network.test.ts +++ b/src/golem-network/golem-network.test.ts @@ -9,7 +9,7 @@ import { MarketApiAdapter, PaymentApiAdapter } from "../shared/yagna"; import { ActivityApiAdapter } from "../shared/yagna/adapters/activity-api-adapter"; import { GolemNetwork, MarketOrderSpec } from "./golem-network"; import { _, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito"; -import { GftpStorageProvider } from "../shared/storage"; +import { WebSocketStorageProvider } from "../shared/storage"; const order: MarketOrderSpec = Object.freeze({ demand: { @@ -34,7 +34,7 @@ const mockYagna = mock(YagnaApi); const mockPaymentApi = mock(PaymentApiAdapter); const mockActivityApi = mock(ActivityApiAdapter); const mockMarketApi = mock(MarketApiAdapter); -const mockStorageProvider = mock(GftpStorageProvider); +const mockStorageProvider = mock(WebSocketStorageProvider); afterEach(() => { reset(mockYagna); diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index acdb784db..13955f4ba 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -1,4 +1,4 @@ -import { anyAbortSignal, createAbortSignalFromTimeout, defaultLogger, isNode, Logger, YagnaApi } from "../shared/utils"; +import { anyAbortSignal, createAbortSignalFromTimeout, defaultLogger, Logger, YagnaApi } from "../shared/utils"; import { Demand, DraftOfferProposalPool, @@ -29,17 +29,13 @@ import { ProposalRepository } from "../shared/yagna/repository/proposal-reposito import { CacheService } from "../shared/cache/CacheService"; import { DemandRepository } from "../shared/yagna/repository/demand-repository"; import { IDemandRepository, OrderDemandOptions } from "../market/demand"; -import { GftpServerAdapter } from "../shared/storage/GftpServerAdapter"; -import { - GftpStorageProvider, - NullStorageProvider, - StorageProvider, - WebSocketBrowserStorageProvider, -} from "../shared/storage"; +import { StorageServerAdapter } from "../shared/storage/StorageServerAdapter"; +import { GftpStorageProvider, NullStorageProvider, StorageProvider, WebSocketStorageProvider } from "../shared/storage"; import { DataTransferProtocol } from "../shared/types"; import { NetworkApiAdapter } from "../shared/yagna/adapters/network-api-adapter"; import { IProposalRepository } from "../market/proposal"; import { Subscription } from "rxjs"; +import { GolemConfigError } from "../shared/error/golem-error"; /** * Instance of an object or a factory function that you can call `new` on. @@ -105,7 +101,7 @@ export interface GolemNetworkOptions { /** * Set the data transfer protocol to use for file transfers. - * Default is `gftp`. + * Default is `ws`. */ dataTransferProtocol?: DataTransferProtocol; @@ -228,7 +224,7 @@ export class GolemNetwork { constructor(options: Partial = {}) { const optDefaults: GolemNetworkOptions = { - dataTransferProtocol: isNode ? "gftp" : "ws", + dataTransferProtocol: "ws", }; this.options = { @@ -285,7 +281,7 @@ export class GolemNetwork { this.options.override?.marketApi || new MarketApiAdapter(this.yagna, agreementRepository, proposalRepository, demandRepository, this.logger), networkApi: this.options.override?.networkApi || new NetworkApiAdapter(this.yagna), - fileServer: this.options.override?.fileServer || new GftpServerAdapter(this.storageProvider), + fileServer: this.options.override?.fileServer || new StorageServerAdapter(this.storageProvider), }; this.network = getFactory(NetworkModuleImpl, this.options.override?.network)(this.services); this.market = getFactory(MarketModuleImpl, this.options.override?.market)( @@ -655,13 +651,16 @@ export class GolemNetwork { private createStorageProvider(): StorageProvider { if (typeof this.options.dataTransferProtocol === "string") { switch (this.options.dataTransferProtocol) { + case "gftp": + return new GftpStorageProvider(this.logger); case "ws": - return new WebSocketBrowserStorageProvider(this.yagna, { + return new WebSocketStorageProvider(this.yagna, { logger: this.logger, }); - case "gftp": default: - return new GftpStorageProvider(this.logger); + throw new GolemConfigError( + `Unsupported data transfer protocol ${this.options.dataTransferProtocol}. Supported protocols are "gftp" and "ws"`, + ); } } else if (this.options.dataTransferProtocol !== undefined) { return this.options.dataTransferProtocol; diff --git a/src/shared/storage/GftpServerAdapter.ts b/src/shared/storage/StorageServerAdapter.ts similarity index 80% rename from src/shared/storage/GftpServerAdapter.ts rename to src/shared/storage/StorageServerAdapter.ts index 84816bded..f499ab95e 100644 --- a/src/shared/storage/GftpServerAdapter.ts +++ b/src/shared/storage/StorageServerAdapter.ts @@ -5,9 +5,10 @@ import fs from "fs"; import jsSha3 from "js-sha3"; /** - * This class provides GFTP based implementation of the IFileServer interface used in the SDK + * IFileServer implementation that uses any StorageProvider to serve files. + * Make sure that the storage provider implements the `.publishFile()` method. */ -export class GftpServerAdapter implements IFileServer { +class StorageServerAdapter implements IFileServer { private published = new Map(); constructor(private readonly storage: StorageProvider) {} @@ -57,3 +58,12 @@ export class GftpServerAdapter implements IFileServer { }); } } + +/** + * @deprecated Use StorageServerAdapter instead. This will be removed in the next major version. + * + * This class provides GFTP based implementation of the IFileServer interface used in the SDK + */ +class GftpServerAdapter extends StorageServerAdapter {} + +export { GftpServerAdapter, StorageServerAdapter }; diff --git a/src/shared/storage/default.ts b/src/shared/storage/default.ts index bf9f75ad8..ec14c57d8 100644 --- a/src/shared/storage/default.ts +++ b/src/shared/storage/default.ts @@ -1,16 +1,8 @@ -import { GftpStorageProvider } from "./gftp"; -import { WebSocketBrowserStorageProvider } from "./ws-browser"; -import { NullStorageProvider } from "./null"; -import { Logger, YagnaApi, isNode, isBrowser } from "../utils"; +import { WebSocketStorageProvider } from "./ws"; +import { Logger, YagnaApi } from "../utils"; export function createDefaultStorageProvider(yagnaApi: YagnaApi, logger?: Logger) { - if (isNode) { - return new GftpStorageProvider(logger?.child("storage")); - } - if (isBrowser) { - return new WebSocketBrowserStorageProvider(yagnaApi, { - logger: logger?.child("storage"), - }); - } - return new NullStorageProvider(); + return new WebSocketStorageProvider(yagnaApi, { + logger: logger?.child("storage"), + }); } diff --git a/src/shared/storage/gftp.ts b/src/shared/storage/gftp.ts index 8efdc1781..8c7e1f731 100644 --- a/src/shared/storage/gftp.ts +++ b/src/shared/storage/gftp.ts @@ -8,6 +8,11 @@ import { GolemInternalError, GolemUserError } from "../error/golem-error"; import { v4 } from "uuid"; import AsyncLock from "async-lock"; +/** + * @deprecated Use WebSocketStorageProvider instead. This will be removed in the next major version. + * + * Storage provider that spawns a GFTP process and uses it to serve files. + */ export class GftpStorageProvider implements StorageProvider { private gftpServerProcess?: ChildProcess; private logger: Logger; diff --git a/src/shared/storage/index.ts b/src/shared/storage/index.ts index 51d7ba66f..2cb0e7b3d 100644 --- a/src/shared/storage/index.ts +++ b/src/shared/storage/index.ts @@ -1,5 +1,5 @@ export { StorageProvider } from "./provider"; export { GftpStorageProvider } from "./gftp"; export { NullStorageProvider } from "./null"; -export { WebSocketBrowserStorageProvider, WebSocketStorageProviderOptions } from "./ws-browser"; +export { WebSocketStorageProvider, WebSocketStorageProviderOptions } from "./ws"; export { createDefaultStorageProvider } from "./default"; diff --git a/src/shared/storage/ws-browser.test.ts b/src/shared/storage/ws.test.ts similarity index 70% rename from src/shared/storage/ws-browser.test.ts rename to src/shared/storage/ws.test.ts index 1d30cc987..37f453141 100644 --- a/src/shared/storage/ws-browser.test.ts +++ b/src/shared/storage/ws.test.ts @@ -1,13 +1,17 @@ // TODO: improve mocks - remove as any /* eslint-disable @typescript-eslint/no-explicit-any */ -import { GolemInternalError, Logger, nullLogger, WebSocketBrowserStorageProvider, YagnaApi } from "../../index"; +import { Logger, nullLogger, WebSocketStorageProvider, YagnaApi } from "../../index"; // .js added for ESM compatibility import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; import * as jsSha3 from "js-sha3"; import { GsbApi, IdentityApi } from "ya-ts-client"; import { anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; +import fs, { FileHandle } from "fs/promises"; +import { Stats } from "fs"; jest.mock("uuid", () => ({ v4: () => "uuid" })); +jest.mock("fs/promises"); +const mockFs = fs as jest.Mocked; type UploadChunkChunk = { offset: number; content: Uint8Array }; @@ -18,12 +22,12 @@ const logger = imock(); const yagnaApi = instance(mockYagna); const TEST_IDENTITY = "0x19ee20228a4c4bf8d4aebc79d9d3af2a01433456"; -describe("WebSocketBrowserStorageProvider", () => { +describe("WebSocketStorageProvider", () => { const createProvider = () => - new WebSocketBrowserStorageProvider(yagnaApi, { + new WebSocketStorageProvider(yagnaApi, { logger: instance(logger), }); - let provider: WebSocketBrowserStorageProvider; + let provider: WebSocketStorageProvider; beforeEach(() => { provider = createProvider(); @@ -53,13 +57,13 @@ describe("WebSocketBrowserStorageProvider", () => { describe("constructor", () => { it("should create default logger", () => { - const provider = new WebSocketBrowserStorageProvider(yagnaApi, {}); + const provider = new WebSocketStorageProvider(yagnaApi, {}); expect(provider["logger"]).toBeDefined(); }); it("should use provided logger", () => { const logger = nullLogger(); - const provider = new WebSocketBrowserStorageProvider(yagnaApi, { logger }); + const provider = new WebSocketStorageProvider(yagnaApi, { logger }); expect(provider["logger"]).toBe(logger); }); }); @@ -171,8 +175,90 @@ describe("WebSocketBrowserStorageProvider", () => { }); describe("publishFile()", () => { - it("should fail", async () => { - await expect(() => provider.publishFile()).rejects.toMatchError(new GolemInternalError("Not implemented")); + let socket: WebSocket; + let fileInfo: { id: string; url: string }; + let fileHandle: FileHandle; + + beforeEach(() => { + socket = Object.assign(new EventTarget(), { send: jest.fn() }) as unknown as WebSocket; + fileInfo = { + id: "10", + url: "http://localhost:8080", + }; + + jest.spyOn(provider as any, "createFileInfo").mockImplementation(() => Promise.resolve(fileInfo)); + jest.spyOn(provider as any, "createSocket").mockImplementation(() => Promise.resolve(socket)); + mockFs.stat.mockResolvedValue({ size: 10 } as unknown as Stats); + fileHandle = { + read: jest.fn(), + close: jest.fn(), + } as unknown as jest.Mocked; + mockFs.open.mockResolvedValue(fileHandle); + }); + + it("should read the file and upload it", async () => { + expect.assertions(9); + const result = await provider["publishFile"]("./file.txt"); + expect(result).toBe(fileInfo.url); + expect(provider["createSocket"]).toHaveBeenCalledWith(fileInfo, ["GetMetadata", "GetChunk"]); + expect(mockFs.stat).toHaveBeenCalledWith("./file.txt"); + expect(mockFs.open).toHaveBeenCalledWith("./file.txt", "r"); + + async function sendGetChunk(chunk: number[], offset: number, id: string) { + fileHandle.read = jest.fn().mockImplementationOnce((buffer: Buffer) => { + for (let i = 0; i < chunk.length; i++) { + buffer.writeUInt8(chunk[i], i); + } + }); + socket.dispatchEvent( + new MessageEvent("message", { + data: encode({ + id, + component: "GetChunk", + payload: { + offset, + size: chunk.length, + }, + }).buffer, + }), + ); + await new Promise(setImmediate); + const expectedBuffer = Buffer.alloc(chunk.length); + for (let i = 0; i < chunk.length; i++) { + expectedBuffer.writeUInt8(chunk[i], i); + } + expect(socket.send).toHaveBeenLastCalledWith( + encode({ + id, + payload: { + content: expectedBuffer, + offset, + }, + }), + ); + } + + socket.dispatchEvent( + new MessageEvent("message", { + data: encode({ + id: "1", + component: "GetMetadata", + }).buffer, + }), + ); + expect(socket.send).toHaveBeenCalledWith( + encode({ + id: "1", + payload: { + fileSize: 10, + }, + }), + ); + + await sendGetChunk([10, 11, 12, 13], 0, "2"); + await sendGetChunk([14, 15, 16, 17], 4, "3"); + await sendGetChunk([18, 19], 8, "4"); + expect(fileHandle.close).toHaveBeenCalledTimes(1); }); }); @@ -263,8 +349,74 @@ describe("WebSocketBrowserStorageProvider", () => { }); describe("receiveFile()", () => { - it("should fail", async () => { - await expect(() => provider.receiveFile()).rejects.toMatchError(new GolemInternalError("Not implemented")); + let socket: WebSocket; + let fileInfo: { id: string; url: string }; + let fileHandle: FileHandle; + + beforeEach(async () => { + socket = Object.assign(new EventTarget(), { send: jest.fn() }) as unknown as WebSocket; + fileInfo = { + id: "10", + url: "http://localhost:8080", + }; + + jest.spyOn(provider as any, "createFileInfo").mockImplementation(() => Promise.resolve(fileInfo)); + jest.spyOn(provider as any, "createSocket").mockImplementation(() => Promise.resolve(socket)); + fileHandle = { + write: jest.fn(), + close: jest.fn(), + } as unknown as jest.Mocked; + mockFs.open.mockResolvedValue(fileHandle); + }); + + it("should receive the file and write it to the disc", async () => { + expect.assertions(10); + const result = await provider["receiveFile"]("./file.txt"); + expect(result).toBe(fileInfo.url); + expect(provider["createSocket"]).toHaveBeenCalledWith(fileInfo, ["UploadChunk", "UploadFinished"]); + expect(mockFs.open).toHaveBeenCalledWith("./file.txt", "w"); + + async function sendUploadChunk(chunk: number[], id: string) { + const expectedBuffer = Buffer.alloc(chunk.length); + for (let i = 0; i < chunk.length; i++) { + expectedBuffer.writeUInt8(chunk[i], i); + } + socket.dispatchEvent( + new MessageEvent("message", { + data: encode({ + id, + component: "UploadChunk", + payload: { + chunk: { + content: expectedBuffer, + }, + }, + }).buffer, + }), + ); + await new Promise(setImmediate); + expect(fileHandle.write).toHaveBeenCalledWith(Uint8Array.from(expectedBuffer)); + expect(socket.send).toHaveBeenLastCalledWith( + encode({ + id, + payload: null, + }), + ); + } + + await sendUploadChunk([10, 11, 12, 13], "1"); + await sendUploadChunk([14, 15, 16, 17], "2"); + await sendUploadChunk([18, 19], "3"); + socket.dispatchEvent( + new MessageEvent("message", { + data: encode({ + id: "4", + component: "UploadFinished", + }).buffer, + }), + ); + await new Promise(setImmediate); + expect(fileHandle.close).toHaveBeenCalled(); }); }); diff --git a/src/shared/storage/ws-browser.ts b/src/shared/storage/ws.ts similarity index 64% rename from src/shared/storage/ws-browser.ts rename to src/shared/storage/ws.ts index fdf8115a9..c86ec5449 100644 --- a/src/shared/storage/ws-browser.ts +++ b/src/shared/storage/ws.ts @@ -3,8 +3,9 @@ import { v4 } from "uuid"; // .js added for ESM compatibility import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; import * as jsSha3 from "js-sha3"; -import { Logger, nullLogger, YagnaApi } from "../utils"; +import { defaultLogger, isBrowser, Logger, YagnaApi } from "../utils"; import { GolemInternalError } from "../error/golem-error"; +import fsPromises from "fs/promises"; export interface WebSocketStorageProviderOptions { logger?: Logger; @@ -53,7 +54,7 @@ type GftpFileInfo = { /** * Storage provider that uses GFTP over WebSockets. */ -export class WebSocketBrowserStorageProvider implements StorageProvider { +export class WebSocketStorageProvider implements StorageProvider { /** * Map of open services (IDs) indexed by GFTP url. */ @@ -63,9 +64,9 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { constructor( private readonly yagnaApi: YagnaApi, - private readonly options: WebSocketStorageProviderOptions, + private readonly options?: WebSocketStorageProviderOptions, ) { - this.logger = options.logger ?? nullLogger(); + this.logger = options?.logger || defaultLogger("storage"); } close(): Promise { @@ -92,19 +93,56 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { offset: req.payload.offset, }); } else { - this.logger.error( - `[WebSocketBrowserStorageProvider] Unsupported message in publishData(): ${ - (req as GsbRequest).component - }`, - ); + this.logger.error(`Unsupported message in publishData(): ${(req as GsbRequest).component}`); } }); return fileInfo.url; } - async publishFile(): Promise { - throw new GolemInternalError("Not implemented"); + async publishFile(src: string): Promise { + if (isBrowser) { + throw new GolemInternalError("Cannot publish files in browser context, did you mean to use `publishData()`?"); + } + + this.logger.info("Preparing file upload", { sourcePath: src }); + + const fileInfo = await this.createFileInfo(); + const ws = await this.createSocket(fileInfo, ["GetMetadata", "GetChunk"]); + const fileStats = await fsPromises.stat(src); + const fileSize = fileStats.size; + + const fd = await fsPromises.open(src, "r"); + + ws.addEventListener("message", async (event) => { + const req = toObject(event.data) as GsbRequestPublishUnion; + + if (req.component === "GetMetadata") { + this.respond(ws, req.id, { fileSize }); + } else if (req.component === "GetChunk") { + const { offset, size } = req.payload; + + const chunkSize = Math.min(size, fileSize - offset); + const chunk = Buffer.alloc(chunkSize); + + try { + await fd.read(chunk, 0, chunkSize, offset); + this.respond(ws, req.id, { + content: chunk, + offset, + }); + } finally { + // After the last chunk, close the file descriptor + if (offset + chunkSize >= fileSize) { + await fd.close(); + } + } + } else { + this.logger.error(`Unsupported message in publishFile(): ${(req as GsbRequest).component}`); + } + }); + + return fileInfo.url; } async receiveData(callback: StorageProviderDataCallback): Promise { @@ -122,19 +160,39 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { const result = this.completeReceive(req.payload.hash, data); callback(result); } else { - this.logger.error( - `[WebSocketBrowserStorageProvider] Unsupported message in receiveData(): ${ - (req as GsbRequest).component - }`, - ); + this.logger.error(`Unsupported message in receiveData(): ${(req as GsbRequest).component}`); } }); return fileInfo.url; } - async receiveFile(): Promise { - throw new GolemInternalError("Not implemented"); + async receiveFile(path: string): Promise { + if (isBrowser) { + throw new GolemInternalError("Cannot receive files in browser context, did you mean to use `receiveData()`?"); + } + + this.logger.info("Preparing file download", { destination: path }); + + const fileInfo = await this.createFileInfo(); + const fileHandle = await fsPromises.open(path, "w"); + const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]); + + ws.addEventListener("message", (event) => { + const req = toObject(event.data) as GsbRequestReceiveUnion; + if (req.component === "UploadChunk") { + fileHandle.write(req.payload.chunk.content); + this.respond(ws, req.id, null); + } else if (req.component === "UploadFinished") { + this.respond(ws, req.id, null); + + fileHandle.close(); + } else { + this.logger.error(`Unsupported message in receiveFile(): ${(req as GsbRequest).component}`); + } + }); + + return fileInfo.url; } async release(urls: string[]): Promise { @@ -142,7 +200,7 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { const serviceId = this.services.get(url); if (serviceId) { this.deleteService(serviceId).catch((error) => - this.logger.warn(`[WebSocketBrowserStorageProvider] Failed to delete service`, { serviceId, error }), + this.logger.warn(`Failed to delete service`, { serviceId, error }), ); } this.services.delete(url); @@ -168,7 +226,7 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { const service = await this.createService(fileInfo, components); const ws = new WebSocket(service.url, ["gsb+flexbuffers"]); ws.addEventListener("error", () => { - this.logger.error(`[WebSocketBrowserStorageProvider] Socket Error (${fileInfo.id})`); + this.logger.error(`Socket Error (${fileInfo.id})`); }); ws.binaryType = "arraybuffer"; return ws; From b069f5436bb234bee3709c4518682016f93cbc22 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 9 Oct 2024 16:32:27 +0200 Subject: [PATCH 2/7] docs: change the rental time to 15 minutes in examples and README files --- README.md | 2 +- docs/UPGRADING.md | 14 +++++++------- docs/USAGE.md | 4 ++-- .../advanced/local-image/local-image.ts | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index ee1940224..ac0c1cd58 100644 --- a/README.md +++ b/README.md @@ -149,7 +149,7 @@ const order: MarketOrderSpec = { }, market: { // We're only going to rent the provider for 5 minutes max - rentHours: 5 / 60, + rentHours: 15 / 60, pricing: { model: "linear", maxStartPrice: 0.5, diff --git a/docs/UPGRADING.md b/docs/UPGRADING.md index 03d789653..db86a5b85 100644 --- a/docs/UPGRADING.md +++ b/docs/UPGRADING.md @@ -47,14 +47,14 @@ import { GolemNetwork } from "@golem-sdk/golem-js"; try { await glm.connect(); - const retnal = await glm.oneOf({ + const rental = await glm.oneOf({ order: { demand: { workload: { imageTag: "golem/alpine:latest" }, }, - // You have to be now explicit about about your terms and expectatios from the market + // You have to be now explicit about about your terms and expectations from the market market: { - rentHours: 5 / 60, + rentHours: 15 / 60, pricing: { model: "linear", maxStartPrice: 0.5, @@ -132,9 +132,9 @@ import { GolemNetwork } from "@golem-sdk/golem-js"; demand: { workload: { imageTag: "golem/alpine:latest" }, }, - // You have to be now explicit about about your terms and expectatios from the market + // You have to be now explicit about about your terms and expectations from the market market: { - rentHours: 5 / 60, + rentHours: 15 / 60, pricing: { model: "linear", maxStartPrice: 0.5, @@ -147,13 +147,13 @@ import { GolemNetwork } from "@golem-sdk/golem-js"; const inputs = [1, 2, 3, 4, 5]; - // You still take the necessary precaucions, pipeline your work and processing + // You still take the necessary precautions, pipeline your work and processing const results = await Promise.allSettled( inputs.map((input) => // 🌟🌟 You access rentals from the pool pool.withRental((rental) => rental - // 🌟🌟🌟 You issue the comands as in case of a single-provider scenario + // 🌟🌟🌟 You issue the commands as in case of a single-provider scenario .getExeUnit() .then((exe) => exe.run(`echo 'Hello ${input}`)) .then((res) => res.stdout), diff --git a/docs/USAGE.md b/docs/USAGE.md index 5289ac82b..76e412c62 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -21,8 +21,8 @@ const order: MarketOrderSpec = { workload: { imageTag: "golem/alpine:latest" }, }, market: { - // We're only going to rent the provider for 5 minutes max - rentHours: 5 / 60, + // We're only going to rent the provider for 15 minutes max + rentHours: 15 / 60, pricing: { model: "linear", maxStartPrice: 0.5, diff --git a/examples/rental-model/advanced/local-image/local-image.ts b/examples/rental-model/advanced/local-image/local-image.ts index 3c4503539..9acd4eadf 100644 --- a/examples/rental-model/advanced/local-image/local-image.ts +++ b/examples/rental-model/advanced/local-image/local-image.ts @@ -29,7 +29,7 @@ const getImagePath = (path: string) => new URL(path, import.meta.url).toString() }, }, market: { - rentHours: 5 / 60, + rentHours: 15 / 60, pricing: { model: "linear", maxStartPrice: 1, From 74b37022974fd0bbd603179b7d72310b806f973f Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 9 Oct 2024 17:26:12 +0200 Subject: [PATCH 3/7] chore: add 'ws' import in node --- src/shared/storage/ws.ts | 41 +++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/src/shared/storage/ws.ts b/src/shared/storage/ws.ts index c86ec5449..23ec807a4 100644 --- a/src/shared/storage/ws.ts +++ b/src/shared/storage/ws.ts @@ -5,7 +5,8 @@ import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; import * as jsSha3 from "js-sha3"; import { defaultLogger, isBrowser, Logger, YagnaApi } from "../utils"; import { GolemInternalError } from "../error/golem-error"; -import fsPromises from "fs/promises"; +import fsPromises, { FileHandle } from "fs/promises"; +import WebSocket from "ws"; export interface WebSocketStorageProviderOptions { logger?: Logger; @@ -61,6 +62,7 @@ export class WebSocketStorageProvider implements StorageProvider { private services = new Map(); private logger: Logger; private ready = false; + private openHandles = new Set(); constructor( private readonly yagnaApi: YagnaApi, @@ -69,8 +71,9 @@ export class WebSocketStorageProvider implements StorageProvider { this.logger = options?.logger || defaultLogger("storage"); } - close(): Promise { + async close(): Promise { this.ready = false; + await Promise.allSettled(Array.from(this.openHandles).map((handle) => handle.close())); return this.release(Array.from(this.services.keys())); } @@ -84,6 +87,10 @@ export class WebSocketStorageProvider implements StorageProvider { const ws = await this.createSocket(fileInfo, ["GetMetadata", "GetChunk"]); ws.addEventListener("message", (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } const req = toObject(event.data) as GsbRequestPublishUnion; if (req.component === "GetMetadata") { this.respond(ws, req.id, { fileSize: data.byteLength }); @@ -112,9 +119,15 @@ export class WebSocketStorageProvider implements StorageProvider { const fileStats = await fsPromises.stat(src); const fileSize = fileStats.size; - const fd = await fsPromises.open(src, "r"); + const fileHandle = await fsPromises.open(src, "r"); + this.openHandles.add(fileHandle); ws.addEventListener("message", async (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } + const req = toObject(event.data) as GsbRequestPublishUnion; if (req.component === "GetMetadata") { @@ -126,7 +139,7 @@ export class WebSocketStorageProvider implements StorageProvider { const chunk = Buffer.alloc(chunkSize); try { - await fd.read(chunk, 0, chunkSize, offset); + await fileHandle.read(chunk, 0, chunkSize, offset); this.respond(ws, req.id, { content: chunk, offset, @@ -134,7 +147,8 @@ export class WebSocketStorageProvider implements StorageProvider { } finally { // After the last chunk, close the file descriptor if (offset + chunkSize >= fileSize) { - await fd.close(); + await fileHandle.close(); + this.openHandles.delete(fileHandle); } } } else { @@ -151,6 +165,10 @@ export class WebSocketStorageProvider implements StorageProvider { const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]); ws.addEventListener("message", (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } const req = toObject(event.data) as GsbRequestReceiveUnion; if (req.component === "UploadChunk") { data.push(req.payload.chunk); @@ -176,17 +194,22 @@ export class WebSocketStorageProvider implements StorageProvider { const fileInfo = await this.createFileInfo(); const fileHandle = await fsPromises.open(path, "w"); + this.openHandles.add(fileHandle); const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]); - ws.addEventListener("message", (event) => { + ws.addEventListener("message", async (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } const req = toObject(event.data) as GsbRequestReceiveUnion; if (req.component === "UploadChunk") { - fileHandle.write(req.payload.chunk.content); + await fileHandle.write(req.payload.chunk.content); this.respond(ws, req.id, null); } else if (req.component === "UploadFinished") { this.respond(ws, req.id, null); - - fileHandle.close(); + await fileHandle.close(); + this.openHandles.delete(fileHandle); } else { this.logger.error(`Unsupported message in receiveFile(): ${(req as GsbRequest).component}`); } From 2cb34de17af5d9fa19d3387ce83912c480ca9752 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 9 Oct 2024 17:40:31 +0200 Subject: [PATCH 4/7] chore: fix ws types in unit test --- src/shared/storage/ws.test.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/shared/storage/ws.test.ts b/src/shared/storage/ws.test.ts index 37f453141..d7e7a0c98 100644 --- a/src/shared/storage/ws.test.ts +++ b/src/shared/storage/ws.test.ts @@ -8,6 +8,7 @@ import { GsbApi, IdentityApi } from "ya-ts-client"; import { anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; import fs, { FileHandle } from "fs/promises"; import { Stats } from "fs"; +import WebSocket from "ws"; jest.mock("uuid", () => ({ v4: () => "uuid" })); jest.mock("fs/promises"); @@ -175,12 +176,12 @@ describe("WebSocketStorageProvider", () => { }); describe("publishFile()", () => { - let socket: WebSocket; + let socket: EventTarget & { send: jest.Mock }; let fileInfo: { id: string; url: string }; let fileHandle: FileHandle; beforeEach(() => { - socket = Object.assign(new EventTarget(), { send: jest.fn() }) as unknown as WebSocket; + socket = Object.assign(new EventTarget(), { send: jest.fn() }); fileInfo = { id: "10", url: "http://localhost:8080", @@ -349,12 +350,12 @@ describe("WebSocketStorageProvider", () => { }); describe("receiveFile()", () => { - let socket: WebSocket; + let socket: EventTarget & { send: jest.Mock }; let fileInfo: { id: string; url: string }; let fileHandle: FileHandle; beforeEach(async () => { - socket = Object.assign(new EventTarget(), { send: jest.fn() }) as unknown as WebSocket; + socket = Object.assign(new EventTarget(), { send: jest.fn() }); fileInfo = { id: "10", url: "http://localhost:8080", From 258a653d0e09ead55b24ba8a784c1db570041123 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 9 Oct 2024 18:33:11 +0200 Subject: [PATCH 5/7] build: update fs imports to get around issue in rollup node polyfill plugin --- src/shared/storage/ws.test.ts | 27 ++++++++++++++------------- src/shared/storage/ws.ts | 7 +++++-- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/shared/storage/ws.test.ts b/src/shared/storage/ws.test.ts index d7e7a0c98..ab7e47c9a 100644 --- a/src/shared/storage/ws.test.ts +++ b/src/shared/storage/ws.test.ts @@ -6,12 +6,13 @@ import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; import * as jsSha3 from "js-sha3"; import { GsbApi, IdentityApi } from "ya-ts-client"; import { anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; -import fs, { FileHandle } from "fs/promises"; -import { Stats } from "fs"; import WebSocket from "ws"; +import * as fs from "fs"; jest.mock("uuid", () => ({ v4: () => "uuid" })); -jest.mock("fs/promises"); +jest.mock("fs", () => ({ + promises: {}, +})); const mockFs = fs as jest.Mocked; type UploadChunkChunk = { offset: number; content: Uint8Array }; @@ -178,7 +179,7 @@ describe("WebSocketStorageProvider", () => { describe("publishFile()", () => { let socket: EventTarget & { send: jest.Mock }; let fileInfo: { id: string; url: string }; - let fileHandle: FileHandle; + let fileHandle: fs.promises.FileHandle; beforeEach(() => { socket = Object.assign(new EventTarget(), { send: jest.fn() }); @@ -189,12 +190,12 @@ describe("WebSocketStorageProvider", () => { jest.spyOn(provider as any, "createFileInfo").mockImplementation(() => Promise.resolve(fileInfo)); jest.spyOn(provider as any, "createSocket").mockImplementation(() => Promise.resolve(socket)); - mockFs.stat.mockResolvedValue({ size: 10 } as unknown as Stats); + mockFs.promises.stat = jest.fn().mockResolvedValue({ size: 10 } as unknown as fs.Stats); fileHandle = { read: jest.fn(), close: jest.fn(), - } as unknown as jest.Mocked; - mockFs.open.mockResolvedValue(fileHandle); + } as unknown as jest.Mocked; + mockFs.promises.open = jest.fn().mockResolvedValue(fileHandle); }); it("should read the file and upload it", async () => { @@ -202,8 +203,8 @@ describe("WebSocketStorageProvider", () => { const result = await provider["publishFile"]("./file.txt"); expect(result).toBe(fileInfo.url); expect(provider["createSocket"]).toHaveBeenCalledWith(fileInfo, ["GetMetadata", "GetChunk"]); - expect(mockFs.stat).toHaveBeenCalledWith("./file.txt"); - expect(mockFs.open).toHaveBeenCalledWith("./file.txt", "r"); + expect(mockFs.promises.stat).toHaveBeenCalledWith("./file.txt"); + expect(mockFs.promises.open).toHaveBeenCalledWith("./file.txt", "r"); async function sendGetChunk(chunk: number[], offset: number, id: string) { fileHandle.read = jest.fn().mockImplementationOnce((buffer: Buffer) => { @@ -352,7 +353,7 @@ describe("WebSocketStorageProvider", () => { describe("receiveFile()", () => { let socket: EventTarget & { send: jest.Mock }; let fileInfo: { id: string; url: string }; - let fileHandle: FileHandle; + let fileHandle: fs.promises.FileHandle; beforeEach(async () => { socket = Object.assign(new EventTarget(), { send: jest.fn() }); @@ -366,8 +367,8 @@ describe("WebSocketStorageProvider", () => { fileHandle = { write: jest.fn(), close: jest.fn(), - } as unknown as jest.Mocked; - mockFs.open.mockResolvedValue(fileHandle); + } as unknown as jest.Mocked; + mockFs.promises.open = jest.fn().mockResolvedValue(fileHandle); }); it("should receive the file and write it to the disc", async () => { @@ -375,7 +376,7 @@ describe("WebSocketStorageProvider", () => { const result = await provider["receiveFile"]("./file.txt"); expect(result).toBe(fileInfo.url); expect(provider["createSocket"]).toHaveBeenCalledWith(fileInfo, ["UploadChunk", "UploadFinished"]); - expect(mockFs.open).toHaveBeenCalledWith("./file.txt", "w"); + expect(mockFs.promises.open).toHaveBeenCalledWith("./file.txt", "w"); async function sendUploadChunk(chunk: number[], id: string) { const expectedBuffer = Buffer.alloc(chunk.length); diff --git a/src/shared/storage/ws.ts b/src/shared/storage/ws.ts index 23ec807a4..065839db8 100644 --- a/src/shared/storage/ws.ts +++ b/src/shared/storage/ws.ts @@ -5,9 +5,12 @@ import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; import * as jsSha3 from "js-sha3"; import { defaultLogger, isBrowser, Logger, YagnaApi } from "../utils"; import { GolemInternalError } from "../error/golem-error"; -import fsPromises, { FileHandle } from "fs/promises"; import WebSocket from "ws"; +// FIXME: cannot import fs/promises because the rollup polyfill doesn't work with it +import * as fs from "fs"; +const fsPromises = fs.promises; + export interface WebSocketStorageProviderOptions { logger?: Logger; } @@ -62,7 +65,7 @@ export class WebSocketStorageProvider implements StorageProvider { private services = new Map(); private logger: Logger; private ready = false; - private openHandles = new Set(); + private openHandles = new Set(); constructor( private readonly yagnaApi: YagnaApi, From 7d7b76770ed59bd6adc53509d60985fa6ad46276 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Thu, 10 Oct 2024 11:06:24 +0200 Subject: [PATCH 6/7] refactor: added debug logs to the gftp storage provider --- src/shared/storage/ws.ts | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/shared/storage/ws.ts b/src/shared/storage/ws.ts index 065839db8..2988771b6 100644 --- a/src/shared/storage/ws.ts +++ b/src/shared/storage/ws.ts @@ -69,9 +69,9 @@ export class WebSocketStorageProvider implements StorageProvider { constructor( private readonly yagnaApi: YagnaApi, - private readonly options?: WebSocketStorageProviderOptions, + options?: WebSocketStorageProviderOptions, ) { - this.logger = options?.logger || defaultLogger("storage"); + this.logger = options?.logger?.child("storage") || defaultLogger("storage"); } async close(): Promise { @@ -95,6 +95,9 @@ export class WebSocketStorageProvider implements StorageProvider { return; } const req = toObject(event.data) as GsbRequestPublishUnion; + + this.logger.debug("Received GFTP request for publishData", req); + if (req.component === "GetMetadata") { this.respond(ws, req.id, { fileSize: data.byteLength }); } else if (req.component === "GetChunk") { @@ -133,6 +136,8 @@ export class WebSocketStorageProvider implements StorageProvider { const req = toObject(event.data) as GsbRequestPublishUnion; + this.logger.debug("Received GFTP request for publishFile", req); + if (req.component === "GetMetadata") { this.respond(ws, req.id, { fileSize }); } else if (req.component === "GetChunk") { @@ -172,7 +177,11 @@ export class WebSocketStorageProvider implements StorageProvider { this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); return; } + const req = toObject(event.data) as GsbRequestReceiveUnion; + + this.logger.debug("Received GFTP request for receiveData", req); + if (req.component === "UploadChunk") { data.push(req.payload.chunk); this.respond(ws, req.id, null); @@ -206,6 +215,9 @@ export class WebSocketStorageProvider implements StorageProvider { return; } const req = toObject(event.data) as GsbRequestReceiveUnion; + + this.logger.debug("Received GFTP request for receiveFile", req); + if (req.component === "UploadChunk") { await fileHandle.write(req.payload.chunk.content); this.respond(ws, req.id, null); From 642a7eaf79a383074efe1c5d2e4f1dbed427c0b4 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Thu, 10 Oct 2024 11:40:33 +0200 Subject: [PATCH 7/7] chore: use proper error class, remove premature closing of fd, fix test logger behavior --- src/shared/storage/ws.test.ts | 18 ++++++++++++------ src/shared/storage/ws.ts | 14 +++++--------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/shared/storage/ws.test.ts b/src/shared/storage/ws.test.ts index ab7e47c9a..a88533908 100644 --- a/src/shared/storage/ws.test.ts +++ b/src/shared/storage/ws.test.ts @@ -1,11 +1,11 @@ // TODO: improve mocks - remove as any /* eslint-disable @typescript-eslint/no-explicit-any */ -import { Logger, nullLogger, WebSocketStorageProvider, YagnaApi } from "../../index"; +import { Logger, WebSocketStorageProvider, YagnaApi } from "../../index"; // .js added for ESM compatibility import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; import * as jsSha3 from "js-sha3"; import { GsbApi, IdentityApi } from "ya-ts-client"; -import { anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; +import { _, anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; import WebSocket from "ws"; import * as fs from "fs"; @@ -64,9 +64,13 @@ describe("WebSocketStorageProvider", () => { }); it("should use provided logger", () => { - const logger = nullLogger(); - const provider = new WebSocketStorageProvider(yagnaApi, { logger }); - expect(provider["logger"]).toBe(logger); + const mockLogger = imock(); + const mockLoggerChild = imock(); + const mockLoggerChildInstance = instance(mockLoggerChild); + when(mockLogger.child(_)).thenReturn(mockLoggerChildInstance); + const provider = new WebSocketStorageProvider(yagnaApi, { logger: instance(mockLogger) }); + expect(provider["logger"]).toBe(mockLoggerChildInstance); + verify(mockLogger.child("storage")).once(); }); }); @@ -199,7 +203,7 @@ describe("WebSocketStorageProvider", () => { }); it("should read the file and upload it", async () => { - expect.assertions(9); + expect.assertions(10); const result = await provider["publishFile"]("./file.txt"); expect(result).toBe(fileInfo.url); expect(provider["createSocket"]).toHaveBeenCalledWith(fileInfo, ["GetMetadata", "GetChunk"]); @@ -260,6 +264,8 @@ describe("WebSocketStorageProvider", () => { await sendGetChunk([10, 11, 12, 13], 0, "2"); await sendGetChunk([14, 15, 16, 17], 4, "3"); await sendGetChunk([18, 19], 8, "4"); + expect(fileHandle.close).toHaveBeenCalledTimes(0); + await provider.close(); expect(fileHandle.close).toHaveBeenCalledTimes(1); }); }); diff --git a/src/shared/storage/ws.ts b/src/shared/storage/ws.ts index 2988771b6..bab98054e 100644 --- a/src/shared/storage/ws.ts +++ b/src/shared/storage/ws.ts @@ -4,7 +4,7 @@ import { v4 } from "uuid"; import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; import * as jsSha3 from "js-sha3"; import { defaultLogger, isBrowser, Logger, YagnaApi } from "../utils"; -import { GolemInternalError } from "../error/golem-error"; +import { GolemInternalError, GolemUserError } from "../error/golem-error"; import WebSocket from "ws"; // FIXME: cannot import fs/promises because the rollup polyfill doesn't work with it @@ -115,7 +115,7 @@ export class WebSocketStorageProvider implements StorageProvider { async publishFile(src: string): Promise { if (isBrowser) { - throw new GolemInternalError("Cannot publish files in browser context, did you mean to use `publishData()`?"); + throw new GolemUserError("Cannot publish files in browser context, did you mean to use `publishData()`?"); } this.logger.info("Preparing file upload", { sourcePath: src }); @@ -152,12 +152,8 @@ export class WebSocketStorageProvider implements StorageProvider { content: chunk, offset, }); - } finally { - // After the last chunk, close the file descriptor - if (offset + chunkSize >= fileSize) { - await fileHandle.close(); - this.openHandles.delete(fileHandle); - } + } catch (error) { + this.logger.error("Something went wrong while sending the file chunk", { error }); } } else { this.logger.error(`Unsupported message in publishFile(): ${(req as GsbRequest).component}`); @@ -199,7 +195,7 @@ export class WebSocketStorageProvider implements StorageProvider { async receiveFile(path: string): Promise { if (isBrowser) { - throw new GolemInternalError("Cannot receive files in browser context, did you mean to use `receiveData()`?"); + throw new GolemUserError("Cannot receive files in browser context, did you mean to use `receiveData()`?"); } this.logger.info("Preparing file download", { destination: path });