Skip to content

Commit

Permalink
dev
Browse files Browse the repository at this point in the history
  • Loading branch information
geropl committed Jun 23, 2022
1 parent f9b52aa commit f444050
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 105 deletions.
208 changes: 112 additions & 96 deletions components/ws-manager-bridge/src/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import {
GetWorkspacesRequest,
WorkspaceConditionBool,
PortVisibility as WsManPortVisibility,
PromisifiedWorkspaceManagerClient,
} from "@gitpod/ws-manager/lib";
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
import { UserDB } from "@gitpod/gitpod-db/lib/user-db";
Expand Down Expand Up @@ -99,21 +98,17 @@ export class WorkspaceManagerBridge implements Disposable {
startStatusUpdateHandler(true);

// the actual "governing" part
const controllerInterval = this.config.controllerIntervalSeconds;
if (controllerInterval <= 0) {
throw new Error("controllerInterval <= 0!");
const { controllerIntervalSeconds, controllerMaxDisconnectSeconds } = this.config.timeouts;
if (controllerIntervalSeconds <= 0) {
throw new Error("controllerIntervalSeconds <= 0!");
}
if (controllerMaxDisconnectSeconds <= 0) {
throw new Error("controllerMaxDisconnectSeconds <= 0!");
}

log.debug(`Starting controllers: ${cluster.name}`, logPayload);
// Control all "running" workspace instances
this.startWsManagerController(
clientProvider,
controllerInterval,
this.config.controllerMaxDisconnectSeconds,
);

// Control all workspace instances that only live in the DB
this.startDBController(this.config.timeouts.controllerIntervalSeconds);
log.debug(`Starting controller: ${cluster.name}`, logPayload);
// Control all workspace instances, either against ws-manager or configured timeouts
this.startController(clientProvider, controllerIntervalSeconds, controllerMaxDisconnectSeconds);
} else {
// _DO NOT_ update the DB (another bridge is responsible for that)
// Still, listen to all updates, generate/derive new state and distribute it locally!
Expand Down Expand Up @@ -408,100 +403,118 @@ export class WorkspaceManagerBridge implements Disposable {
}
}

protected startWsManagerController(
protected startController(
clientProvider: ClientProvider,
controllerIntervalSeconds: number,
controllerMaxDisconnectSeconds: number,
) {
let disconnectStarted = Number.MAX_SAFE_INTEGER;
this.disposables.push(
repeat(async () => {
const span = TraceContext.startSpan("controlInstances");
const ctx = { span };
try {
const client = await clientProvider();
await this.controlWsManagerInstances(client);

disconnectStarted = Number.MAX_SAFE_INTEGER; // Reset disconnect period
} catch (e) {
if (durationLongerThanSeconds(disconnectStarted, controllerMaxDisconnectSeconds)) {
log.warn("Error while controlling installation's workspaces", e, {
installation: this.cluster.name,
});
} else if (disconnectStarted > Date.now()) {
disconnectStarted = Date.now();
const installation = this.cluster.name;
log.debug("Controlling instances...", { installation });

const runningInstances = await this.workspaceDB
.trace(ctx)
.findRunningInstancesWithWorkspaces(installation, undefined, true);

// Control running workspace instances against ws-manager
try {
await this.controlRunningInstances(ctx, runningInstances, clientProvider);

disconnectStarted = Number.MAX_SAFE_INTEGER; // Reset disconnect period
} catch (err) {
if (durationLongerThanSeconds(disconnectStarted, controllerMaxDisconnectSeconds)) {
log.warn("Error while controlling installation's workspaces", err, {
installation: this.cluster.name,
});
} else if (disconnectStarted > Date.now()) {
disconnectStarted = Date.now();
}
}

// Control workspace instances against timeouts
await this.controlInstanceTimeouts(ctx, runningInstances);

log.debug("Done controlling instances.", { installation });
} catch (err) {
TraceContext.setError(ctx, err);
log.error("Error while controlling installation's workspaces", err, {
installation: this.cluster.name,
});
} finally {
span.finish();
}
}, controllerIntervalSeconds * 1000),
);
}
protected async controlRunningInstances(
parentCtx: TraceContext,
runningInstances: RunningWorkspaceInfo[],
clientProvider: ClientProvider,
) {
const installation = this.config.installation;

protected async controlWsManagerInstances(client: PromisifiedWorkspaceManagerClient) {
const installation = this.cluster.name;
log.debug("controlling instances", { installation });
let ctx: TraceContext = {};

const runningInstances = await this.workspaceDB.trace(ctx).findRunningInstancesWithWorkspaces(installation);
const runningInstancesIdx = new Map<string, RunningWorkspaceInfo>();
runningInstances.forEach((i) => runningInstancesIdx.set(i.latestInstance.id, i));

const actuallyRunningInstances = await client.getWorkspaces(ctx, new GetWorkspacesRequest());
actuallyRunningInstances.getStatusList().forEach((s) => runningInstancesIdx.delete(s.getId()));

const promises: Promise<any>[] = [];
for (const [instanceId, ri] of runningInstancesIdx.entries()) {
const instance = ri.latestInstance;
if (instance.status.phase !== "running") {
log.debug({ instanceId }, "Skipping instance", {
phase: instance.status.phase,
creationTime: instance.creationTime,
region: instance.region,
});
continue;
const span = TraceContext.startSpan("controlRunningInstances", parentCtx);
const ctx = { span };
try {
log.debug("Controlling running instances...", { installation });

const runningInstancesIdx = new Map<string, RunningWorkspaceInfo>();
runningInstances.forEach((i) => runningInstancesIdx.set(i.latestInstance.id, i));

const client = await clientProvider();
const actuallyRunningInstances = await client.getWorkspaces(ctx, new GetWorkspacesRequest());
actuallyRunningInstances.getStatusList().forEach((s) => runningInstancesIdx.delete(s.getId()));

for (const [instanceId, ri] of runningInstancesIdx.entries()) {
const instance = ri.latestInstance;
if (instance.status.phase !== "running") {
log.debug({ instanceId }, "Skipping instance", {
phase: instance.status.phase,
creationTime: instance.creationTime,
region: instance.region,
});
continue;
}

log.info(
{ instanceId, workspaceId: instance.workspaceId },
"Database says the instance is running, but wsman does not know about it. Marking as stopped in database.",
{ installation: this.config.installation },
);
await this.stopWorkspaceInstance(ctx, ri, new Date());
}

log.info(
{ instanceId, workspaceId: instance.workspaceId },
"Database says the instance is running, but wsman does not know about it. Marking as stopped in database.",
{ installation },
);
instance.status.phase = "stopped";
instance.stoppingTime = new Date().toISOString();
instance.stoppedTime = new Date().toISOString();
promises.push(
(async () => {
await this.workspaceDB.trace({}).storeInstance(instance);
await this.onInstanceStopped({}, ri.workspace.ownerId, instance);
await this.prebuildUpdater.stopPrebuildInstance(ctx, instance);
})(),
);
log.debug("Done controlling running instances.", { installation });
} catch (err) {
TraceContext.setError(ctx, err);
throw err; // required by caller
}
await Promise.all(promises);
}

protected startDBController(controllerIntervalSeconds: number) {
protected async controlInstanceTimeouts(parentCtx: TraceContext, runningInstances: RunningWorkspaceInfo[]) {
const installation = this.config.installation;

this.disposables.push(
repeat(async () => {
const span = TraceContext.startSpan("controlDBInstances");
const ctx = { span };
try {
log.debug("controlling DB instances", { installation });
const span = TraceContext.startSpan("controlDBInstances", parentCtx);
const ctx = { span };
try {
log.debug("Controlling DB instances...", { installation });

const infos = await this.workspaceDB
.trace(ctx)
.findRunningInstancesWithWorkspaces(installation, undefined, true);
await Promise.all(runningInstances.map((info) => this.controlDBInstance(ctx, info)));

await Promise.all(infos.map((info) => this.controlDBInstance(ctx, info)));
} catch (err) {
log.error("Error while running controlDBInstances", err, {
installation: this.cluster.name,
});
TraceContext.setError(ctx, err);
} finally {
span.finish();
}
}, controllerIntervalSeconds * 1000),
);
log.debug("Done controlling DB instances.", { installation });
} catch (err) {
log.error("Error while running controlDBInstances", err, {
installation: this.cluster.name,
});
TraceContext.setError(ctx, err);
} finally {
span.finish();
}
}

/**
Expand All @@ -514,13 +527,13 @@ export class WorkspaceManagerBridge implements Disposable {
* - unknown (fallback)
* @param info
*/
protected async controlDBInstance(_ctx: TraceContext, info: RunningWorkspaceInfo) {
protected async controlDBInstance(parentCtx: TraceContext, info: RunningWorkspaceInfo) {
const logContext: LogContext = {
userId: info.workspace.ownerId,
workspaceId: info.workspace.id,
instanceId: info.latestInstance.id,
};
const ctx = TraceContext.childContext("controlDBInstance", _ctx);
const ctx = TraceContext.childContext("controlDBInstance", parentCtx);
try {
const now = Date.now();
const creationTime = new Date(info.latestInstance.creationTime).getTime();
Expand Down Expand Up @@ -549,15 +562,7 @@ export class WorkspaceManagerBridge implements Disposable {
creationTime,
currentPhase,
});

const nowISO = new Date(now).toISOString();
info.latestInstance.stoppingTime = nowISO;
info.latestInstance.stoppedTime = nowISO;
info.latestInstance.status.phase = "stopped";
await this.workspaceDB.trace(ctx).storeInstance(info.latestInstance);
await this.messagebus.notifyOnInstanceUpdate(ctx, info.workspace.ownerId, info.latestInstance);

await this.prebuildUpdater.stopPrebuildInstance(ctx, info.latestInstance);
await this.stopWorkspaceInstance(ctx, info, new Date(now));
}
} catch (err) {
log.warn(logContext, "Controller: Error while marking workspace instance as stopped", err);
Expand All @@ -567,6 +572,17 @@ export class WorkspaceManagerBridge implements Disposable {
}
}

protected async stopWorkspaceInstance(ctx: TraceContext, info: RunningWorkspaceInfo, now: Date) {
const nowISO = now.toISOString();
info.latestInstance.stoppingTime = nowISO;
info.latestInstance.stoppedTime = nowISO;
info.latestInstance.status.phase = "stopped";
await this.workspaceDB.trace(ctx).storeInstance(info.latestInstance);

await this.messagebus.notifyOnInstanceUpdate(ctx, info.workspace.ownerId, info.latestInstance);
await this.prebuildUpdater.stopPrebuildInstance(ctx, info.latestInstance);
}

protected async onInstanceStopped(
ctx: TraceContext,
ownerUserID: string,
Expand Down
14 changes: 5 additions & 9 deletions components/ws-manager-bridge/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ export interface Configuration {
// The interval in which fresh WorkspaceCluster-state is polled from the DB
wsClusterDBReconcileIntervalSeconds: number;

// controllerIntervalSeconds configures how often we check for invalid workspace states
controllerIntervalSeconds: number;

// controllerMaxDisconnect configures how long the controller may be disconnected from ws-manager before it emits a warning
controllerMaxDisconnectSeconds: number;

// maxTimeToRunningPhaseSeconds is the time that we are willing to give a workspce instance in which it has to reach a running state
maxTimeToRunningPhaseSeconds: number;

// timeouts configures the timeout behaviour of pre-workspace cluster workspaces
timeouts: {
// controllerIntervalSeconds configures how often we check for invalid workspace states
controllerIntervalSeconds: number;

// controllerMaxDisconnect configures how long the controller may be disconnected from ws-manager before it emits a warning
controllerMaxDisconnectSeconds: number;

preparingPhaseSeconds: number;
buildingPhaseSeconds: number;
stoppingPhaseSeconds: number;
Expand Down

0 comments on commit f444050

Please sign in to comment.