From 8aabace13fc956ed5c0b695a70365caeb821718a Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Thu, 1 Aug 2024 09:16:28 +0200 Subject: [PATCH] fix: fixed TcpProxy implementation so that it works with non-http use-cases as well --- .../rental-model/advanced/tcp-proxy/server.js | 39 ++++ .../advanced/tcp-proxy/tcp-proxy.ts | 86 ++++++++ src/market/market.module.test.ts | 4 +- src/network/tcpProxy.ts | 201 ++++++++++++++---- src/resource-rental/resource-rental.ts | 4 +- src/shared/utils/index.ts | 1 + src/shared/utils/wait.ts | 2 +- src/shared/yagna/event-reader.ts | 4 +- 8 files changed, 289 insertions(+), 52 deletions(-) create mode 100644 examples/rental-model/advanced/tcp-proxy/server.js create mode 100644 examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts diff --git a/examples/rental-model/advanced/tcp-proxy/server.js b/examples/rental-model/advanced/tcp-proxy/server.js new file mode 100644 index 000000000..a1cb57ab1 --- /dev/null +++ b/examples/rental-model/advanced/tcp-proxy/server.js @@ -0,0 +1,39 @@ +/* eslint-disable */ +const http = require("http"); + +(async function main() { + const PORT = parseInt(process.env["PORT"] ?? "80"); + + // Increase the value if you want to test long response/liveliness scenarios + const SIMULATE_DELAY_SEC = parseInt(process.env["SIMULATE_DELAY_SEC"] ?? "0"); + + const respond = (res) => { + res.writeHead(200); + res.end("Hello Golem!"); + }; + + const app = http.createServer((req, res) => { + if (SIMULATE_DELAY_SEC > 0) { + setTimeout(() => { + respond(res); + }, SIMULATE_DELAY_SEC * 1000); + } else { + respond(res); + } + }); + + const server = app.listen(PORT, () => console.log(`HTTP server started at "http://localhost:${PORT}"`)); + + const shutdown = () => { + server.close((err) => { + if (err) { + console.error("Server close encountered an issue", err); + } else { + console.log("Server closed successfully"); + } + }); + }; + + process.on("SIGINT", shutdown); + process.on("SIGTERM", shutdown); +})(); diff --git a/examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts b/examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts new file mode 100644 index 000000000..da81e3a8c --- /dev/null +++ b/examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts @@ -0,0 +1,86 @@ +import { GolemNetwork, waitFor } from "@golem-sdk/golem-js"; +import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; + +(async () => { + const logger = pinoPrettyLogger({ + level: "info", + }); + const glm = new GolemNetwork({ + logger, + }); + + try { + await glm.connect(); + + const network = await glm.createNetwork({ + ip: "10.0.0.0/24", + }); + + const rental = await glm.oneOf({ + order: { + demand: { + workload: { + imageTag: "golem/node:20-alpine", + capabilities: ["vpn"], + }, + }, + market: { + rentHours: 0.25, + pricing: { + model: "burn-rate", + avgGlmPerHour: 1, + }, + }, + network, + }, + }); + + const PORT_ON_PROVIDER = 80; + const PORT_ON_REQUESTOR = 8080; + + const exe = await rental.getExeUnit(); + + // Install the server script + await exe.uploadFile(`./rental-model/advanced/tcp-proxy/server.js`, "/golem/work/server.js"); + + // Start the server process on the provider + const server = await exe.runAndStream(`PORT=${PORT_ON_PROVIDER} node /golem/work/server.js`); + + server.stdout.subscribe((data) => console.log("provider>", data)); + server.stderr.subscribe((data) => console.error("provider>", data)); + + // Create a proxy instance + const proxy = exe.createTcpProxy(PORT_ON_PROVIDER); + proxy.events.on("error", (error) => console.error("TcpProxy reported an error:", error)); + + // Start listening and expose the port on your requestor machine + await proxy.listen(PORT_ON_REQUESTOR); + console.log(`Server Proxy listen at http://localhost:${PORT_ON_REQUESTOR}`); + + let isClosing = false; + const stopServer = async () => { + if (isClosing) { + console.log("Already closing, ignoring subsequent shutdown request"); + return; + } + + isClosing = true; + + console.log("Shutting down gracefully"); + await proxy.close(); + }; + + process.on("SIGINT", () => { + stopServer() + .then(() => rental.stopAndFinalize()) + .then(() => logger.info("Shutdown routine completed")) + .catch((err) => logger.error("Failed to shutdown cleanly", err)); + }); + + await waitFor(() => server.isFinished()); + } catch (error) { + logger.error("Failed to run the example", error); + } finally { + await glm.disconnect(); + } +})().catch(console.error); diff --git a/src/market/market.module.test.ts b/src/market/market.module.test.ts index c91634fa0..33d24f47a 100644 --- a/src/market/market.module.test.ts +++ b/src/market/market.module.test.ts @@ -12,7 +12,7 @@ import { Allocation, IPaymentApi } from "../payment"; import { INetworkApi, NetworkModule } from "../network"; import { DraftOfferProposalPool } from "./draft-offer-proposal-pool"; import { Agreement, AgreementEvent, ProviderInfo } from "./agreement"; -import { waitAndCall, waitForCondition } from "../shared/utils/wait"; +import { waitAndCall, waitFor } from "../shared/utils/wait"; import { MarketOrderSpec } from "../golem-network"; import { GolemAbortError } from "../shared/error/golem-error"; @@ -347,7 +347,7 @@ describe("Market module", () => { }); }); - await waitForCondition(() => draftListener.mock.calls.length > 0); + await waitFor(() => draftListener.mock.calls.length > 0); testSub.unsubscribe(); expect(draftListener).toHaveBeenCalledWith(draftProposal); diff --git a/src/network/tcpProxy.ts b/src/network/tcpProxy.ts index ae2bd70d9..d08a0b210 100644 --- a/src/network/tcpProxy.ts +++ b/src/network/tcpProxy.ts @@ -2,6 +2,7 @@ import net from "net"; import { WebSocket } from "ws"; import { EventEmitter } from "eventemitter3"; import { defaultLogger, Logger } from "../shared/utils"; +import { Buffer } from "buffer"; export interface TcpProxyEvents { /** Raised when the proxy encounters any sort of error */ @@ -31,6 +32,16 @@ export interface TcpProxyOptions { * **IMPORTANT** * * This feature is supported only in the Node.js environment. In has no effect in browsers. + * + * General solution description: + * + * - [x] Open a TCP server and listen to connections + * - [x] When a new connection arrives, establish a WS connection with yagna + * - [ ] Pass any incoming data from the client TCP socket to the WS, buffer it when the socket is not ready yet + * - [ ] Pass any returning data from the WS to the client TCP socket, but don't do it if the client socket already disconnected + * - [ ] When the WS will be closed, then close the client socket as well + * - [ ] When the client TCP socket will be closed, close the WS as well + * - [ ] Handle teardown of the TCP-WS bridge by clearing communication buffers to avoid memory leaks */ export class TcpProxy { private server: net.Server; @@ -58,68 +69,166 @@ export class TcpProxy { this.heartBeatSec = options.heartBeatSec ?? 10; this.logger = options.logger ? options.logger.child("tcp-proxy") : defaultLogger("tcp-proxy"); - this.server = new net.Server({ keepAlive: true }, (socket: net.Socket) => { - this.logger.debug("TcpProxy Server new incoming connection"); + this.server = net.createServer((client: net.Socket) => { + this.logger.debug("Client connected to TCP Server"); + + const state = { + /** Tells if the client socket is in a usable state */ + sReady: true, + /** Buffer for chunks of data that arrived from yagna's WS and should be delivered to the client socket when it's ready */ + sBuffer: [] as Buffer[], + /** Tells if the WS with yagna is ready for communication */ + wsReady: false, + /** Buffer for chunks of data that arrived from the client socket and should be sent to yagna's WS when it's ready */ + wsBuffer: [] as Buffer[], + }; + + const clearSocketBuffer = () => (state.sBuffer = []); + const clearWebSocketBuffer = () => (state.wsBuffer = []); + + // UTILITY METHODS + const flushSocketBuffer = () => { + this.logger.debug("Flushing Socket buffer"); + if (state.sBuffer.length > 0) { + client.write(Buffer.concat(state.sBuffer)); + } + clearSocketBuffer(); + }; + + const flushWebSocketBuffer = () => { + this.logger.debug("Flushing WebSocket buffer"); + if (state.wsBuffer.length > 0) { + ws.send(Buffer.concat(state.wsBuffer), { + binary: true, + mask: true, + }); + } + clearWebSocketBuffer(); + }; + + const teardownBridge = () => { + ws.close(); + client.end(); + clearWebSocketBuffer(); + clearSocketBuffer(); + }; const ws = new WebSocket(this.wsUrl, { headers: { authorization: `Bearer ${this.appKey}` } }); + // OPEN HANDLERS ws.on("open", () => { - this.logger.debug("TcpProxy Yagna WS opened"); + this.logger.debug("Yagna WS opened"); + state.wsReady = true; + // Push any pending data to the web-socket + flushWebSocketBuffer(); + }); + + // NOTE: That's not really required in our use-case, added for completeness of the flow + client.on("connect", () => { + this.logger.debug("Client socket connected"); + state.sReady = true; + // Push any pending data to the client socket + flushSocketBuffer(); + }); - // Register the actual data transfer - socket.on("data", async (chunk) => ws.send(chunk.toString())); + // ERROR HANDLERS + ws.on("error", (error) => { + this.notifyOfError("Yagna WS encountered an error", error); + teardownBridge(); }); - ws.on("message", (message) => socket.write(message.toString())); + client.on("error", (error) => { + this.notifyOfError("Server Socket encountered an error", error); + teardownBridge(); + }); + + // TERMINATION HANDLERS + + // When the WS socket will be closed + ws.on("close", () => { + clearInterval(heartBeatInt); + this.logger.debug("Yagna WS closed"); + client.end(); + clearWebSocketBuffer(); + clearSocketBuffer(); + }); ws.on("end", () => { - this.logger.debug("TcpProxy Yagna WS end"); - socket.end(); + this.logger.debug("Yagna WS end"); + client.end(); + clearWebSocketBuffer(); + clearSocketBuffer(); }); - ws.on("error", (error) => { - this.handleError("TcpProxy Yagna WS encountered an error", error); + // When the client will disconnect + client.on("close", (error) => { + if (error) { + this.logger.error("Server Socket encountered closed with an error error"); + } else { + this.logger.debug("Server Socket has been closed (client disconnected)"); + } + ws.close(); + clearWebSocketBuffer(); + clearSocketBuffer(); + }); + + // DATA TRANSFER + // Send data to the WebSocket or buffer if it's not ready yet + client.on("data", async (chunk) => { + this.logger.debug("Server Socket received data", { length: chunk.length, wsReady: state.wsReady }); + if (!state.wsReady) { + state.wsBuffer.push(chunk); + } else { + ws.send(chunk, { binary: true, mask: true }); + } }); + // Send data to the client or buffer if it's not ready yet + ws.on("message", (message) => { + const length = "length" in message ? message.length : null; + this.logger.debug("Yagna WS received data", { length, socketReady: state.sReady }); + if (message instanceof Buffer) { + if (!state.sReady) { + state.wsBuffer.push(message); + } else { + client.write(message); + } + } else { + // Defensive programming + this.logger.error("Encountered unsupported type of message", typeof message); + } + }); + + // WS health monitoring ws.on("ping", () => { - this.logger.debug("TcpProxy Yagna WS received ping event"); + this.logger.debug("Yagna WS received ping event"); }); // Configure pings to check the health of the WS to Yagna let isAlive = true; const heartBeat = () => { - this.logger.debug("TcpProxy Yagna WS checking if the socket is alive"); - if (!isAlive) { - this.handleError("TcpProxy Yagna WS doesn't seem to be healthy, going to terminate"); - // Previous check failed, time to terminate - return ws.terminate(); - } + if (state.wsReady) { + this.logger.debug("Yagna WS checking if the client is alive"); + if (!isAlive) { + this.notifyOfError("Yagna WS doesn't seem to be healthy, going to terminate"); + // Previous check failed, time to terminate + return ws.terminate(); + } - isAlive = false; - ws.ping(); + isAlive = false; + ws.ping(); + } else { + this.logger.debug("Yagna WS is not ready yet, skipping heart beat"); + } }; const heartBeatInt = setInterval(heartBeat, this.heartBeatSec * 1000); ws.on("pong", () => { - this.logger.debug("TcpProxy Yagna WS received pong event"); + this.logger.debug("Yagna WS received pong event"); isAlive = true; }); - - ws.on("close", () => { - clearInterval(heartBeatInt); - this.logger.debug("TcpProxy Yagna WS was closed"); - }); - - socket.on("error", (error) => { - this.handleError("TcpProxy Server Socket encountered an error", error); - }); - - socket.on("close", () => { - this.logger.debug("TcpProxy Server Socket has been closed"); - ws.close(); - }); }); this.attachDebugLogsToServer(); @@ -141,7 +250,7 @@ export class TcpProxy { return new Promise((resolve, reject) => { const handleError = (err: unknown) => { - this.handleError("TcpProxy failed to start listening", { port, err }); + this.notifyOfError("TcpProxy failed to start listening", { port, err }); this.server.removeListener("listening", handleListen); reject(err); }; @@ -161,35 +270,37 @@ export class TcpProxy { * Gracefully close the proxy */ public close() { - this.logger.debug("TcpProxy close initiated"); + this.logger.debug("TCP Server close initiated by the user"); return new Promise((resolve, reject) => { if (this.server.listening) { this.server?.close((err) => { if (err) { - this.handleError("TcpProxy failed to close properly", err); + this.notifyOfError("TCP Server closed with an error", err); reject(err); } else { - this.logger.info("TcpProxy closed - was listening"); + this.logger.info("TCP server closed - was listening"); resolve(); } }); } else { - this.logger.info("TcpProxy closed - was not listening"); + this.logger.info("TCP Server closed - was not listening"); resolve(); } }); } - private handleError(message: string, err?: unknown) { + private notifyOfError(message: string, err?: unknown) { this.logger.error(message, err); this.events.emit("error", `${message}: ${err}`); } private attachDebugLogsToServer() { - this.server.on("listening", () => this.logger.debug("TcpProxy Server event 'listening'")); - this.server.on("close", () => this.logger.debug("TcpProxy Server event 'close'")); - this.server.on("connection", () => this.logger.debug("TcpProxy Server event 'connection'")); - this.server.on("drop", (data) => this.logger.debug("TcpProxy Server event 'drop'", { data })); - this.server.on("error", (err) => this.logger.debug("TcpProxy Server event 'error'", err)); + this.server.on("listening", () => this.logger.debug("TCP Server started to listen")); + this.server.on("close", () => this.logger.debug("TCP Server closed")); + this.server.on("connection", () => this.logger.debug("TCP Server received new connection")); + this.server.on("drop", (data) => + this.logger.debug("TCP Server dropped a connection because of reaching `maxConnections`", { data }), + ); + this.server.on("error", (err) => this.logger.error("Server event 'error'", err)); } } diff --git a/src/resource-rental/resource-rental.ts b/src/resource-rental/resource-rental.ts index 9dfb99bc9..958b275c4 100644 --- a/src/resource-rental/resource-rental.ts +++ b/src/resource-rental/resource-rental.ts @@ -1,7 +1,7 @@ import { Agreement, MarketModule } from "../market"; import { AgreementPaymentProcess, PaymentProcessOptions } from "../payment/agreement_payment_process"; import { createAbortSignalFromTimeout, Logger } from "../shared/utils"; -import { waitForCondition } from "../shared/utils/wait"; +import { waitFor } from "../shared/utils/wait"; import { Activity, ActivityModule, ExeUnit, ExeUnitOptions } from "../activity"; import { StorageProvider } from "../shared/storage"; import { EventEmitter } from "eventemitter3"; @@ -77,7 +77,7 @@ export class ResourceRental { this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); const abortSignal = createAbortSignalFromTimeout(signalOrTimeout); - await waitForCondition(() => this.paymentProcess.isFinished(), { + await waitFor(() => this.paymentProcess.isFinished(), { signalOrTimeout: abortSignal, }).catch((error) => { this.paymentProcess.stop(); diff --git a/src/shared/utils/index.ts b/src/shared/utils/index.ts index 63471f224..e50539ca7 100644 --- a/src/shared/utils/index.ts +++ b/src/shared/utils/index.ts @@ -10,3 +10,4 @@ export { YagnaApi, YagnaOptions } from "../yagna/yagnaApi"; export * from "./abortSignal"; export * from "./eventLoop"; export * from "./rxjs"; +export * from "./wait"; diff --git a/src/shared/utils/wait.ts b/src/shared/utils/wait.ts index 2fe8bb636..802f13e8e 100644 --- a/src/shared/utils/wait.ts +++ b/src/shared/utils/wait.ts @@ -11,7 +11,7 @@ import { createAbortSignalFromTimeout } from "./abortSignal"; * * @return {Promise} - Resolves when the condition is met or rejects with a timeout error if it wasn't met on time. */ -export function waitForCondition( +export function waitFor( check: () => boolean | Promise, opts?: { signalOrTimeout?: number | AbortSignal; intervalSeconds?: number }, ): Promise { diff --git a/src/shared/yagna/event-reader.ts b/src/shared/yagna/event-reader.ts index 9322b9d7b..48de021e7 100644 --- a/src/shared/yagna/event-reader.ts +++ b/src/shared/yagna/event-reader.ts @@ -1,7 +1,7 @@ import { Logger } from "../utils"; import { Subject } from "rxjs"; import { EventDTO } from "ya-ts-client/dist/market-api"; -import { waitForCondition } from "../utils/wait"; +import { waitFor } from "../utils/wait"; export type CancellablePoll = { /** User defined name of the event stream for ease of debugging */ @@ -79,7 +79,7 @@ export class EventReader { if (currentPoll) { currentPoll.cancel(); } - await waitForCondition(() => isFinished, { intervalSeconds: 0 }); + await waitFor(() => isFinished, { intervalSeconds: 0 }); logger.debug("Cancelled reading the events", { eventType }); }, };