Skip to content

Commit

Permalink
[ws-man] Add started and completed metrics to track health
Browse files Browse the repository at this point in the history
  • Loading branch information
easyCZ committed Apr 20, 2022
1 parent 2abc2b7 commit 3110629
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 72 deletions.
176 changes: 138 additions & 38 deletions components/ws-manager-bridge/src/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,38 @@

import { inject, injectable, interfaces } from "inversify";
import { MessageBusIntegration } from "./messagebus-integration";
import { Disposable, WorkspaceInstance, Queue, WorkspaceInstancePort, PortVisibility, RunningWorkspaceInfo, DisposableCollection } from "@gitpod/gitpod-protocol";
import { WorkspaceStatus, WorkspacePhase, GetWorkspacesRequest, WorkspaceConditionBool, PortVisibility as WsManPortVisibility, PromisifiedWorkspaceManagerClient } from "@gitpod/ws-manager/lib";
import {
Disposable,
DisposableCollection,
PortVisibility,
Queue,
RunningWorkspaceInfo,
WorkspaceInstance,
WorkspaceInstancePort,
} from "@gitpod/gitpod-protocol";
import {
GetWorkspacesRequest,
PortVisibility as WsManPortVisibility,
PromisifiedWorkspaceManagerClient,
WorkspaceConditionBool,
WorkspacePhase,
WorkspaceStatus,
WorkspaceType,
} from "@gitpod/ws-manager/lib";
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
import { UserDB } from "@gitpod/gitpod-db/lib/user-db";
import { log } from '@gitpod/gitpod-protocol/lib/util/logging';
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics";
import { TracedWorkspaceDB, TracedUserDB, DBWithTracing } from '@gitpod/gitpod-db/lib/traced-db';
import { DBWithTracing, TracedUserDB, TracedWorkspaceDB } from "@gitpod/gitpod-db/lib/traced-db";
import { PrometheusMetricsExporter } from "./prometheus-metrics-exporter";
import { ClientProvider, WsmanSubscriber } from "./wsman-subscriber";
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
import { Configuration } from "./config";
import { WorkspaceCluster } from "@gitpod/gitpod-protocol/lib/workspace-cluster";
import { repeat } from "@gitpod/gitpod-protocol/lib/util/repeat";
import { PreparingUpdateEmulator, PreparingUpdateEmulatorFactory } from "./preparing-update-emulator";
import { performance } from "perf_hooks";

export const WorkspaceManagerBridgeFactory = Symbol("WorkspaceManagerBridgeFactory");

Expand Down Expand Up @@ -71,7 +88,7 @@ export class WorkspaceManagerBridge implements Disposable {
log.debug(`Starting status update handler: ${cluster.name}`, logPayload);
/* no await */ this.startStatusUpdateHandler(clientProvider, writeToDB, logPayload)
// this is a mere safe-guard: we do not expect the code inside to fail
.catch(err => log.error("Cannot start status update handler", err));
.catch((err) => log.error("Cannot start status update handler", err));
};

if (cluster.govern) {
Expand Down Expand Up @@ -102,20 +119,41 @@ export class WorkspaceManagerBridge implements Disposable {
this.dispose();
}

protected async startStatusUpdateHandler(clientProvider: ClientProvider, writeToDB: boolean, logPayload: {}): Promise<void> {
protected async startStatusUpdateHandler(
clientProvider: ClientProvider,
writeToDB: boolean,
logPayload: {},
): Promise<void> {
const subscriber = new WsmanSubscriber(clientProvider);
this.disposables.push(subscriber);

const onReconnect = (ctx: TraceContext, s: WorkspaceStatus[]) => {
s.forEach(sx => this.serializeMessagesByInstanceId<WorkspaceStatus>(ctx, sx, m => m.getId(), (ctx, msg) => this.handleStatusUpdate(ctx, msg, writeToDB)))
s.forEach((sx) =>
this.serializeMessagesByInstanceId<WorkspaceStatus>(
ctx,
sx,
(m) => m.getId(),
(ctx, msg) => this.handleStatusUpdate(ctx, msg, writeToDB),
),
);
};
const onStatusUpdate = (ctx: TraceContext, s: WorkspaceStatus) => {
this.serializeMessagesByInstanceId<WorkspaceStatus>(ctx, s, msg => msg.getId(), (ctx, s) => this.handleStatusUpdate(ctx, s, writeToDB))
this.serializeMessagesByInstanceId<WorkspaceStatus>(
ctx,
s,
(msg) => msg.getId(),
(ctx, s) => this.handleStatusUpdate(ctx, s, writeToDB),
);
};
await subscriber.subscribe({ onReconnect, onStatusUpdate }, logPayload);
}

protected serializeMessagesByInstanceId<M>(ctx: TraceContext, msg: M, getInstanceId: (msg: M) => string, handler: (ctx: TraceContext, msg: M) => Promise<void>) {
protected serializeMessagesByInstanceId<M>(
ctx: TraceContext,
msg: M,
getInstanceId: (msg: M) => string,
handler: (ctx: TraceContext, msg: M) => Promise<void>,
) {
const instanceId = getInstanceId(msg);
if (!instanceId) {
log.warn("Received invalid message, could not read instanceId!", { msg });
Expand All @@ -125,18 +163,48 @@ export class WorkspaceManagerBridge implements Disposable {
// We can't just handle the status update directly, but have to "serialize" it to ensure the updates stay in order.
// If we did not do this, the async nature of our code would allow for one message to overtake the other.
let q = this.queues.get(instanceId) || new Queue();
q.enqueue(() => handler(ctx, msg)).catch(e => log.error({instanceId}, e));
q.enqueue(() => handler(ctx, msg)).catch((e) => log.error({ instanceId }, e));
this.queues.set(instanceId, q);
}

protected async handleStatusUpdate(ctx: TraceContext, rawStatus: WorkspaceStatus, writeToDB: boolean) {
const start = performance.now();
const status = rawStatus.toObject();
log.info("Handling WorkspaceStatus update", status);

if (!status.spec || !status.metadata || !status.conditions) {
log.warn("Received invalid status update", status);
return;
}

try {
await this.statusUpdate(ctx, rawStatus, writeToDB);
} catch (e) {
const durationMs = performance.now() - start;
this.prometheusExporter.reportWorkspaceInstanceUpdateCompleted(
durationMs / 1000,
this.cluster.name,
status.spec.type,
e,
);
} finally {
const durationMs = performance.now() - start;
this.prometheusExporter.reportWorkspaceInstanceUpdateCompleted(
durationMs / 1000,
this.cluster.name,
status.spec.type,
);
}
}

private async statusUpdate(ctx: TraceContext, rawStatus: WorkspaceStatus, writeToDB: boolean) {
const status = rawStatus.toObject();
log.info("Handling WorkspaceStatus update", status);

if (!status.spec || !status.metadata || !status.conditions) {
return;
}

const span = TraceContext.startSpan("handleStatusUpdate", ctx);
span.setTag("status", JSON.stringify(filterStatus(status)));
span.setTag("writeToDB", writeToDB);
Expand All @@ -149,7 +217,7 @@ export class WorkspaceManagerBridge implements Disposable {
const userId = status.metadata!.owner!;
const logCtx = { instanceId, workspaceId, userId };

const instance = await this.workspaceDB.trace({span}).findInstanceById(instanceId);
const instance = await this.workspaceDB.trace({ span }).findInstanceById(instanceId);
if (instance) {
this.prometheusExporter.statusUpdateReceived(this.cluster.name, true);
} else {
Expand All @@ -159,7 +227,7 @@ export class WorkspaceManagerBridge implements Disposable {
}

if (!!status.spec.exposedPortsList) {
instance.status.exposedPorts = status.spec.exposedPortsList.map(p => {
instance.status.exposedPorts = status.spec.exposedPortsList.map((p) => {
return <WorkspaceInstancePort>{
port: p.port,
visibility: mapPortVisibility(p.visibility),
Expand All @@ -180,7 +248,9 @@ export class WorkspaceManagerBridge implements Disposable {
instance.status.conditions.pullingImages = toBool(status.conditions.pullingImages!);
instance.status.conditions.deployed = toBool(status.conditions.deployed);
instance.status.conditions.timeout = status.conditions.timeout;
instance.status.conditions.firstUserActivity = mapFirstUserActivity(rawStatus.getConditions()!.getFirstUserActivity());
instance.status.conditions.firstUserActivity = mapFirstUserActivity(
rawStatus.getConditions()!.getFirstUserActivity(),
);
instance.status.conditions.headlessTaskFailed = status.conditions.headlessTaskFailed;
instance.status.conditions.stoppedByRequest = toBool(status.conditions.stoppedByRequest);
instance.status.message = status.message;
Expand All @@ -191,7 +261,7 @@ export class WorkspaceManagerBridge implements Disposable {

if (status.repo) {
const r = status.repo;
const undefinedIfEmpty = <T>(l: T[]) => l.length > 0 ? l : undefined;
const undefinedIfEmpty = <T>(l: T[]) => (l.length > 0 ? l : undefined);

instance.status.repo = {
branch: r.branch,
Expand All @@ -201,8 +271,8 @@ export class WorkspaceManagerBridge implements Disposable {
unpushedCommits: undefinedIfEmpty(r.unpushedCommitsList),
totalUntrackedFiles: r.totalUntrackedFiles,
untrackedFiles: undefinedIfEmpty(r.untrackedFilesList),
totalUnpushedCommits: r.totalUnpushedCommits
}
totalUnpushedCommits: r.totalUnpushedCommits,
};
}

if (instance.status.conditions.deployed && !instance.deployedTime) {
Expand Down Expand Up @@ -238,7 +308,7 @@ export class WorkspaceManagerBridge implements Disposable {
instance.status.phase = "interrupted";
break;
case WorkspacePhase.STOPPING:
if (instance.status.phase != 'stopped') {
if (instance.status.phase != "stopped") {
instance.status.phase = "stopping";
if (!instance.stoppingTime) {
// The first time a workspace enters stopping we record that as it's stoppingTime time.
Expand All @@ -259,14 +329,14 @@ export class WorkspaceManagerBridge implements Disposable {
// yet. Just for this case we need to set it now.
instance.stoppingTime = now;
}
lifecycleHandler = () => this.onInstanceStopped({span}, userId, instance);
lifecycleHandler = () => this.onInstanceStopped({ span }, userId, instance);
break;
}

span.setTag("after", JSON.stringify(instance));

// now notify all prebuild listeners about updates - and update DB if needed
await this.updatePrebuiltWorkspace({span}, userId, status, writeToDB);
await this.updatePrebuiltWorkspace({ span }, userId, status, writeToDB);

if (writeToDB) {
await this.workspaceDB.trace(ctx).storeInstance(instance);
Expand All @@ -280,56 +350,78 @@ export class WorkspaceManagerBridge implements Disposable {
}
}
await this.messagebus.notifyOnInstanceUpdate(ctx, userId, instance);

} catch (e) {
TraceContext.setError({span}, e);
TraceContext.setError({ span }, e);
throw e;
} finally {
span.finish();
}
}

protected startController(clientProvider: ClientProvider, controllerIntervalSeconds: number, controllerMaxDisconnectSeconds: number, maxTimeToRunningPhaseSeconds = 60 * 60) {
protected startController(
clientProvider: ClientProvider,
controllerIntervalSeconds: number,
controllerMaxDisconnectSeconds: number,
maxTimeToRunningPhaseSeconds = 60 * 60,
) {
let disconnectStarted = Number.MAX_SAFE_INTEGER;
this.disposables.push(
repeat(async () => {
try {
const client = await clientProvider();
await this.controlInstallationInstances(client, maxTimeToRunningPhaseSeconds);

disconnectStarted = Number.MAX_SAFE_INTEGER; // Reset disconnect period
disconnectStarted = Number.MAX_SAFE_INTEGER; // Reset disconnect period
} catch (e) {
if (durationLongerThanSeconds(disconnectStarted, controllerMaxDisconnectSeconds)) {
log.warn("Error while controlling installation's workspaces", e, { installation: this.cluster.name });
log.warn("Error while controlling installation's workspaces", e, {
installation: this.cluster.name,
});
} else if (disconnectStarted > Date.now()) {
disconnectStarted = Date.now();
}
}
}, controllerIntervalSeconds * 1000)
}, controllerIntervalSeconds * 1000),
);
}

protected async controlInstallationInstances(client: PromisifiedWorkspaceManagerClient, maxTimeToRunningPhaseSeconds: number) {
protected async controlInstallationInstances(
client: PromisifiedWorkspaceManagerClient,
maxTimeToRunningPhaseSeconds: number,
) {
const installation = this.cluster.name;
log.debug("controlling instances", { installation });
let ctx: TraceContext = {};

const runningInstances = await this.workspaceDB.trace(ctx).findRunningInstancesWithWorkspaces(installation);
const runningInstancesIdx = new Map<string, RunningWorkspaceInfo>();
runningInstances.forEach(i => runningInstancesIdx.set(i.latestInstance.id, i));
runningInstances.forEach((i) => runningInstancesIdx.set(i.latestInstance.id, i));

const actuallyRunningInstances = await client.getWorkspaces(ctx, new GetWorkspacesRequest());
actuallyRunningInstances.getStatusList().forEach(s => runningInstancesIdx.delete(s.getId()));
actuallyRunningInstances.getStatusList().forEach((s) => runningInstancesIdx.delete(s.getId()));

const promises: Promise<any>[] = [];
for (const [instanceId, ri] of runningInstancesIdx.entries()) {
const instance = ri.latestInstance;
if (!(instance.status.phase === 'running' || durationLongerThanSeconds(Date.parse(instance.creationTime), maxTimeToRunningPhaseSeconds))) {
log.debug({ instanceId }, "Skipping instance", { phase: instance.status.phase, creationTime: instance.creationTime, region: instance.region });
if (
!(
instance.status.phase === "running" ||
durationLongerThanSeconds(Date.parse(instance.creationTime), maxTimeToRunningPhaseSeconds)
)
) {
log.debug({ instanceId }, "Skipping instance", {
phase: instance.status.phase,
creationTime: instance.creationTime,
region: instance.region,
});
continue;
}

log.info({instanceId, workspaceId: instance.workspaceId}, "Database says the instance is starting for too long or running, but wsman does not know about it. Marking as stopped in database.", {installation});
log.info(
{ instanceId, workspaceId: instance.workspaceId },
"Database says the instance is starting for too long or running, but wsman does not know about it. Marking as stopped in database.",
{ installation },
);
instance.status.phase = "stopped";
instance.stoppingTime = new Date().toISOString();
instance.stoppedTime = new Date().toISOString();
Expand All @@ -344,27 +436,36 @@ export class WorkspaceManagerBridge implements Disposable {
// probes are an EE feature - we just need the hook here
}

protected async updatePrebuiltWorkspace(ctx: TraceContext, userId: string, status: WorkspaceStatus.AsObject, writeToDB: boolean) {
protected async updatePrebuiltWorkspace(
ctx: TraceContext,
userId: string,
status: WorkspaceStatus.AsObject,
writeToDB: boolean,
) {
// prebuilds are an EE feature - we just need the hook here
}

protected async stopPrebuildInstance(ctx: TraceContext, instance: WorkspaceInstance): Promise<void> {
// prebuilds are an EE feature - we just need the hook here
}

protected async onInstanceStopped(ctx: TraceContext, ownerUserID: string, instance: WorkspaceInstance): Promise<void> {
protected async onInstanceStopped(
ctx: TraceContext,
ownerUserID: string,
instance: WorkspaceInstance,
): Promise<void> {
const span = TraceContext.startSpan("onInstanceStopped", ctx);

try {
await this.userDB.trace({span}).deleteGitpodTokensNamedLike(ownerUserID, `${instance.id}-%`);
await this.userDB.trace({ span }).deleteGitpodTokensNamedLike(ownerUserID, `${instance.id}-%`);
this.analytics.track({
userId: ownerUserID,
event: "workspace_stopped",
messageId: `bridge-wsstopped-${instance.id}`,
properties: { "instanceId": instance.id, "workspaceId": instance.workspaceId }
properties: { instanceId: instance.id, workspaceId: instance.workspaceId },
});
} catch (err) {
TraceContext.setError({span}, err);
TraceContext.setError({ span }, err);
throw err;
} finally {
span.finish();
Expand All @@ -374,7 +475,6 @@ export class WorkspaceManagerBridge implements Disposable {
public dispose() {
this.disposables.dispose();
}

}

const mapFirstUserActivity = (firstUserActivity: Timestamp | undefined): string | undefined => {
Expand Down Expand Up @@ -413,4 +513,4 @@ const filterStatus = (status: WorkspaceStatus.AsObject): Partial<WorkspaceStatus
conditions: status.conditions,
runtime: status.runtime,
};
}
};
Loading

0 comments on commit 3110629

Please sign in to comment.