Skip to content

Commit

Permalink
[bridge] Add tracing for Bridge.controlInstances
Browse files Browse the repository at this point in the history
  • Loading branch information
geropl committed Jun 23, 2022
1 parent df32c3b commit 59c02a3
Showing 1 changed file with 96 additions and 55 deletions.
151 changes: 96 additions & 55 deletions components/ws-manager-bridge/src/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import {
GetWorkspacesRequest,
WorkspaceConditionBool,
PortVisibility as WsManPortVisibility,
PromisifiedWorkspaceManagerClient,
} from "@gitpod/ws-manager/lib";
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
import { UserDB } from "@gitpod/gitpod-db/lib/user-db";
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
import { log, LogContext } from "@gitpod/gitpod-protocol/lib/util/logging";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics";
import { TracedWorkspaceDB, TracedUserDB, DBWithTracing } from "@gitpod/gitpod-db/lib/traced-db";
Expand Down Expand Up @@ -99,12 +98,14 @@ export class WorkspaceManagerBridge implements Disposable {
startStatusUpdateHandler(true);

// the actual "governing" part
const controllerInterval = this.config.controllerIntervalSeconds;
if (controllerInterval <= 0) {
throw new Error("controllerInterval <= 0!");
const controllerIntervalSeconds = this.config.controllerIntervalSeconds;
if (controllerIntervalSeconds <= 0) {
throw new Error("controllerIntervalSeconds <= 0!");
}

log.debug(`Starting controller: ${cluster.name}`, logPayload);
this.startController(clientProvider, controllerInterval, this.config.controllerMaxDisconnectSeconds);
// Control all workspace instances, either against ws-manager or configured timeouts
this.startController(clientProvider, controllerIntervalSeconds, this.config.controllerMaxDisconnectSeconds);
} else {
// _DO NOT_ update the DB (another bridge is responsible for that)
// Still, listen to all updates, generate/derive new state and distribute it locally!
Expand Down Expand Up @@ -408,69 +409,109 @@ export class WorkspaceManagerBridge implements Disposable {
let disconnectStarted = Number.MAX_SAFE_INTEGER;
this.disposables.push(
repeat(async () => {
const span = TraceContext.startSpan("controlInstances");
const ctx = { span };
try {
const client = await clientProvider();
await this.controlInstallationInstances(client, maxTimeToRunningPhaseSeconds);

disconnectStarted = Number.MAX_SAFE_INTEGER; // Reset disconnect period
} catch (e) {
if (durationLongerThanSeconds(disconnectStarted, controllerMaxDisconnectSeconds)) {
log.warn("Error while controlling installation's workspaces", e, {
installation: this.cluster.name,
});
} else if (disconnectStarted > Date.now()) {
disconnectStarted = Date.now();
const installation = this.cluster.name;
log.debug("Controlling instances...", { installation });

const runningInstances = await this.workspaceDB
.trace(ctx)
.findRunningInstancesWithWorkspaces(installation, undefined, true);

// Control running workspace instances against ws-manager
try {
await this.controlRunningInstances(
ctx,
runningInstances,
clientProvider,
maxTimeToRunningPhaseSeconds,
);

disconnectStarted = Number.MAX_SAFE_INTEGER; // Reset disconnect period
} catch (err) {
if (durationLongerThanSeconds(disconnectStarted, controllerMaxDisconnectSeconds)) {
log.warn("Error while controlling installation's workspaces", err, {
installation: this.cluster.name,
});
} else if (disconnectStarted > Date.now()) {
disconnectStarted = Date.now();
}
}

log.debug("Done controlling instances.", { installation });
} catch (err) {
TraceContext.setError(ctx, err);
log.error("Error while controlling installation's workspaces", err, {
installation: this.cluster.name,
});
} finally {
span.finish();
}
}, controllerIntervalSeconds * 1000),
);
}

protected async controlInstallationInstances(
client: PromisifiedWorkspaceManagerClient,
protected async controlRunningInstances(
parentCtx: TraceContext,
runningInstances: RunningWorkspaceInfo[],
clientProvider: ClientProvider,
maxTimeToRunningPhaseSeconds: number,
) {
const installation = this.cluster.name;
log.debug("controlling instances", { installation });
let ctx: TraceContext = {};
const installation = this.config.installation;

const runningInstances = await this.workspaceDB.trace(ctx).findRunningInstancesWithWorkspaces(installation);
const runningInstancesIdx = new Map<string, RunningWorkspaceInfo>();
runningInstances.forEach((i) => runningInstancesIdx.set(i.latestInstance.id, i));

const actuallyRunningInstances = await client.getWorkspaces(ctx, new GetWorkspacesRequest());
actuallyRunningInstances.getStatusList().forEach((s) => runningInstancesIdx.delete(s.getId()));
const span = TraceContext.startSpan("controlRunningInstances", parentCtx);
const ctx = { span };
try {
log.debug("Controlling running instances...", { installation });

const runningInstancesIdx = new Map<string, RunningWorkspaceInfo>();
runningInstances.forEach((i) => runningInstancesIdx.set(i.latestInstance.id, i));

const client = await clientProvider();
const actuallyRunningInstances = await client.getWorkspaces(ctx, new GetWorkspacesRequest());
actuallyRunningInstances.getStatusList().forEach((s) => runningInstancesIdx.delete(s.getId()));

for (const [instanceId, ri] of runningInstancesIdx.entries()) {
const instance = ri.latestInstance;
if (
!(
instance.status.phase === "running" ||
durationLongerThanSeconds(Date.parse(instance.creationTime), maxTimeToRunningPhaseSeconds)
)
) {
log.debug({ instanceId }, "Skipping instance", {
phase: instance.status.phase,
creationTime: instance.creationTime,
region: instance.region,
});
continue;
}

const promises: Promise<any>[] = [];
for (const [instanceId, ri] of runningInstancesIdx.entries()) {
const instance = ri.latestInstance;
if (
!(
instance.status.phase === "running" ||
durationLongerThanSeconds(Date.parse(instance.creationTime), maxTimeToRunningPhaseSeconds)
)
) {
log.debug({ instanceId }, "Skipping instance", {
phase: instance.status.phase,
creationTime: instance.creationTime,
region: instance.region,
});
continue;
log.info(
{ instanceId, workspaceId: instance.workspaceId },
"Database says the instance is running, but wsman does not know about it. Marking as stopped in database.",
{ installation },
);
await this.markWorkspaceInstanceAsStopped(ctx, ri, new Date());
}

log.info(
{ instanceId, workspaceId: instance.workspaceId },
"Database says the instance is starting for too long or running, but wsman does not know about it. Marking as stopped in database.",
{ installation },
);
instance.status.phase = "stopped";
instance.stoppingTime = new Date().toISOString();
instance.stoppedTime = new Date().toISOString();
promises.push(this.workspaceDB.trace({}).storeInstance(instance));
promises.push(this.onInstanceStopped({}, ri.workspace.ownerId, instance));
promises.push(this.prebuildUpdater.stopPrebuildInstance(ctx, instance));
log.debug("Done controlling running instances.", { installation });
} catch (err) {
TraceContext.setError(ctx, err);
throw err; // required by caller
}
await Promise.all(promises);
}

protected async markWorkspaceInstanceAsStopped(ctx: TraceContext, info: RunningWorkspaceInfo, now: Date) {
const nowISO = now.toISOString();
info.latestInstance.stoppingTime = nowISO;
info.latestInstance.stoppedTime = nowISO;
info.latestInstance.status.phase = "stopped";
await this.workspaceDB.trace(ctx).storeInstance(info.latestInstance);

await this.messagebus.notifyOnInstanceUpdate(ctx, info.workspace.ownerId, info.latestInstance);
await this.prebuildUpdater.stopPrebuildInstance(ctx, info.latestInstance);
}

protected async onInstanceStopped(
Expand Down

0 comments on commit 59c02a3

Please sign in to comment.