Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JST-22 GFTP over websockets #521

Merged
merged 5 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/web/Dockerfile.imagemagick
mgordel marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
FROM dpokidov/imagemagick
VOLUME /golem/input /golem/output
WORKDIR /golem/work
223 changes: 107 additions & 116 deletions examples/web/executor/image.html
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should now do a cypress integration test for this example. Previously, it was not much different from HelloWorld. Now we should test it end2end.

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@
"bottleneck": "^2.19.5",
"collect.js": "^4.34.3",
"eventsource": "^2.0.2",
"flatbuffers": "^23.5.26",
"ip-num": "^1.4.1",
"js-sha3": "^0.8.0",
mgordel marked this conversation as resolved.
Show resolved Hide resolved
"p-limit": "^4.0.0",
"pino": "^8.11.0",
"pino-pretty": "^9.4.0",
"uuid": "^9.0.0",
"ya-ts-client": "^0.5.3"
},
"devDependencies": {
Expand All @@ -55,6 +58,7 @@
"@rollup/plugin-terser": "^0.4.0",
"@types/mocha": "^10.0.0",
"@types/node": "^18.16.18",
"@types/uuid": "^9.0.2",
"@typescript-eslint/eslint-plugin": "^5.42.1",
"@typescript-eslint/parser": "^5.42.1",
"buffer": "^6.0.3",
Expand Down
23 changes: 19 additions & 4 deletions yajsapi/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { PaymentService, PaymentOptions } from "../payment/index.js";
import { NetworkService } from "../network/index.js";
import { ActivityOptions, Result } from "../activity/index.js";
import { sleep, Logger, runtimeContextChecker } from "../utils/index.js";
import { StorageProvider, GftpStorageProvider, NullStorageProvider } from "../storage/index.js";
import { StorageProvider, GftpStorageProvider, NullStorageProvider, WebSocketBrowserStorageProvider } from "../storage/index.js";
import { ExecutorConfig } from "./config.js";
import { Events } from "../events/index.js";
import { StatsService } from "../stats/service.js";
Expand Down Expand Up @@ -138,9 +138,24 @@ export class TaskExecutor {
this.paymentService = new PaymentService(this.options);
this.marketService = new MarketService(this.agreementPoolService, this.options);
this.networkService = this.options.networkIp ? new NetworkService(this.options) : undefined;
this.storageProvider = runtimeContextChecker.isNode
? this.configOptions.storageProvider ?? new GftpStorageProvider(this.logger)
: new NullStorageProvider();

// Initialize storage provider.
if (this.configOptions.storageProvider) {
this.storageProvider = this.configOptions.storageProvider;
} else if (runtimeContextChecker.isNode) {
this.storageProvider = new GftpStorageProvider(this.logger);
} else if (runtimeContextChecker.isBrowser) {
this.storageProvider = new WebSocketBrowserStorageProvider({
yagnaOptions: {
apiKey: this.options.yagnaOptions.apiKey,
basePath: this.options.yagnaOptions.basePath,
},
logger: this.logger,
})
} else {
this.storageProvider = new NullStorageProvider();
}

this.taskService = new TaskService(
this.taskQueue,
this.agreementPoolService,
Expand Down
8 changes: 7 additions & 1 deletion yajsapi/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
// High level API
export { TaskExecutor, ExecutorOptions } from "./executor/index.js";
export { StorageProvider, GftpStorageProvider } from "./storage/index.js";
export {
StorageProvider,
GftpStorageProvider,
NullStorageProvider,
WebSocketBrowserStorageProvider,
WebSocketStorageProviderOptions
} from "./storage/index.js";

// Mid level API
export { Activity, ActivityOptions, ActivityStateEnum, Result } from "./activity/index.js";
Expand Down
1 change: 1 addition & 0 deletions yajsapi/storage/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { StorageProvider } from './provider.js';
export { GftpStorageProvider } from './gftp.js';
export { NullStorageProvider } from './null.js';
export { WebSocketBrowserStorageProvider, WebSocketStorageProviderOptions } from './ws-browser.js';
234 changes: 234 additions & 0 deletions yajsapi/storage/ws-browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
import { StorageProvider, StorageProviderDataCallback } from "./provider";
import { v4 } from "uuid";
import { encode, toObject } from "flatbuffers/js/flexbuffers.js";
import { getIdentity } from "../network/identity.js";
import * as jsSha3 from "js-sha3";
import { Logger, nullLogger } from "../utils/index.js";


export interface WebSocketStorageProviderOptions {
yagnaOptions: {
apiKey: string;
basePath: string;
};
logger?: Logger
}

interface GsbRequest<T> {
id: string;
component: string;
payload: T;
}

interface GetMetadataRequest extends GsbRequest<void> {
component: "GetMetadata";
}

interface GetChunkRequest extends GsbRequest<{ offset: number, size: number }> {
component: "GetChunk";
}

type UploadChunkChunk = { offset: number, content: Uint8Array }
type UploadChunkPayload = {
chunk: UploadChunkChunk;
};

interface UploadChunkRequest extends GsbRequest<UploadChunkPayload> {
component: "UploadChunk";
}

interface UploadFinishedRequest extends GsbRequest<{ hash: string }> {
component: "UploadFinished"
}

type GsbRequestPublishUnion = GetMetadataRequest | GetChunkRequest;
type GsbRequestReceiveUnion = UploadFinishedRequest | UploadChunkRequest;

type ServiceInfo = {
url: URL;
serviceId: string;
}

type GftpFileInfo = {
id: string;
url: string;
}

/**
* Storage provider that uses GFTP over WebSockets.
*/
export class WebSocketBrowserStorageProvider implements StorageProvider {
/**
* Map of open services (IDs) indexed by GFTP url.
*/
private services = new Map<string, string>();
private logger: Logger;

constructor(private readonly options: WebSocketStorageProviderOptions) {
this.logger = options.logger ?? nullLogger();
mgordel marked this conversation as resolved.
Show resolved Hide resolved
}

close(): Promise<void> {
return this.release(Array.from(this.services.keys()));
}

init(): Promise<void> {
return Promise.resolve(undefined);
}

async publishData(data: Uint8Array): Promise<string> {
const fileInfo = await this.createFileInfo();

const ws = await this.createSocket(fileInfo, ["GetMetadata", "GetChunk"]);
ws.addEventListener("message", (event) => {
const req = toObject(event.data) as GsbRequestPublishUnion;
if (req.component === "GetMetadata") {
this.respond(ws, req.id, { fileSize: data.byteLength });
} else if (req.component === "GetChunk") {
this.respond(ws, req.id, {
content: data.slice(req.payload.offset, req.payload.offset + req.payload.size),
offset: req.payload.offset,
});
} else {
this.logger.warn(`[WebSocketBrowserStorageProvider] Unsupported message in publishData(): ${(req as GsbRequest<void>).component}`);
}
});

return fileInfo.url;
}


// eslint-disable-next-line @typescript-eslint/no-unused-vars
mgordel marked this conversation as resolved.
Show resolved Hide resolved
publishFile(srcPath: string): Promise<string> {
throw new Error("Not implemented");
}

async receiveData(callback: StorageProviderDataCallback): Promise<string> {
const data: UploadChunkChunk[] = [];
const fileInfo = await this.createFileInfo();

const ws = await this.createSocket(fileInfo, ["UploadChunk", "UploadFinished"]);
ws.addEventListener("message", (event) => {
const req = toObject(event.data) as GsbRequestReceiveUnion;
if (req.component === "UploadChunk") {
data.push(req.payload.chunk);
this.respond(ws, req.id, null);
} else if (req.component === "UploadFinished") {
this.respond(ws, req.id, null);
const result = this.completeReceive(req.payload.hash, data);
callback(result);
} else {
this.logger.warn(`[WebSocketBrowserStorageProvider] Unsupported message in receiveData(): ${(req as GsbRequest<void>).component}`);
}
});

return fileInfo.url;
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
mgordel marked this conversation as resolved.
Show resolved Hide resolved
receiveFile(destPath: string): Promise<string> {
throw new Error("Not implemented");
}

async release(urls: string[]): Promise<void> {
urls.forEach((url) => {
const serviceId = this.services.get(url);
if (serviceId) {
this.deleteService(serviceId).catch(e => this.logger.warn(`[WebSocketBrowserStorageProvider] Failed to delete service ${serviceId}: ${e}`));
}
this.services.delete(url)
});
}

private async createFileInfo(): Promise<GftpFileInfo> {
const id = v4();
const me = await getIdentity({
yagnaOptions: this.options.yagnaOptions
});

return {
id,
url: `gftp://${me}/${id}`
};
}

private async createSocket(fileInfo: GftpFileInfo, components: string[]): Promise<WebSocket> {
const service = await this.createService(fileInfo.id, components);
const ws = new WebSocket(service.url, ["gsb+flexbuffers"]);
ws.addEventListener("error", () => {
this.logger.error(`[WebSocketBrowserStorageProvider] Socket Error (${fileInfo.id})`);
});
ws.binaryType = "arraybuffer";
this.services.set(fileInfo.url, service.serviceId);
return ws;
}

private async createService(id: string, components: string[]): Promise<ServiceInfo> {
const yagnaOptions = this.options.yagnaOptions;
const resp = await fetch(new URL("/gsb-api/v1/services", yagnaOptions.basePath), {
method: "POST",
headers: {
"Authorization": `Bearer ${yagnaOptions.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
listen: {
on: `/public/gftp/${id}`,
components
},
}),
});

if (resp.status !== 201) {
throw new Error(`Invalid response: ${resp.status}`);
}

const body = await resp.json();
const messages_link = `/gsb-api/v1/services/${body.servicesId}?authToken=${yagnaOptions.apiKey}`;
const url = new URL(messages_link, this.options.yagnaOptions.basePath);
url.protocol = "ws:";
this.services.set(url.toString(), body.servicesId);

return { url, serviceId: body.servicesId };
}

private async deleteService(id: string): Promise<void> {
const yagnaOptions = this.options.yagnaOptions;
const resp = await fetch(new URL(`/gsb-api/v1/services/${id}`, yagnaOptions.basePath), {
method: "DELETE",
headers: {
"Authorization": `Bearer ${yagnaOptions.apiKey}`,
"Content-Type": "application/json",
},
});

if (resp.status !== 200) {
throw new Error(`Invalid response: ${resp.status}`);
}
}

private respond(ws: WebSocket, id: string, payload: unknown) {
ws.send(encode({
id,
payload
}));
}

private completeReceive(hash: string, data: UploadChunkChunk[]): Uint8Array {
data.sort((a, b) => a.offset - b.offset);
const size = data.reduce((acc, cur) => acc + cur.content.byteLength, 0);
const buf = new Uint8Array(size);
data.forEach((chunk) => {
buf.set(chunk.content, chunk.offset);
});

// FIXME: Use digest.update and async, as it can only handle 14MB/s on my machine, which is way to slow to do synchronously.
const hashHex = jsSha3.sha3_256(buf);

if (hash !== hashHex) {
throw new Error(`File corrupted, expected hash ${hash}, got ${hashHex}`);
} else {
return buf;
}
}
}
Loading