diff --git a/components/ws-manager-bridge/src/bridge.ts b/components/ws-manager-bridge/src/bridge.ts index 39273922595d25..848e2d5631de63 100644 --- a/components/ws-manager-bridge/src/bridge.ts +++ b/components/ws-manager-bridge/src/bridge.ts @@ -21,11 +21,10 @@ import { GetWorkspacesRequest, WorkspaceConditionBool, PortVisibility as WsManPortVisibility, - PromisifiedWorkspaceManagerClient, } from "@gitpod/ws-manager/lib"; import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db"; import { UserDB } from "@gitpod/gitpod-db/lib/user-db"; -import { log } from "@gitpod/gitpod-protocol/lib/util/logging"; +import { log, LogContext } from "@gitpod/gitpod-protocol/lib/util/logging"; import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing"; import { IAnalyticsWriter } from "@gitpod/gitpod-protocol/lib/analytics"; import { TracedWorkspaceDB, TracedUserDB, DBWithTracing } from "@gitpod/gitpod-db/lib/traced-db"; @@ -99,12 +98,14 @@ export class WorkspaceManagerBridge implements Disposable { startStatusUpdateHandler(true); // the actual "governing" part - const controllerInterval = this.config.controllerIntervalSeconds; - if (controllerInterval <= 0) { - throw new Error("controllerInterval <= 0!"); + const controllerIntervalSeconds = this.config.controllerIntervalSeconds; + if (controllerIntervalSeconds <= 0) { + throw new Error("controllerIntervalSeconds <= 0!"); } + log.debug(`Starting controller: ${cluster.name}`, logPayload); - this.startController(clientProvider, controllerInterval, this.config.controllerMaxDisconnectSeconds); + // Control all workspace instances, either against ws-manager or configured timeouts + this.startController(clientProvider, controllerIntervalSeconds, this.config.controllerMaxDisconnectSeconds); } else { // _DO NOT_ update the DB (another bridge is responsible for that) // Still, listen to all updates, generate/derive new state and distribute it locally! @@ -408,69 +409,109 @@ export class WorkspaceManagerBridge implements Disposable { let disconnectStarted = Number.MAX_SAFE_INTEGER; this.disposables.push( repeat(async () => { + const span = TraceContext.startSpan("controlInstances"); + const ctx = { span }; try { - const client = await clientProvider(); - await this.controlInstallationInstances(client, maxTimeToRunningPhaseSeconds); - - disconnectStarted = Number.MAX_SAFE_INTEGER; // Reset disconnect period - } catch (e) { - if (durationLongerThanSeconds(disconnectStarted, controllerMaxDisconnectSeconds)) { - log.warn("Error while controlling installation's workspaces", e, { - installation: this.cluster.name, - }); - } else if (disconnectStarted > Date.now()) { - disconnectStarted = Date.now(); + const installation = this.cluster.name; + log.debug("Controlling instances...", { installation }); + + const runningInstances = await this.workspaceDB + .trace(ctx) + .findRunningInstancesWithWorkspaces(installation, undefined, true); + + // Control running workspace instances against ws-manager + try { + await this.controlRunningInstances( + ctx, + runningInstances, + clientProvider, + maxTimeToRunningPhaseSeconds, + ); + + disconnectStarted = Number.MAX_SAFE_INTEGER; // Reset disconnect period + } catch (err) { + if (durationLongerThanSeconds(disconnectStarted, controllerMaxDisconnectSeconds)) { + log.warn("Error while controlling installation's workspaces", err, { + installation: this.cluster.name, + }); + } else if (disconnectStarted > Date.now()) { + disconnectStarted = Date.now(); + } } + + log.debug("Done controlling instances.", { installation }); + } catch (err) { + TraceContext.setError(ctx, err); + log.error("Error while controlling installation's workspaces", err, { + installation: this.cluster.name, + }); + } finally { + span.finish(); } }, controllerIntervalSeconds * 1000), ); } - protected async controlInstallationInstances( - client: PromisifiedWorkspaceManagerClient, + protected async controlRunningInstances( + parentCtx: TraceContext, + runningInstances: RunningWorkspaceInfo[], + clientProvider: ClientProvider, maxTimeToRunningPhaseSeconds: number, ) { - const installation = this.cluster.name; - log.debug("controlling instances", { installation }); - let ctx: TraceContext = {}; + const installation = this.config.installation; - const runningInstances = await this.workspaceDB.trace(ctx).findRunningInstancesWithWorkspaces(installation); - const runningInstancesIdx = new Map(); - runningInstances.forEach((i) => runningInstancesIdx.set(i.latestInstance.id, i)); - - const actuallyRunningInstances = await client.getWorkspaces(ctx, new GetWorkspacesRequest()); - actuallyRunningInstances.getStatusList().forEach((s) => runningInstancesIdx.delete(s.getId())); + const span = TraceContext.startSpan("controlRunningInstances", parentCtx); + const ctx = { span }; + try { + log.debug("Controlling running instances...", { installation }); + + const runningInstancesIdx = new Map(); + runningInstances.forEach((i) => runningInstancesIdx.set(i.latestInstance.id, i)); + + const client = await clientProvider(); + const actuallyRunningInstances = await client.getWorkspaces(ctx, new GetWorkspacesRequest()); + actuallyRunningInstances.getStatusList().forEach((s) => runningInstancesIdx.delete(s.getId())); + + for (const [instanceId, ri] of runningInstancesIdx.entries()) { + const instance = ri.latestInstance; + if ( + !( + instance.status.phase === "running" || + durationLongerThanSeconds(Date.parse(instance.creationTime), maxTimeToRunningPhaseSeconds) + ) + ) { + log.debug({ instanceId }, "Skipping instance", { + phase: instance.status.phase, + creationTime: instance.creationTime, + region: instance.region, + }); + continue; + } - const promises: Promise[] = []; - for (const [instanceId, ri] of runningInstancesIdx.entries()) { - const instance = ri.latestInstance; - if ( - !( - instance.status.phase === "running" || - durationLongerThanSeconds(Date.parse(instance.creationTime), maxTimeToRunningPhaseSeconds) - ) - ) { - log.debug({ instanceId }, "Skipping instance", { - phase: instance.status.phase, - creationTime: instance.creationTime, - region: instance.region, - }); - continue; + log.info( + { instanceId, workspaceId: instance.workspaceId }, + "Database says the instance is running, but wsman does not know about it. Marking as stopped in database.", + { installation }, + ); + await this.markWorkspaceInstanceAsStopped(ctx, ri, new Date()); } - log.info( - { instanceId, workspaceId: instance.workspaceId }, - "Database says the instance is starting for too long or running, but wsman does not know about it. Marking as stopped in database.", - { installation }, - ); - instance.status.phase = "stopped"; - instance.stoppingTime = new Date().toISOString(); - instance.stoppedTime = new Date().toISOString(); - promises.push(this.workspaceDB.trace({}).storeInstance(instance)); - promises.push(this.onInstanceStopped({}, ri.workspace.ownerId, instance)); - promises.push(this.prebuildUpdater.stopPrebuildInstance(ctx, instance)); + log.debug("Done controlling running instances.", { installation }); + } catch (err) { + TraceContext.setError(ctx, err); + throw err; // required by caller } - await Promise.all(promises); + } + + protected async markWorkspaceInstanceAsStopped(ctx: TraceContext, info: RunningWorkspaceInfo, now: Date) { + const nowISO = now.toISOString(); + info.latestInstance.stoppingTime = nowISO; + info.latestInstance.stoppedTime = nowISO; + info.latestInstance.status.phase = "stopped"; + await this.workspaceDB.trace(ctx).storeInstance(info.latestInstance); + + await this.messagebus.notifyOnInstanceUpdate(ctx, info.workspace.ownerId, info.latestInstance); + await this.prebuildUpdater.stopPrebuildInstance(ctx, info.latestInstance); } protected async onInstanceStopped(