diff --git a/examples/web/image.html b/examples/web/image.html index 71c64b3fd..fce637879 100644 --- a/examples/web/image.html +++ b/examples/web/image.html @@ -6,142 +6,133 @@ -

WebRequestor - Meme Example

-
-
-

Credentials

-
-
- - -
-
- - -
-
- - -
-
- - -
-
- - -
-
- - -
- +

WebRequestor - Meme Example

+
+
+

Credentials

+
+
+ + +
+
+ + +
+
+ +
-

Actions

-
-
- -
+
+ +
-
-

Result Meme

- +
+ +
-
-
-

Logs

-
    +

    Actions

    +
    +
    +
    +
    +

    Result Meme

    + +
    +
    +
    +
    +

    Logs

    +
      +
      +
      - + await executor.end(); + setResponse(result.data); + } + window.run = run; + diff --git a/examples/web/imagemagick.Dockerfile b/examples/web/imagemagick.Dockerfile new file mode 100644 index 000000000..14ca2a446 --- /dev/null +++ b/examples/web/imagemagick.Dockerfile @@ -0,0 +1,3 @@ +FROM dpokidov/imagemagick +VOLUME /golem/input /golem/output +WORKDIR /golem/work \ No newline at end of file diff --git a/package.json b/package.json index 7ac198d4f..20726c9dc 100644 --- a/package.json +++ b/package.json @@ -40,9 +40,13 @@ "bottleneck": "^2.19.5", "collect.js": "^4.34.3", "eventsource": "^2.0.2", + "flatbuffers": "^23.5.26", "ip-num": "^1.4.1", + "js-sha3": "^0.8.0", + "p-limit": "^4.0.0", "pino": "^8.11.0", "pino-pretty": "^10.0.1", + "uuid": "^9.0.0", "ya-ts-client": "^0.5.3" }, "devDependencies": { @@ -54,6 +58,7 @@ "@rollup/plugin-terser": "^0.4.0", "@types/mocha": "^10.0.0", "@types/node": "^20.4.2", + "@types/uuid": "^9.0.2", "@typescript-eslint/eslint-plugin": "^6.0.0", "@typescript-eslint/parser": "^6.0.0", "buffer": "^6.0.3", diff --git a/yajsapi/executor/executor.ts b/yajsapi/executor/executor.ts index 7a7ebc5a3..67f1ce43f 100644 --- a/yajsapi/executor/executor.ts +++ b/yajsapi/executor/executor.ts @@ -6,7 +6,7 @@ import { PaymentService, PaymentOptions } from "../payment/index.js"; import { NetworkService } from "../network/index.js"; import { ActivityOptions, Result } from "../activity/index.js"; import { sleep, Logger, runtimeContextChecker } from "../utils/index.js"; -import { StorageProvider, GftpStorageProvider, NullStorageProvider } from "../storage/index.js"; +import { StorageProvider, GftpStorageProvider, NullStorageProvider, WebSocketBrowserStorageProvider } from "../storage/index.js"; import { ExecutorConfig } from "./config.js"; import { Events } from "../events/index.js"; import { StatsService } from "../stats/service.js"; @@ -135,9 +135,24 @@ export class TaskExecutor { this.paymentService = new PaymentService(this.options); this.marketService = new MarketService(this.agreementPoolService, this.options); this.networkService = this.options.networkIp ? new NetworkService(this.options) : undefined; - this.storageProvider = runtimeContextChecker.isNode - ? this.configOptions.storageProvider ?? new GftpStorageProvider(this.logger) - : new NullStorageProvider(); + + // Initialize storage provider. + if (this.configOptions.storageProvider) { + this.storageProvider = this.configOptions.storageProvider; + } else if (runtimeContextChecker.isNode) { + this.storageProvider = new GftpStorageProvider(this.logger); + } else if (runtimeContextChecker.isBrowser) { + this.storageProvider = new WebSocketBrowserStorageProvider({ + yagnaOptions: { + apiKey: this.options.yagnaOptions.apiKey, + basePath: this.options.yagnaOptions.basePath, + }, + logger: this.logger, + }) + } else { + this.storageProvider = new NullStorageProvider(); + } + this.taskService = new TaskService( this.taskQueue, this.agreementPoolService, diff --git a/yajsapi/index.ts b/yajsapi/index.ts index 82f23c584..8730ba63b 100755 --- a/yajsapi/index.ts +++ b/yajsapi/index.ts @@ -1,5 +1,11 @@ export { TaskExecutor, ExecutorOptions } from "./executor/index.js"; -export { StorageProvider, GftpStorageProvider } from "./storage/index.js"; +export { + StorageProvider, + GftpStorageProvider, + NullStorageProvider, + WebSocketBrowserStorageProvider, + WebSocketStorageProviderOptions +} from "./storage/index.js"; export { ActivityStateEnum, Result } from "./activity/index.js"; export { AgreementCandidate, AgreementSelectors } from "./agreement/index.js"; export { ProposalFilters, ProposalDTO } from "./market/index.js"; diff --git a/yajsapi/storage/index.ts b/yajsapi/storage/index.ts index e89c75c64..0a17499e6 100644 --- a/yajsapi/storage/index.ts +++ b/yajsapi/storage/index.ts @@ -1,3 +1,4 @@ export { StorageProvider } from './provider.js'; export { GftpStorageProvider } from './gftp.js'; export { NullStorageProvider } from './null.js'; +export { WebSocketBrowserStorageProvider, WebSocketStorageProviderOptions } from './ws-browser.js'; \ No newline at end of file diff --git a/yajsapi/storage/ws-browser.ts b/yajsapi/storage/ws-browser.ts new file mode 100644 index 000000000..55ce02f73 --- /dev/null +++ b/yajsapi/storage/ws-browser.ts @@ -0,0 +1,231 @@ +import { StorageProvider, StorageProviderDataCallback } from "./provider"; +import { v4 } from "uuid"; +import { encode, toObject } from "flatbuffers/js/flexbuffers.js"; +import { getIdentity } from "../network/identity.js"; +import * as jsSha3 from "js-sha3"; +import { Logger, nullLogger } from "../utils/index.js"; + + +export interface WebSocketStorageProviderOptions { + yagnaOptions: { + apiKey: string; + basePath: string; + }; + logger?: Logger +} + +interface GsbRequest { + id: string; + component: string; + payload: T; +} + +interface GetMetadataRequest extends GsbRequest { + component: "GetMetadata"; +} + +interface GetChunkRequest extends GsbRequest<{ offset: number, size: number }> { + component: "GetChunk"; +} + +type UploadChunkChunk = { offset: number, content: Uint8Array } +type UploadChunkPayload = { + chunk: UploadChunkChunk; +}; + +interface UploadChunkRequest extends GsbRequest { + component: "UploadChunk"; +} + +interface UploadFinishedRequest extends GsbRequest<{ hash: string }> { + component: "UploadFinished" +} + +type GsbRequestPublishUnion = GetMetadataRequest | GetChunkRequest; +type GsbRequestReceiveUnion = UploadFinishedRequest | UploadChunkRequest; + +type ServiceInfo = { + url: URL; + serviceId: string; +} + +type GftpFileInfo = { + id: string; + url: string; +} + +/** + * Storage provider that uses GFTP over WebSockets. + */ +export class WebSocketBrowserStorageProvider implements StorageProvider { + /** + * Map of open services (IDs) indexed by GFTP url. + */ + private services = new Map(); + private logger: Logger; + + constructor(private readonly options: WebSocketStorageProviderOptions) { + this.logger = options.logger ?? nullLogger(); + } + + close(): Promise { + return this.release(Array.from(this.services.keys())); + } + + init(): Promise { + return Promise.resolve(undefined); + } + + async publishData(data: Uint8Array): Promise { + const fileInfo = await this.createFileInfo(); + + const ws = await this.createSocket(fileInfo, ["GetMetadata", "GetChunk"]); + ws.addEventListener("message", (event) => { + const req = toObject(event.data) as GsbRequestPublishUnion; + if (req.component === "GetMetadata") { + this.respond(ws, req.id, { fileSize: data.byteLength }); + } else if (req.component === "GetChunk") { + this.respond(ws, req.id, { + content: data.slice(req.payload.offset, req.payload.offset + req.payload.size), + offset: req.payload.offset, + }); + } else { + this.logger.warn(`[WebSocketBrowserStorageProvider] Unsupported message in publishData(): ${(req as GsbRequest).component}`); + } + }); + + return fileInfo.url; + } + + + publishFile(): Promise { + throw new Error("Not implemented"); + } + + async receiveData(callback: StorageProviderDataCallback): Promise { + const data: UploadChunkChunk[] = []; + const fileInfo = await this.createFileInfo(); + + const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]); + ws.addEventListener("message", (event) => { + const req = toObject(event.data) as GsbRequestReceiveUnion; + if (req.component === "UploadChunk") { + data.push(req.payload.chunk); + this.respond(ws, req.id, null); + } else if (req.component === "UploadFinished") { + this.respond(ws, req.id, null); + const result = this.completeReceive(req.payload.hash, data); + callback(result); + } else { + this.logger.warn(`[WebSocketBrowserStorageProvider] Unsupported message in receiveData(): ${(req as GsbRequest).component}`); + } + }); + + return fileInfo.url; + } + + receiveFile(): Promise { + throw new Error("Not implemented"); + } + + async release(urls: string[]): Promise { + urls.forEach((url) => { + const serviceId = this.services.get(url); + if (serviceId) { + this.deleteService(serviceId).catch(e => this.logger.warn(`[WebSocketBrowserStorageProvider] Failed to delete service ${serviceId}: ${e}`)); + } + this.services.delete(url); + }); + } + + private async createFileInfo(): Promise { + const id = v4(); + const me = await getIdentity({ + yagnaOptions: this.options.yagnaOptions + }); + + return { + id, + url: `gftp://${me}/${id}` + }; + } + + private async createSocket(fileInfo: GftpFileInfo, components: string[]): Promise { + const service = await this.createService(fileInfo, components); + const ws = new WebSocket(service.url); // NOTE: protocol set to ["gsb+flexbuffers"] causes CORS error everywhere except Firefox + ws.addEventListener("error", () => { + this.logger.error(`[WebSocketBrowserStorageProvider] Socket Error (${fileInfo.id})`); + }); + ws.binaryType = "arraybuffer"; + return ws; + } + + private async createService(fileInfo: GftpFileInfo, components: string[]): Promise { + const yagnaOptions = this.options.yagnaOptions; + const resp = await fetch(new URL("/gsb-api/v1/services", yagnaOptions.basePath), { + method: "POST", + headers: { + "Authorization": `Bearer ${yagnaOptions.apiKey}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + listen: { + on: `/public/gftp/${fileInfo.id}`, + components + }, + }), + }); + + if (resp.status !== 201) { + throw new Error(`Invalid response: ${resp.status}`); + } + + const body = await resp.json(); + const messages_link = `/gsb-api/v1/services/${body.servicesId}?authToken=${yagnaOptions.apiKey}`; + const url = new URL(messages_link, this.options.yagnaOptions.basePath); + url.protocol = "ws:"; + this.services.set(fileInfo.url, body.servicesId); + + return { url, serviceId: body.servicesId }; + } + + private async deleteService(id: string): Promise { + const yagnaOptions = this.options.yagnaOptions; + const resp = await fetch(new URL(`/gsb-api/v1/services/${id}`, yagnaOptions.basePath), { + method: "DELETE", + headers: { + "Authorization": `Bearer ${yagnaOptions.apiKey}`, + "Content-Type": "application/json", + }, + }); + + if (resp.status !== 200) { + throw new Error(`Invalid response: ${resp.status}`); + } + } + + private respond(ws: WebSocket, id: string, payload: unknown) { + ws.send(encode({ + id, + payload + })); + } + + private completeReceive(hash: string, data: UploadChunkChunk[]): Uint8Array { + data.sort((a, b) => a.offset - b.offset); + const size = data.reduce((acc, cur) => acc + cur.content.byteLength, 0); + const buf = new Uint8Array(size); + data.forEach((chunk) => { + buf.set(chunk.content, chunk.offset); + }); + + // FIXME: Use digest.update and async, as it can only handle 14MB/s on my machine, which is way to slow to do synchronously. + const hashHex = jsSha3.sha3_256(buf); + + if (hash !== hashHex) { + throw new Error(`File corrupted, expected hash ${hash}, got ${hashHex}`); + } else { + return buf; + } + } +}