From edd4eb5891a9576d2a3f0196b361ac309981ef6c Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Mon, 15 Jan 2024 15:21:23 +0100 Subject: [PATCH] feat: use public whitelist endpoint to use good providers and handle aborts --- examples/usage.ts | 3 +- src/golem.ts | 187 +++++++++++++++++++++------------- src/tesseract-ocr-on-golem.ts | 74 ++++++++++++-- 3 files changed, 182 insertions(+), 82 deletions(-) diff --git a/examples/usage.ts b/examples/usage.ts index fc38dfd..519c0b7 100644 --- a/examples/usage.ts +++ b/examples/usage.ts @@ -40,7 +40,6 @@ const writeTextToResultFile = (text?: string) => { if (alreadyShuttingDown) { console.error("The process is already shutting down, will force quit"); process.exit(1); - return; } else { console.log( "Shutdown initiated, please wait for everything to finish, or hit ^C again to force exit", @@ -49,7 +48,7 @@ const writeTextToResultFile = (text?: string) => { alreadyShuttingDown = true; - await ocr.shutdown(); + await ocr.abort(); }; process.on("SIGINT", stop); diff --git a/src/golem.ts b/src/golem.ts index eed6511..efe6642 100644 --- a/src/golem.ts +++ b/src/golem.ts @@ -21,49 +21,80 @@ import { Proposal } from "@golem-sdk/golem-js/dist/market"; import genericPool from "generic-pool"; import debug, { Debugger } from "debug"; +export type GolemMarketConfig = { + /** How long you want to rent the resources in hours */ + rentHours: number; + + /** What's the desired hourly rate spend in GLM/hour */ + priceGlmPerHour: number; + + /** The payment network that should be considered while looking for providers and where payments will be done */ + paymentNetwork: string; + + /** + * List of provider Golem Node IDs that should be considered + * + * If not provided, the list will be pulled from: https://provider-health.golem.network/v1/provider-whitelist + */ + withProviders?: string[]; +}; + +export type ServiceDeploymentConfig = { + /** How many instances of that service you want to have at maximum, given the idle ones will be freed to control costs */ + maxReplicas: number; + + /** Specify the computation resource criteria to filter offers on the Golem Network */ + resources: Partial<{ + /** The minimum CPU requirement for each service instance. */ + minCpu: number; + //TODO: maxCpu: number; + /* The minimum memory requirement (in Gibibyte) for each service instance. */ + minMemGib: number; + // TODO: maxMemGib: number; + /** The minimum storage requirement (in Gibibyte) for each service instance. */ + minStorageGib: number; + // TODO: maxStorageGib: number; + }>; + + /** The time interval (in seconds) between checks to release unused resources. */ + downscaleIntervalSec: number; +}; + +export type GolemApiConfig = { + /** + * The URL to `yagna` API + * + * It can be provided via the `GOLEM_API_URL` environment variable. + * + * Defaults to `http://localhost:7465/` + */ + url: string; + + /** + * The API key that your script will use to contact `yagna` + * + * You can obtain this from `yagna app-key list` command. + */ + key: string; +}; + export interface GolemConfig { + /** + * Golem Node's (yagna) API related config params. + */ + api: GolemApiConfig; + /** * Specification of how long you want to rent the compute resources for * * These parameters will be used to find the providers matching your pricing criteria, estimate and allocate GLM budget for the operations. */ - market: { - /** How long you want to rent the resources in hours */ - rentHours: number; - /** What's the desired hourly rate spend in GLM/hour */ - priceGlmPerHour: number; - - /** - * List of provider Golem Node IDs that should be considered - * - * If not provided, the list will be pulled from: https://provider-health.golem.network/v1/provider-whitelist - */ - withProviders?: string[]; - }; + market: GolemMarketConfig; /** * Represents the deployment configuration for a service on Golem Network */ - deploy: { - /** How many instances of that service you want to have at maximum, given the idle ones will be freed to control costs */ - maxReplicas: number; - - /** Specify the computation resource criteria to filter offers on the Golem Network */ - resources: Partial<{ - /** The minimum CPU requirement for each service instance. */ - minCpu: number; - //TODO: maxCpu: number; - /* The minimum memory requirement (in Gibibyte) for each service instance. */ - minMemGib: number; - // TODO: maxMemGib: number; - /** The minimum storage requirement (in Gibibyte) for each service instance. */ - minStorageGib: number; - // TODO: maxStorageGib: number; - }>; - - /** The time interval (in seconds) between checks to release unused resources. */ - downscaleIntervalSec: number; - }; + deploy: ServiceDeploymentConfig; /** Number of seconds to wait for the Golem component to initialize (be ready to accept requests and order resources on Golem Network) */ initTimeoutSec: number; @@ -74,27 +105,6 @@ export interface GolemConfig { * This value has to consider time for a fresh replica to be added before the request is sent to one. */ requestStartTimeoutSec: number; - - /** - * Golem Node's (yagna) API related config params. - */ - api?: { - /** - * The URL to `yagna` API - * - * It can be provided via the `GOLEM_API_URL` environment variable. - * - * Defaults to `http://localhost:7465/` - */ - url?: string; - - /** - * The API key that your script will use to contact `yagna` - * - * You can obtain this from `yagna app-key list` command. - */ - key: string; - }; } export class Golem { @@ -112,6 +122,8 @@ export class Golem { private config: GolemConfig; + private abortController = new AbortController(); + constructor(config: GolemConfig) { this.logger = debug("golem"); @@ -119,13 +131,9 @@ export class Golem { // FIXME: This internally allocates resources like connections, which also have to be cleaned up this.yagna = new Yagna({ - apiKey: this.config.api?.key ?? process.env["GOLEM_API_KEY"], - basePath: - this.config.api?.url ?? - process.env["GOLEM_API_URL"] ?? - "http://localhost:7465", + apiKey: this.config.api.key, + basePath: this.config.api.url, }); - // TODO: Payment driver? this.api = this.yagna.getApi(); @@ -145,7 +153,7 @@ export class Golem { this.paymentService = new PaymentService(this.api, { logger: createLogger("golem-js:payment"), payment: { - network: process.env["GOLEM_PAYMENT_NETWORK"] ?? "goerli", + network: this.config.market.paymentNetwork, }, }); @@ -181,27 +189,62 @@ export class Golem { } async sendTask(task: Worker): Promise { + if (this.abortController.signal.aborted) { + throw new Error( + `No new task will be accepted because of the abort signal being already raised.`, + ); + } + const activity = await this.activityPool.acquire(); this.logger("Acquired activity %s to execute the task", activity.id); try { - const ctx = new WorkContext(activity, { - storageProvider: this.storageProvider, - }); - await ctx.before(); - const result = await task(ctx); - //await ctx.after(); // FIXME: It's kind of missing when you're using .before()... + if (this.abortController.signal.aborted) { + throw new Error( + `The task will not be served on activity because the abort signal is already raised.`, + ); + } + + // FIXME #sdk, I would like to have an ability to pass an abort controller signal to the SDK to handle it... + return await new Promise((resolve, reject) => { + if (this.abortController.signal.aborted) { + reject( + `Task execution aborted at start because of the abort signal being already raised.`, + ); + } + + this.abortController.signal.onabort = () => { + this.logger("Task received abort signal, will reject immediately."); + reject( + `Task execution aborted due to: ${this.abortController.signal.reason}`, + ); + }; - return result; + const ctx = new WorkContext(activity, { + storageProvider: this.storageProvider, + }); + + ctx + .before() + .then(() => task(ctx)) + .then((result) => resolve(result)) + .catch((err) => reject(err)); + }); } catch (err) { console.error(err, "Running the task on Golem failed with this error"); throw err; } finally { await this.activityPool.release(activity); - this.logger("Released activity %s after task execution", activity.id); + this.logger("Released activity %s", activity.id); } } + abort() { + this.logger("Aborting all operations on Golem"); + this.abortController.abort("The client is shutting down"); + return this.stop(); + } + async stop() { this.logger("Waiting for the activity pool to drain"); await this.activityPool.drain(); @@ -304,7 +347,11 @@ export class Golem { private buildProposalFilter(): ProposalFilter { return (proposal) => { - if (!this.config.market.withProviders?.includes(proposal.provider.id)) { + if ( + this.config.market.withProviders && + this.config.market.withProviders.length > 0 && + !this.config.market.withProviders.includes(proposal.provider.id) + ) { return false; } diff --git a/src/tesseract-ocr-on-golem.ts b/src/tesseract-ocr-on-golem.ts index 64c7668..c2e315e 100644 --- a/src/tesseract-ocr-on-golem.ts +++ b/src/tesseract-ocr-on-golem.ts @@ -1,5 +1,10 @@ import path from "path"; -import { Golem, GolemConfig } from "./golem"; +import { + Golem, + GolemApiConfig, + GolemMarketConfig, + ServiceDeploymentConfig, +} from "./golem"; import * as fs from "fs"; import debug from "debug"; @@ -20,13 +25,21 @@ export interface TesseractArgs { oem?: number; } +type MakeOptional = Omit & Partial>; + export interface TesseractOcrOnGolemConfig { /** * Configuration options to use when getting compute resources from the Golem Network * * This configuration is concerned only with the settings which are relevant in the Tesseract OCR use-case */ - service: GolemConfig; + service: { + api?: GolemApiConfig; + market: MakeOptional; + deploy: ServiceDeploymentConfig; + initTimeoutSec: number; + requestStartTimeoutSec: number; + }; /** * Tesseract OCR specific arguments that the user might want to use in order to tweak the performance or outcomes @@ -43,7 +56,6 @@ export class TesseractOcrOnGolem { constructor(private config: TesseractOcrOnGolemConfig) { this.logger = debug("tesseract"); - // this.workload = this.golem.createWorkload({ spec... }); } /** @@ -58,13 +70,44 @@ export class TesseractOcrOnGolem { const { initTimeoutSec } = this.config.service; - const golemConfig = { ...this.config.service }; + const apiKey = process.env["GOLEM_API_KEY"]; - if (golemConfig.market.withProviders === undefined) { - golemConfig.market.withProviders = await this.fetchRecommendedProviders(); + if (apiKey === undefined) { + throw new Error( + "You didn't specify the Golem API key in the config object or GOLEM_API_KEY environment setting", + ); } - this.golem = new Golem(golemConfig); + const API_DEFAULTS: Pick = { + key: apiKey, + url: process.env["GOLEM_API_URL"] ?? "http://localhost:7465", + }; + + const MARKET_DEFAULTS: Pick = { + paymentNetwork: process.env["GOLEM_PAYMENT_NETWORK"] ?? "goerli", + }; + + const marketConfig: GolemMarketConfig = { + ...MARKET_DEFAULTS, + ...this.config.service.market, + }; + + if (marketConfig.withProviders === undefined) { + marketConfig.withProviders = await this.fetchRecommendedProviders( + marketConfig.paymentNetwork, + ); + } + + this.golem = new Golem({ + api: { + ...API_DEFAULTS, + ...this.config.service.api, + }, + initTimeoutSec: this.config.service.initTimeoutSec, + requestStartTimeoutSec: this.config.service.requestStartTimeoutSec, + deploy: this.config.service.deploy, + market: marketConfig, + }); const timeout = () => new Promise((_resolve, reject) => { @@ -132,6 +175,13 @@ export class TesseractOcrOnGolem { }); } + async abort() { + this.logger("Aborting Tesseract On Golem"); + await this.golem?.abort(); + this.isInitialized = false; + this.logger("Aborted Tesseract On Golem"); + } + /** * Stops the Tesseract service gracefully by shutting down the Golem * @@ -174,8 +224,11 @@ export class TesseractOcrOnGolem { * Since the network can contain broken or failing providers, we make use of the public whitelist of validated * providers to increase the chance for a successful conversion */ - private async fetchRecommendedProviders() { - this.logger("Downloading recommended provider list"); + private async fetchRecommendedProviders(paymentNetwork: string) { + this.logger( + "Downloading recommended provider list for payment network %s", + paymentNetwork, + ); const FETCH_TIMEOUT_SEC = 30; const FALLBACK_LIST: string[] = []; @@ -191,7 +244,7 @@ export class TesseractOcrOnGolem { ); const response: Response = await fetch( - "https://provider-health.golem.network/v1/provider-whitelist", + `https://provider-health.golem.network/v1/provider-whitelist?paymentNetwork=${paymentNetwork}`, { headers: { Accept: "application/json", @@ -207,6 +260,7 @@ export class TesseractOcrOnGolem { "The response from the recommended providers endpoint was not OK %o. Using fallback.", response.body, ); + return FALLBACK_LIST; }