Skip to content

Commit

Permalink
Reduce log spam and min cpu threads to b0.4 (#233)
Browse files Browse the repository at this point in the history
* Support for filtering provider offers by golem.inf.cpu.threads and golem.inf.cpu.cores (#231)
* Send valid termination reason (#229)
* Reduce log entries related to confirmed proposals (#230)
  • Loading branch information
filipgolem authored Jul 2, 2021
1 parent b6cd6b7 commit 44e3fa3
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 22 deletions.
13 changes: 8 additions & 5 deletions yajsapi/executor/agreements_pool.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Activity, NodeInfo } from "../props";
import { Agreement, OfferProposal } from "../rest/market";
import { Agreement, OfferProposal, TerminationReason } from "../rest/market";
import { asyncWith, CancellationToken, Lock, logger } from "../utils";
import * as events from "./events";
import { ComputationHistory } from "./strategy";
Expand Down Expand Up @@ -153,7 +153,10 @@ export class AgreementsPool implements ComputationHistory {
}
});
}
private async _terminate_agreement(agreement_id: string, reason: object): Promise<void> {
private async _terminate_agreement(
agreement_id: string,
reason: TerminationReason
): Promise<void> {
const buffered_agreement = this._agreements.get(agreement_id)
if (buffered_agreement === undefined) {
logger.warn(`Trying to terminate agreement not in the pool. id: ${agreement_id}`);
Expand All @@ -167,17 +170,17 @@ export class AgreementsPool implements ComputationHistory {
buffered_agreement.worker_task.cancel();
}
if (buffered_agreement.has_multi_activity) {
if (!(await buffered_agreement.agreement.terminate(reason.toString()))) {
if (!(await buffered_agreement.agreement.terminate(reason))) {
logger.debug(`Couldn't terminate agreement. id: ${buffered_agreement.agreement.id()}`);
}
}
this._agreements.delete(agreement_id);
this.emitter(new events.AgreementTerminated({
agr_id: agreement_id,
reason: reason.toString(),
reason: JSON.stringify(reason),
}));
}
async terminate_all(reason: object): Promise<void> {
async terminate_all(reason: TerminationReason): Promise<void> {
await asyncWith(this._lock, async (lock) => {
for (const agreement_id of new Map(this._agreements).keys()) {
await this._terminate_agreement(agreement_id, reason)
Expand Down
18 changes: 13 additions & 5 deletions yajsapi/executor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ export class Executor {
private _chan_computation_done;
private _cancellation_token: CancellationToken;

private emit;

/**
* Create new executor
*
Expand Down Expand Up @@ -231,6 +233,9 @@ export class Executor {
this._wrapped_consumer =
event_consumer &&
new AsyncWrapper(event_consumer, null, cancellationToken);
this.emit = <Callable<[events.YaEvent], void>>(
this._wrapped_consumer.async_call.bind(this._wrapped_consumer)
)
// Each call to `submit()` will put an item in the channel.
// The channel can be used to wait until all calls to `submit()` are finished.
this._chan_computation_done = csp.chan();
Expand Down Expand Up @@ -375,10 +380,7 @@ export class Executor {
>,
data: Iterable<Task<D, R>>
): AsyncGenerator<Task<D, R>> {
const emit = <Callable<[events.YaEvent], void>>(
this._wrapped_consumer.async_call.bind(this._wrapped_consumer)
);

const emit = this.emit;
let multi_payment_decoration;
try {
multi_payment_decoration = await this._create_allocations();
Expand Down Expand Up @@ -824,7 +826,12 @@ export class Executor {
}
try {
await agreements_pool.cycle();
await agreements_pool.terminate_all({ reason: "Computation finished." })
await agreements_pool.terminate_all({
"message":
cancellationToken.cancelled ? "Work cancelled" : "Successfully finished all work",
"golem.requestor.code":
cancellationToken.cancelled ? "Cancelled" : "Success"
});
} catch (error) {
logger.debug(`Problem with agreements termination ${error}`);
}
Expand Down Expand Up @@ -939,6 +946,7 @@ export class Executor {
// TODO: prevent new computations at this point (if it's even possible to start one)
this._market_api = null;
this._payment_api = null;
this.emit(new events.ShutdownFinished());
try {
await this._stack.aclose();
logger.info("Executor has shut down");
Expand Down
1 change: 1 addition & 0 deletions yajsapi/package/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export type RepoOpts = {
image_hash: string;
min_mem_gib: number;
min_storage_gib: number;
min_cpu_threads?: number;
};

export class Constraints {
Expand Down
6 changes: 5 additions & 1 deletion yajsapi/package/vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class _VmConstrains extends Constraints {
constructor(
min_mem_gib: number,
min_storage_gib: number,
min_cpu_threads: number = 1,
cores: number = 1
) {
super();
Expand All @@ -13,6 +14,7 @@ class _VmConstrains extends Constraints {
`(${InfVmKeys["mem"]}>=${min_mem_gib})`,
`(${InfVmKeys["storage"]}>=${min_storage_gib})`,
`(${InfVmKeys["runtime"]}=${RuntimeType.VM})`,
`(${InfVmKeys["threads"]}>=${min_cpu_threads})`,
]);
}
}
Expand All @@ -21,19 +23,21 @@ export async function repo({
image_hash,
min_mem_gib = 0.5,
min_storage_gib = 2.0,
min_cpu_threads = 1,
}: RepoOpts): Promise<Package> {
/*
Builds reference to a demand decorator.
- *image_hash*: finds package by its contents hash.
- *min_mem_gib*: minimal memory required to execute application code.
- *min_storage_gib* minimal disk storage to execute tasks.
- *min_cpu_threads*: minimal available logical CPU cores (CPU threads).
*/

return new VmPackage({
repo_url: await resolve_repo_srv({repo_srv: DEFAULT_REPO_SRV}),
image_hash,
constraints: new _VmConstrains(min_mem_gib, min_storage_gib),
constraints: new _VmConstrains(min_mem_gib, min_storage_gib, min_cpu_threads),
});
}
4 changes: 3 additions & 1 deletion yajsapi/props/inf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Field, Model } from "./base";
export const INF_MEM: string = "golem.inf.mem.gib";
export const INF_STORAGE: string = "golem.inf.storage.gib";
export const INF_CORES: string = "golem.inf.cpu.cores";
export const INF_THREADS: string = "golem.inf.cpu.threads";
export const INF_RUNTIME: string = "golem.runtime.name";
export const TRANSFER_CAPS: string = "golem.activity.caps.transfer.protocol";

Expand All @@ -19,6 +20,7 @@ export enum RuntimeType {

export class InfBase {
cores: Field = new Field({ metadata: { key: INF_CORES } });
threads: Field = new Field({ metadata: { key: INF_THREADS } });
mem: Field = new Field({ metadata: { key: INF_MEM } });
runtime: Field = new Field({ metadata: { key: INF_RUNTIME } });

Expand All @@ -38,7 +40,7 @@ export class InfVm extends InfBase {
}
export const InfVmKeys = InfBase.fields(
new InfVm(),
["cores", "mem", "storage", "runtime"]
["cores", "mem", "storage", "runtime", "threads"]
);

function getFields(obj: object, keys: string[]) {
Expand Down
5 changes: 3 additions & 2 deletions yajsapi/rest/market.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { suppress_exceptions, is_intermittent_error } from "./common";
dayjs.extend(utc);

type _ModelType = Pick<Model, "from_properties"> & Partial<Model>;
export type TerminationReason = { "message": string, "golem.requestor.code"?: string };

class View {
private _properties!: object;
Expand Down Expand Up @@ -81,9 +82,9 @@ export class Agreement {
}
}

async terminate(reason: string = "Finished"): Promise<boolean> {
async terminate(reason: TerminationReason = { "message": "Finished" }): Promise<boolean> {
try {
await this._api.terminateAgreement(this._id, { message: reason }, { timeout: 5000 });
await this._api.terminateAgreement(this._id, reason, { timeout: 5000 });
logger.debug(`Terminated agreement ${this._id}.`);
return true;
} catch (error) {
Expand Down
35 changes: 27 additions & 8 deletions yajsapi/utils/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import winston from "winston";
import { Callable } from "./";
import * as events from "../executor/events";

const REPORT_CONFIRMED_PROVIDERS_INTERVAL: number = 3000;

dayjs.extend(duration);

const event_type_to_string = {
Expand Down Expand Up @@ -96,6 +98,9 @@ class SummaryLogger {
// Set of confirmed proposal ids
confirmed_proposals!: Set<string>;

// Last number of confirmed providers
prev_confirmed_providers!: number;

// Maps agreement ids to provider infos
agreement_provider_info!: { [key: string]: ProviderInfo };

Expand All @@ -117,13 +122,17 @@ class SummaryLogger {
// Has computation finished?
finished!: boolean;

// Has Executor shut down?
shutdown_complete: boolean = false;

error_occurred!: boolean;

time_waiting_for_proposals;

constructor(wrapped_emitter: Callable<[events.YaEvent], void> | null = null) {
this._wrapped_emitter = wrapped_emitter;
this._reset();
this._print_confirmed_providers();
}

_reset(): void {
Expand All @@ -139,6 +148,7 @@ class SummaryLogger {
this.finished = false;
this.error_occurred = false;
this.time_waiting_for_proposals = dayjs.duration(0);
this.prev_confirmed_providers = 0;
}

_print_cost(): void {
Expand All @@ -156,6 +166,21 @@ class SummaryLogger {
console.table(results);
}

_print_confirmed_providers(): void {
const confirmed_providers = new Set(
[...this.confirmed_proposals].map(
(prop_id) => this.received_proposals[prop_id]
)
);
if (this.prev_confirmed_providers < confirmed_providers.size) {
logger.info(`Received proposals from ${confirmed_providers.size} providers so far`);
this.prev_confirmed_providers = confirmed_providers.size;
}
if (!this.shutdown_complete) {
setTimeout(() => this._print_confirmed_providers(), REPORT_CONFIRMED_PROVIDERS_INTERVAL);
}
}

log(event: events.YaEvent): void {
// """Register an event."""

Expand All @@ -181,14 +206,6 @@ class SummaryLogger {
this.received_proposals[event["prop_id"]] = event["provider_id"];
else if (eventName === events.ProposalConfirmed.name) {
this.confirmed_proposals.add(event["prop_id"]);
const confirmed_providers = new Set(
[...this.confirmed_proposals].map(
(prop_id) => this.received_proposals[prop_id]
)
);
logger.info(
`Received proposals from ${confirmed_providers.size} providers so far`
);
} else if (eventName === events.NoProposalsConfirmed.name) {
this.time_waiting_for_proposals = this.time_waiting_for_proposals.add({
millisecond: parseInt(event["timeout"]),
Expand Down Expand Up @@ -337,6 +354,8 @@ class SummaryLogger {
logger.debug(
`Queued payment for agreement ${event["agr_id"].substr(0, 17)}`
);
} else if (eventName === events.ShutdownFinished.name) {
this.shutdown_complete = true;
}
}
}
Expand Down

0 comments on commit 44e3fa3

Please sign in to comment.