Skip to content

Commit

Permalink
feat: use public whitelist endpoint to use good providers and handle …
Browse files Browse the repository at this point in the history
…aborts
  • Loading branch information
grisha87 committed Jan 15, 2024
1 parent 314a038 commit edd4eb5
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 82 deletions.
3 changes: 1 addition & 2 deletions examples/usage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const writeTextToResultFile = (text?: string) => {
if (alreadyShuttingDown) {
console.error("The process is already shutting down, will force quit");
process.exit(1);
return;
} else {
console.log(
"Shutdown initiated, please wait for everything to finish, or hit ^C again to force exit",
Expand All @@ -49,7 +48,7 @@ const writeTextToResultFile = (text?: string) => {

alreadyShuttingDown = true;

await ocr.shutdown();
await ocr.abort();
};

process.on("SIGINT", stop);
Expand Down
187 changes: 117 additions & 70 deletions src/golem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,80 @@ import { Proposal } from "@golem-sdk/golem-js/dist/market";
import genericPool from "generic-pool";
import debug, { Debugger } from "debug";

export type GolemMarketConfig = {
/** How long you want to rent the resources in hours */
rentHours: number;

/** What's the desired hourly rate spend in GLM/hour */
priceGlmPerHour: number;

/** The payment network that should be considered while looking for providers and where payments will be done */
paymentNetwork: string;

/**
* List of provider Golem Node IDs that should be considered
*
* If not provided, the list will be pulled from: https://provider-health.golem.network/v1/provider-whitelist
*/
withProviders?: string[];
};

export type ServiceDeploymentConfig = {
/** How many instances of that service you want to have at maximum, given the idle ones will be freed to control costs */
maxReplicas: number;

/** Specify the computation resource criteria to filter offers on the Golem Network */
resources: Partial<{
/** The minimum CPU requirement for each service instance. */
minCpu: number;
//TODO: maxCpu: number;
/* The minimum memory requirement (in Gibibyte) for each service instance. */
minMemGib: number;
// TODO: maxMemGib: number;
/** The minimum storage requirement (in Gibibyte) for each service instance. */
minStorageGib: number;
// TODO: maxStorageGib: number;
}>;

/** The time interval (in seconds) between checks to release unused resources. */
downscaleIntervalSec: number;
};

export type GolemApiConfig = {
/**
* The URL to `yagna` API
*
* It can be provided via the `GOLEM_API_URL` environment variable.
*
* Defaults to `http://localhost:7465/`
*/
url: string;

/**
* The API key that your script will use to contact `yagna`
*
* You can obtain this from `yagna app-key list` command.
*/
key: string;
};

export interface GolemConfig {
/**
* Golem Node's (yagna) API related config params.
*/
api: GolemApiConfig;

/**
* Specification of how long you want to rent the compute resources for
*
* These parameters will be used to find the providers matching your pricing criteria, estimate and allocate GLM budget for the operations.
*/
market: {
/** How long you want to rent the resources in hours */
rentHours: number;
/** What's the desired hourly rate spend in GLM/hour */
priceGlmPerHour: number;

/**
* List of provider Golem Node IDs that should be considered
*
* If not provided, the list will be pulled from: https://provider-health.golem.network/v1/provider-whitelist
*/
withProviders?: string[];
};
market: GolemMarketConfig;

/**
* Represents the deployment configuration for a service on Golem Network
*/
deploy: {
/** How many instances of that service you want to have at maximum, given the idle ones will be freed to control costs */
maxReplicas: number;

/** Specify the computation resource criteria to filter offers on the Golem Network */
resources: Partial<{
/** The minimum CPU requirement for each service instance. */
minCpu: number;
//TODO: maxCpu: number;
/* The minimum memory requirement (in Gibibyte) for each service instance. */
minMemGib: number;
// TODO: maxMemGib: number;
/** The minimum storage requirement (in Gibibyte) for each service instance. */
minStorageGib: number;
// TODO: maxStorageGib: number;
}>;

/** The time interval (in seconds) between checks to release unused resources. */
downscaleIntervalSec: number;
};
deploy: ServiceDeploymentConfig;

/** Number of seconds to wait for the Golem component to initialize (be ready to accept requests and order resources on Golem Network) */
initTimeoutSec: number;
Expand All @@ -74,27 +105,6 @@ export interface GolemConfig {
* This value has to consider time for a fresh replica to be added before the request is sent to one.
*/
requestStartTimeoutSec: number;

/**
* Golem Node's (yagna) API related config params.
*/
api?: {
/**
* The URL to `yagna` API
*
* It can be provided via the `GOLEM_API_URL` environment variable.
*
* Defaults to `http://localhost:7465/`
*/
url?: string;

/**
* The API key that your script will use to contact `yagna`
*
* You can obtain this from `yagna app-key list` command.
*/
key: string;
};
}

export class Golem {
Expand All @@ -112,20 +122,18 @@ export class Golem {

private config: GolemConfig;

private abortController = new AbortController();

constructor(config: GolemConfig) {
this.logger = debug("golem");

this.config = config;

// FIXME: This internally allocates resources like connections, which also have to be cleaned up
this.yagna = new Yagna({
apiKey: this.config.api?.key ?? process.env["GOLEM_API_KEY"],
basePath:
this.config.api?.url ??
process.env["GOLEM_API_URL"] ??
"http://localhost:7465",
apiKey: this.config.api.key,
basePath: this.config.api.url,
});
// TODO: Payment driver?

this.api = this.yagna.getApi();

Expand All @@ -145,7 +153,7 @@ export class Golem {
this.paymentService = new PaymentService(this.api, {
logger: createLogger("golem-js:payment"),
payment: {
network: process.env["GOLEM_PAYMENT_NETWORK"] ?? "goerli",
network: this.config.market.paymentNetwork,
},
});

Expand Down Expand Up @@ -181,27 +189,62 @@ export class Golem {
}

async sendTask<T>(task: Worker<T>): Promise<T | undefined> {
if (this.abortController.signal.aborted) {
throw new Error(
`No new task will be accepted because of the abort signal being already raised.`,
);
}

const activity = await this.activityPool.acquire();
this.logger("Acquired activity %s to execute the task", activity.id);

try {
const ctx = new WorkContext(activity, {
storageProvider: this.storageProvider,
});
await ctx.before();
const result = await task(ctx);
//await ctx.after(); // FIXME: It's kind of missing when you're using .before()...
if (this.abortController.signal.aborted) {
throw new Error(
`The task will not be served on activity because the abort signal is already raised.`,
);
}

// FIXME #sdk, I would like to have an ability to pass an abort controller signal to the SDK to handle it...
return await new Promise((resolve, reject) => {
if (this.abortController.signal.aborted) {
reject(
`Task execution aborted at start because of the abort signal being already raised.`,
);
}

this.abortController.signal.onabort = () => {
this.logger("Task received abort signal, will reject immediately.");
reject(
`Task execution aborted due to: ${this.abortController.signal.reason}`,
);
};

return result;
const ctx = new WorkContext(activity, {
storageProvider: this.storageProvider,
});

ctx
.before()
.then(() => task(ctx))
.then((result) => resolve(result))
.catch((err) => reject(err));
});
} catch (err) {
console.error(err, "Running the task on Golem failed with this error");
throw err;
} finally {
await this.activityPool.release(activity);
this.logger("Released activity %s after task execution", activity.id);
this.logger("Released activity %s", activity.id);
}
}

abort() {
this.logger("Aborting all operations on Golem");
this.abortController.abort("The client is shutting down");
return this.stop();
}

async stop() {
this.logger("Waiting for the activity pool to drain");
await this.activityPool.drain();
Expand Down Expand Up @@ -304,7 +347,11 @@ export class Golem {

private buildProposalFilter(): ProposalFilter {
return (proposal) => {
if (!this.config.market.withProviders?.includes(proposal.provider.id)) {
if (
this.config.market.withProviders &&
this.config.market.withProviders.length > 0 &&
!this.config.market.withProviders.includes(proposal.provider.id)
) {
return false;
}

Expand Down
Loading

0 comments on commit edd4eb5

Please sign in to comment.