diff --git a/README.md b/README.md index ee1940224..ac0c1cd58 100644 --- a/README.md +++ b/README.md @@ -149,7 +149,7 @@ const order: MarketOrderSpec = { }, market: { // We're only going to rent the provider for 5 minutes max - rentHours: 5 / 60, + rentHours: 15 / 60, pricing: { model: "linear", maxStartPrice: 0.5, diff --git a/docs/UPGRADING.md b/docs/UPGRADING.md index 03d789653..db86a5b85 100644 --- a/docs/UPGRADING.md +++ b/docs/UPGRADING.md @@ -47,14 +47,14 @@ import { GolemNetwork } from "@golem-sdk/golem-js"; try { await glm.connect(); - const retnal = await glm.oneOf({ + const rental = await glm.oneOf({ order: { demand: { workload: { imageTag: "golem/alpine:latest" }, }, - // You have to be now explicit about about your terms and expectatios from the market + // You have to be now explicit about about your terms and expectations from the market market: { - rentHours: 5 / 60, + rentHours: 15 / 60, pricing: { model: "linear", maxStartPrice: 0.5, @@ -132,9 +132,9 @@ import { GolemNetwork } from "@golem-sdk/golem-js"; demand: { workload: { imageTag: "golem/alpine:latest" }, }, - // You have to be now explicit about about your terms and expectatios from the market + // You have to be now explicit about about your terms and expectations from the market market: { - rentHours: 5 / 60, + rentHours: 15 / 60, pricing: { model: "linear", maxStartPrice: 0.5, @@ -147,13 +147,13 @@ import { GolemNetwork } from "@golem-sdk/golem-js"; const inputs = [1, 2, 3, 4, 5]; - // You still take the necessary precaucions, pipeline your work and processing + // You still take the necessary precautions, pipeline your work and processing const results = await Promise.allSettled( inputs.map((input) => // 🌟🌟 You access rentals from the pool pool.withRental((rental) => rental - // 🌟🌟🌟 You issue the comands as in case of a single-provider scenario + // 🌟🌟🌟 You issue the commands as in case of a single-provider scenario .getExeUnit() .then((exe) => exe.run(`echo 'Hello ${input}`)) .then((res) => res.stdout), diff --git a/docs/USAGE.md b/docs/USAGE.md index 5289ac82b..76e412c62 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -21,8 +21,8 @@ const order: MarketOrderSpec = { workload: { imageTag: "golem/alpine:latest" }, }, market: { - // We're only going to rent the provider for 5 minutes max - rentHours: 5 / 60, + // We're only going to rent the provider for 15 minutes max + rentHours: 15 / 60, pricing: { model: "linear", maxStartPrice: 0.5, diff --git a/examples/rental-model/advanced/local-image/local-image.ts b/examples/rental-model/advanced/local-image/local-image.ts index 3c4503539..9acd4eadf 100644 --- a/examples/rental-model/advanced/local-image/local-image.ts +++ b/examples/rental-model/advanced/local-image/local-image.ts @@ -29,7 +29,7 @@ const getImagePath = (path: string) => new URL(path, import.meta.url).toString() }, }, market: { - rentHours: 5 / 60, + rentHours: 15 / 60, pricing: { model: "linear", maxStartPrice: 1, diff --git a/package-lock.json b/package-lock.json index 112a4f256..7bb707a90 100644 --- a/package-lock.json +++ b/package-lock.json @@ -47,7 +47,7 @@ "@types/eventsource": "^1.1.15", "@types/express": "^4.17.21", "@types/jest": "^29.5.12", - "@types/node": "^20.11.20", + "@types/node": "^18.19.55", "@types/semver": "^7.5.8", "@types/supertest": "^6.0.2", "@types/tmp": "^0.2.6", @@ -114,6 +114,15 @@ "node": ">=18.0.0" } }, + "examples/node_modules/@types/node": { + "version": "20.16.11", + "resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.11.tgz", + "integrity": "sha512-y+cTCACu92FyA5fgQSAI8A1H429g7aSK2HsO7K4XYUWc4dY5IUz55JSDIYT6/VsOLfGy8vmvQYC2hfb0iF16Uw==", + "dev": true, + "dependencies": { + "undici-types": "~6.19.2" + } + }, "node_modules/@aashutoshrathi/word-wrap": { "version": "1.2.6", "dev": true, @@ -3789,9 +3798,9 @@ "license": "MIT" }, "node_modules/@types/node": { - "version": "20.16.11", - "resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.11.tgz", - "integrity": "sha512-y+cTCACu92FyA5fgQSAI8A1H429g7aSK2HsO7K4XYUWc4dY5IUz55JSDIYT6/VsOLfGy8vmvQYC2hfb0iF16Uw==", + "version": "18.19.55", + "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.55.tgz", + "integrity": "sha512-zzw5Vw52205Zr/nmErSEkN5FLqXPuKX/k5d1D7RKHATGqU7y6YfX9QxZraUzUrFGqH6XzOzG196BC35ltJC4Cw==", "dev": true, "dependencies": { "undici-types": "~6.19.2" diff --git a/package.json b/package.json index 1ade2c4b4..9ccc8c190 100644 --- a/package.json +++ b/package.json @@ -97,7 +97,7 @@ "@types/eventsource": "^1.1.15", "@types/express": "^4.17.21", "@types/jest": "^29.5.12", - "@types/node": "^20.11.20", + "@types/node": "^18.19.55", "@types/semver": "^7.5.8", "@types/supertest": "^6.0.2", "@types/tmp": "^0.2.6", diff --git a/src/golem-network/golem-network.test.ts b/src/golem-network/golem-network.test.ts index 09f6715fb..cdb9656f0 100644 --- a/src/golem-network/golem-network.test.ts +++ b/src/golem-network/golem-network.test.ts @@ -9,7 +9,7 @@ import { MarketApiAdapter, PaymentApiAdapter } from "../shared/yagna"; import { ActivityApiAdapter } from "../shared/yagna/adapters/activity-api-adapter"; import { GolemNetwork, MarketOrderSpec } from "./golem-network"; import { _, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito"; -import { GftpStorageProvider } from "../shared/storage"; +import { WebSocketStorageProvider } from "../shared/storage"; const order: MarketOrderSpec = Object.freeze({ demand: { @@ -34,7 +34,7 @@ const mockYagna = mock(YagnaApi); const mockPaymentApi = mock(PaymentApiAdapter); const mockActivityApi = mock(ActivityApiAdapter); const mockMarketApi = mock(MarketApiAdapter); -const mockStorageProvider = mock(GftpStorageProvider); +const mockStorageProvider = mock(WebSocketStorageProvider); afterEach(() => { reset(mockYagna); diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index acdb784db..13955f4ba 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -1,4 +1,4 @@ -import { anyAbortSignal, createAbortSignalFromTimeout, defaultLogger, isNode, Logger, YagnaApi } from "../shared/utils"; +import { anyAbortSignal, createAbortSignalFromTimeout, defaultLogger, Logger, YagnaApi } from "../shared/utils"; import { Demand, DraftOfferProposalPool, @@ -29,17 +29,13 @@ import { ProposalRepository } from "../shared/yagna/repository/proposal-reposito import { CacheService } from "../shared/cache/CacheService"; import { DemandRepository } from "../shared/yagna/repository/demand-repository"; import { IDemandRepository, OrderDemandOptions } from "../market/demand"; -import { GftpServerAdapter } from "../shared/storage/GftpServerAdapter"; -import { - GftpStorageProvider, - NullStorageProvider, - StorageProvider, - WebSocketBrowserStorageProvider, -} from "../shared/storage"; +import { StorageServerAdapter } from "../shared/storage/StorageServerAdapter"; +import { GftpStorageProvider, NullStorageProvider, StorageProvider, WebSocketStorageProvider } from "../shared/storage"; import { DataTransferProtocol } from "../shared/types"; import { NetworkApiAdapter } from "../shared/yagna/adapters/network-api-adapter"; import { IProposalRepository } from "../market/proposal"; import { Subscription } from "rxjs"; +import { GolemConfigError } from "../shared/error/golem-error"; /** * Instance of an object or a factory function that you can call `new` on. @@ -105,7 +101,7 @@ export interface GolemNetworkOptions { /** * Set the data transfer protocol to use for file transfers. - * Default is `gftp`. + * Default is `ws`. */ dataTransferProtocol?: DataTransferProtocol; @@ -228,7 +224,7 @@ export class GolemNetwork { constructor(options: Partial = {}) { const optDefaults: GolemNetworkOptions = { - dataTransferProtocol: isNode ? "gftp" : "ws", + dataTransferProtocol: "ws", }; this.options = { @@ -285,7 +281,7 @@ export class GolemNetwork { this.options.override?.marketApi || new MarketApiAdapter(this.yagna, agreementRepository, proposalRepository, demandRepository, this.logger), networkApi: this.options.override?.networkApi || new NetworkApiAdapter(this.yagna), - fileServer: this.options.override?.fileServer || new GftpServerAdapter(this.storageProvider), + fileServer: this.options.override?.fileServer || new StorageServerAdapter(this.storageProvider), }; this.network = getFactory(NetworkModuleImpl, this.options.override?.network)(this.services); this.market = getFactory(MarketModuleImpl, this.options.override?.market)( @@ -655,13 +651,16 @@ export class GolemNetwork { private createStorageProvider(): StorageProvider { if (typeof this.options.dataTransferProtocol === "string") { switch (this.options.dataTransferProtocol) { + case "gftp": + return new GftpStorageProvider(this.logger); case "ws": - return new WebSocketBrowserStorageProvider(this.yagna, { + return new WebSocketStorageProvider(this.yagna, { logger: this.logger, }); - case "gftp": default: - return new GftpStorageProvider(this.logger); + throw new GolemConfigError( + `Unsupported data transfer protocol ${this.options.dataTransferProtocol}. Supported protocols are "gftp" and "ws"`, + ); } } else if (this.options.dataTransferProtocol !== undefined) { return this.options.dataTransferProtocol; diff --git a/src/shared/storage/GftpServerAdapter.ts b/src/shared/storage/StorageServerAdapter.ts similarity index 80% rename from src/shared/storage/GftpServerAdapter.ts rename to src/shared/storage/StorageServerAdapter.ts index 84816bded..f499ab95e 100644 --- a/src/shared/storage/GftpServerAdapter.ts +++ b/src/shared/storage/StorageServerAdapter.ts @@ -5,9 +5,10 @@ import fs from "fs"; import jsSha3 from "js-sha3"; /** - * This class provides GFTP based implementation of the IFileServer interface used in the SDK + * IFileServer implementation that uses any StorageProvider to serve files. + * Make sure that the storage provider implements the `.publishFile()` method. */ -export class GftpServerAdapter implements IFileServer { +class StorageServerAdapter implements IFileServer { private published = new Map(); constructor(private readonly storage: StorageProvider) {} @@ -57,3 +58,12 @@ export class GftpServerAdapter implements IFileServer { }); } } + +/** + * @deprecated Use StorageServerAdapter instead. This will be removed in the next major version. + * + * This class provides GFTP based implementation of the IFileServer interface used in the SDK + */ +class GftpServerAdapter extends StorageServerAdapter {} + +export { GftpServerAdapter, StorageServerAdapter }; diff --git a/src/shared/storage/default.ts b/src/shared/storage/default.ts index bf9f75ad8..ec14c57d8 100644 --- a/src/shared/storage/default.ts +++ b/src/shared/storage/default.ts @@ -1,16 +1,8 @@ -import { GftpStorageProvider } from "./gftp"; -import { WebSocketBrowserStorageProvider } from "./ws-browser"; -import { NullStorageProvider } from "./null"; -import { Logger, YagnaApi, isNode, isBrowser } from "../utils"; +import { WebSocketStorageProvider } from "./ws"; +import { Logger, YagnaApi } from "../utils"; export function createDefaultStorageProvider(yagnaApi: YagnaApi, logger?: Logger) { - if (isNode) { - return new GftpStorageProvider(logger?.child("storage")); - } - if (isBrowser) { - return new WebSocketBrowserStorageProvider(yagnaApi, { - logger: logger?.child("storage"), - }); - } - return new NullStorageProvider(); + return new WebSocketStorageProvider(yagnaApi, { + logger: logger?.child("storage"), + }); } diff --git a/src/shared/storage/gftp.ts b/src/shared/storage/gftp.ts index 8efdc1781..8c7e1f731 100644 --- a/src/shared/storage/gftp.ts +++ b/src/shared/storage/gftp.ts @@ -8,6 +8,11 @@ import { GolemInternalError, GolemUserError } from "../error/golem-error"; import { v4 } from "uuid"; import AsyncLock from "async-lock"; +/** + * @deprecated Use WebSocketStorageProvider instead. This will be removed in the next major version. + * + * Storage provider that spawns a GFTP process and uses it to serve files. + */ export class GftpStorageProvider implements StorageProvider { private gftpServerProcess?: ChildProcess; private logger: Logger; diff --git a/src/shared/storage/index.ts b/src/shared/storage/index.ts index 51d7ba66f..2cb0e7b3d 100644 --- a/src/shared/storage/index.ts +++ b/src/shared/storage/index.ts @@ -1,5 +1,5 @@ export { StorageProvider } from "./provider"; export { GftpStorageProvider } from "./gftp"; export { NullStorageProvider } from "./null"; -export { WebSocketBrowserStorageProvider, WebSocketStorageProviderOptions } from "./ws-browser"; +export { WebSocketStorageProvider, WebSocketStorageProviderOptions } from "./ws"; export { createDefaultStorageProvider } from "./default"; diff --git a/src/shared/storage/ws-browser.test.ts b/src/shared/storage/ws.test.ts similarity index 67% rename from src/shared/storage/ws-browser.test.ts rename to src/shared/storage/ws.test.ts index 1d30cc987..a88533908 100644 --- a/src/shared/storage/ws-browser.test.ts +++ b/src/shared/storage/ws.test.ts @@ -1,13 +1,19 @@ // TODO: improve mocks - remove as any /* eslint-disable @typescript-eslint/no-explicit-any */ -import { GolemInternalError, Logger, nullLogger, WebSocketBrowserStorageProvider, YagnaApi } from "../../index"; +import { Logger, WebSocketStorageProvider, YagnaApi } from "../../index"; // .js added for ESM compatibility import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; import * as jsSha3 from "js-sha3"; import { GsbApi, IdentityApi } from "ya-ts-client"; -import { anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; +import { _, anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; +import WebSocket from "ws"; +import * as fs from "fs"; jest.mock("uuid", () => ({ v4: () => "uuid" })); +jest.mock("fs", () => ({ + promises: {}, +})); +const mockFs = fs as jest.Mocked; type UploadChunkChunk = { offset: number; content: Uint8Array }; @@ -18,12 +24,12 @@ const logger = imock(); const yagnaApi = instance(mockYagna); const TEST_IDENTITY = "0x19ee20228a4c4bf8d4aebc79d9d3af2a01433456"; -describe("WebSocketBrowserStorageProvider", () => { +describe("WebSocketStorageProvider", () => { const createProvider = () => - new WebSocketBrowserStorageProvider(yagnaApi, { + new WebSocketStorageProvider(yagnaApi, { logger: instance(logger), }); - let provider: WebSocketBrowserStorageProvider; + let provider: WebSocketStorageProvider; beforeEach(() => { provider = createProvider(); @@ -53,14 +59,18 @@ describe("WebSocketBrowserStorageProvider", () => { describe("constructor", () => { it("should create default logger", () => { - const provider = new WebSocketBrowserStorageProvider(yagnaApi, {}); + const provider = new WebSocketStorageProvider(yagnaApi, {}); expect(provider["logger"]).toBeDefined(); }); it("should use provided logger", () => { - const logger = nullLogger(); - const provider = new WebSocketBrowserStorageProvider(yagnaApi, { logger }); - expect(provider["logger"]).toBe(logger); + const mockLogger = imock(); + const mockLoggerChild = imock(); + const mockLoggerChildInstance = instance(mockLoggerChild); + when(mockLogger.child(_)).thenReturn(mockLoggerChildInstance); + const provider = new WebSocketStorageProvider(yagnaApi, { logger: instance(mockLogger) }); + expect(provider["logger"]).toBe(mockLoggerChildInstance); + verify(mockLogger.child("storage")).once(); }); }); @@ -171,8 +181,92 @@ describe("WebSocketBrowserStorageProvider", () => { }); describe("publishFile()", () => { - it("should fail", async () => { - await expect(() => provider.publishFile()).rejects.toMatchError(new GolemInternalError("Not implemented")); + let socket: EventTarget & { send: jest.Mock }; + let fileInfo: { id: string; url: string }; + let fileHandle: fs.promises.FileHandle; + + beforeEach(() => { + socket = Object.assign(new EventTarget(), { send: jest.fn() }); + fileInfo = { + id: "10", + url: "http://localhost:8080", + }; + + jest.spyOn(provider as any, "createFileInfo").mockImplementation(() => Promise.resolve(fileInfo)); + jest.spyOn(provider as any, "createSocket").mockImplementation(() => Promise.resolve(socket)); + mockFs.promises.stat = jest.fn().mockResolvedValue({ size: 10 } as unknown as fs.Stats); + fileHandle = { + read: jest.fn(), + close: jest.fn(), + } as unknown as jest.Mocked; + mockFs.promises.open = jest.fn().mockResolvedValue(fileHandle); + }); + + it("should read the file and upload it", async () => { + expect.assertions(10); + const result = await provider["publishFile"]("./file.txt"); + expect(result).toBe(fileInfo.url); + expect(provider["createSocket"]).toHaveBeenCalledWith(fileInfo, ["GetMetadata", "GetChunk"]); + expect(mockFs.promises.stat).toHaveBeenCalledWith("./file.txt"); + expect(mockFs.promises.open).toHaveBeenCalledWith("./file.txt", "r"); + + async function sendGetChunk(chunk: number[], offset: number, id: string) { + fileHandle.read = jest.fn().mockImplementationOnce((buffer: Buffer) => { + for (let i = 0; i < chunk.length; i++) { + buffer.writeUInt8(chunk[i], i); + } + }); + socket.dispatchEvent( + new MessageEvent("message", { + data: encode({ + id, + component: "GetChunk", + payload: { + offset, + size: chunk.length, + }, + }).buffer, + }), + ); + await new Promise(setImmediate); + const expectedBuffer = Buffer.alloc(chunk.length); + for (let i = 0; i < chunk.length; i++) { + expectedBuffer.writeUInt8(chunk[i], i); + } + expect(socket.send).toHaveBeenLastCalledWith( + encode({ + id, + payload: { + content: expectedBuffer, + offset, + }, + }), + ); + } + + socket.dispatchEvent( + new MessageEvent("message", { + data: encode({ + id: "1", + component: "GetMetadata", + }).buffer, + }), + ); + expect(socket.send).toHaveBeenCalledWith( + encode({ + id: "1", + payload: { + fileSize: 10, + }, + }), + ); + + await sendGetChunk([10, 11, 12, 13], 0, "2"); + await sendGetChunk([14, 15, 16, 17], 4, "3"); + await sendGetChunk([18, 19], 8, "4"); + expect(fileHandle.close).toHaveBeenCalledTimes(0); + await provider.close(); + expect(fileHandle.close).toHaveBeenCalledTimes(1); }); }); @@ -263,8 +357,74 @@ describe("WebSocketBrowserStorageProvider", () => { }); describe("receiveFile()", () => { - it("should fail", async () => { - await expect(() => provider.receiveFile()).rejects.toMatchError(new GolemInternalError("Not implemented")); + let socket: EventTarget & { send: jest.Mock }; + let fileInfo: { id: string; url: string }; + let fileHandle: fs.promises.FileHandle; + + beforeEach(async () => { + socket = Object.assign(new EventTarget(), { send: jest.fn() }); + fileInfo = { + id: "10", + url: "http://localhost:8080", + }; + + jest.spyOn(provider as any, "createFileInfo").mockImplementation(() => Promise.resolve(fileInfo)); + jest.spyOn(provider as any, "createSocket").mockImplementation(() => Promise.resolve(socket)); + fileHandle = { + write: jest.fn(), + close: jest.fn(), + } as unknown as jest.Mocked; + mockFs.promises.open = jest.fn().mockResolvedValue(fileHandle); + }); + + it("should receive the file and write it to the disc", async () => { + expect.assertions(10); + const result = await provider["receiveFile"]("./file.txt"); + expect(result).toBe(fileInfo.url); + expect(provider["createSocket"]).toHaveBeenCalledWith(fileInfo, ["UploadChunk", "UploadFinished"]); + expect(mockFs.promises.open).toHaveBeenCalledWith("./file.txt", "w"); + + async function sendUploadChunk(chunk: number[], id: string) { + const expectedBuffer = Buffer.alloc(chunk.length); + for (let i = 0; i < chunk.length; i++) { + expectedBuffer.writeUInt8(chunk[i], i); + } + socket.dispatchEvent( + new MessageEvent("message", { + data: encode({ + id, + component: "UploadChunk", + payload: { + chunk: { + content: expectedBuffer, + }, + }, + }).buffer, + }), + ); + await new Promise(setImmediate); + expect(fileHandle.write).toHaveBeenCalledWith(Uint8Array.from(expectedBuffer)); + expect(socket.send).toHaveBeenLastCalledWith( + encode({ + id, + payload: null, + }), + ); + } + + await sendUploadChunk([10, 11, 12, 13], "1"); + await sendUploadChunk([14, 15, 16, 17], "2"); + await sendUploadChunk([18, 19], "3"); + socket.dispatchEvent( + new MessageEvent("message", { + data: encode({ + id: "4", + component: "UploadFinished", + }).buffer, + }), + ); + await new Promise(setImmediate); + expect(fileHandle.close).toHaveBeenCalled(); }); }); diff --git a/src/shared/storage/ws-browser.ts b/src/shared/storage/ws.ts similarity index 54% rename from src/shared/storage/ws-browser.ts rename to src/shared/storage/ws.ts index fdf8115a9..bab98054e 100644 --- a/src/shared/storage/ws-browser.ts +++ b/src/shared/storage/ws.ts @@ -3,8 +3,13 @@ import { v4 } from "uuid"; // .js added for ESM compatibility import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; import * as jsSha3 from "js-sha3"; -import { Logger, nullLogger, YagnaApi } from "../utils"; -import { GolemInternalError } from "../error/golem-error"; +import { defaultLogger, isBrowser, Logger, YagnaApi } from "../utils"; +import { GolemInternalError, GolemUserError } from "../error/golem-error"; +import WebSocket from "ws"; + +// FIXME: cannot import fs/promises because the rollup polyfill doesn't work with it +import * as fs from "fs"; +const fsPromises = fs.promises; export interface WebSocketStorageProviderOptions { logger?: Logger; @@ -53,23 +58,25 @@ type GftpFileInfo = { /** * Storage provider that uses GFTP over WebSockets. */ -export class WebSocketBrowserStorageProvider implements StorageProvider { +export class WebSocketStorageProvider implements StorageProvider { /** * Map of open services (IDs) indexed by GFTP url. */ private services = new Map(); private logger: Logger; private ready = false; + private openHandles = new Set(); constructor( private readonly yagnaApi: YagnaApi, - private readonly options: WebSocketStorageProviderOptions, + options?: WebSocketStorageProviderOptions, ) { - this.logger = options.logger ?? nullLogger(); + this.logger = options?.logger?.child("storage") || defaultLogger("storage"); } - close(): Promise { + async close(): Promise { this.ready = false; + await Promise.allSettled(Array.from(this.openHandles).map((handle) => handle.close())); return this.release(Array.from(this.services.keys())); } @@ -83,7 +90,14 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { const ws = await this.createSocket(fileInfo, ["GetMetadata", "GetChunk"]); ws.addEventListener("message", (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } const req = toObject(event.data) as GsbRequestPublishUnion; + + this.logger.debug("Received GFTP request for publishData", req); + if (req.component === "GetMetadata") { this.respond(ws, req.id, { fileSize: data.byteLength }); } else if (req.component === "GetChunk") { @@ -92,19 +106,61 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { offset: req.payload.offset, }); } else { - this.logger.error( - `[WebSocketBrowserStorageProvider] Unsupported message in publishData(): ${ - (req as GsbRequest).component - }`, - ); + this.logger.error(`Unsupported message in publishData(): ${(req as GsbRequest).component}`); } }); return fileInfo.url; } - async publishFile(): Promise { - throw new GolemInternalError("Not implemented"); + async publishFile(src: string): Promise { + if (isBrowser) { + throw new GolemUserError("Cannot publish files in browser context, did you mean to use `publishData()`?"); + } + + this.logger.info("Preparing file upload", { sourcePath: src }); + + const fileInfo = await this.createFileInfo(); + const ws = await this.createSocket(fileInfo, ["GetMetadata", "GetChunk"]); + const fileStats = await fsPromises.stat(src); + const fileSize = fileStats.size; + + const fileHandle = await fsPromises.open(src, "r"); + this.openHandles.add(fileHandle); + + ws.addEventListener("message", async (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } + + const req = toObject(event.data) as GsbRequestPublishUnion; + + this.logger.debug("Received GFTP request for publishFile", req); + + if (req.component === "GetMetadata") { + this.respond(ws, req.id, { fileSize }); + } else if (req.component === "GetChunk") { + const { offset, size } = req.payload; + + const chunkSize = Math.min(size, fileSize - offset); + const chunk = Buffer.alloc(chunkSize); + + try { + await fileHandle.read(chunk, 0, chunkSize, offset); + this.respond(ws, req.id, { + content: chunk, + offset, + }); + } catch (error) { + this.logger.error("Something went wrong while sending the file chunk", { error }); + } + } else { + this.logger.error(`Unsupported message in publishFile(): ${(req as GsbRequest).component}`); + } + }); + + return fileInfo.url; } async receiveData(callback: StorageProviderDataCallback): Promise { @@ -113,7 +169,15 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]); ws.addEventListener("message", (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } + const req = toObject(event.data) as GsbRequestReceiveUnion; + + this.logger.debug("Received GFTP request for receiveData", req); + if (req.component === "UploadChunk") { data.push(req.payload.chunk); this.respond(ws, req.id, null); @@ -122,19 +186,47 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { const result = this.completeReceive(req.payload.hash, data); callback(result); } else { - this.logger.error( - `[WebSocketBrowserStorageProvider] Unsupported message in receiveData(): ${ - (req as GsbRequest).component - }`, - ); + this.logger.error(`Unsupported message in receiveData(): ${(req as GsbRequest).component}`); } }); return fileInfo.url; } - async receiveFile(): Promise { - throw new GolemInternalError("Not implemented"); + async receiveFile(path: string): Promise { + if (isBrowser) { + throw new GolemUserError("Cannot receive files in browser context, did you mean to use `receiveData()`?"); + } + + this.logger.info("Preparing file download", { destination: path }); + + const fileInfo = await this.createFileInfo(); + const fileHandle = await fsPromises.open(path, "w"); + this.openHandles.add(fileHandle); + const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]); + + ws.addEventListener("message", async (event) => { + if (!(event.data instanceof ArrayBuffer)) { + this.logger.error("Received non-ArrayBuffer data from the socket", { data: event.data }); + return; + } + const req = toObject(event.data) as GsbRequestReceiveUnion; + + this.logger.debug("Received GFTP request for receiveFile", req); + + if (req.component === "UploadChunk") { + await fileHandle.write(req.payload.chunk.content); + this.respond(ws, req.id, null); + } else if (req.component === "UploadFinished") { + this.respond(ws, req.id, null); + await fileHandle.close(); + this.openHandles.delete(fileHandle); + } else { + this.logger.error(`Unsupported message in receiveFile(): ${(req as GsbRequest).component}`); + } + }); + + return fileInfo.url; } async release(urls: string[]): Promise { @@ -142,7 +234,7 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { const serviceId = this.services.get(url); if (serviceId) { this.deleteService(serviceId).catch((error) => - this.logger.warn(`[WebSocketBrowserStorageProvider] Failed to delete service`, { serviceId, error }), + this.logger.warn(`Failed to delete service`, { serviceId, error }), ); } this.services.delete(url); @@ -168,7 +260,7 @@ export class WebSocketBrowserStorageProvider implements StorageProvider { const service = await this.createService(fileInfo, components); const ws = new WebSocket(service.url, ["gsb+flexbuffers"]); ws.addEventListener("error", () => { - this.logger.error(`[WebSocketBrowserStorageProvider] Socket Error (${fileInfo.id})`); + this.logger.error(`Socket Error (${fileInfo.id})`); }); ws.binaryType = "arraybuffer"; return ws;