diff --git a/.werft/values.dev.yaml b/.werft/values.dev.yaml index d1a833a027378e..037efd5ded6144 100644 --- a/.werft/values.dev.yaml +++ b/.werft/values.dev.yaml @@ -218,10 +218,6 @@ mysql: memory: 350Mi rabbitmq: - # ensure shovels are configured on boot - shovels: - - name: messagebus-0 - srcUri: "amqp://$USERNAME:$PASSWORD@messagebus-0" auth: username: override-me password: override-me diff --git a/chart/templates/ws-manager-bridge-configmap.yaml b/chart/templates/ws-manager-bridge-configmap.yaml index 0d2903c68f3781..79d097cb7e6a00 100644 --- a/chart/templates/ws-manager-bridge-configmap.yaml +++ b/chart/templates/ws-manager-bridge-configmap.yaml @@ -31,6 +31,7 @@ data: "stoppingPhaseSeconds": 3600, "unknownPhaseSeconds": 600 }, + "emulatePreparingIntervalSeconds": "10", "staticBridges": {{ index (include "ws-manager-list" (dict "root" . "gp" $.Values "comp" .Values.components.server) | fromYaml) "manager" | default list | toJson }} } {{- end -}} diff --git a/chart/values.yaml b/chart/values.yaml index 4c6ac8ae7a9a1a..9ba553c05cdb0a 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -580,9 +580,6 @@ mysql: rabbitmq: enabled: true fullnameOverride: "messagebus" - # non-standard configuration - # defined by gitpod.io - shovels: [] persistence: enabled: false replicaCount: 1 @@ -605,7 +602,7 @@ rabbitmq: enabled: true allowExternal: true plugins: "rabbitmq_management rabbitmq_peer_discovery_k8s" - extraPlugins: "rabbitmq_shovel rabbitmq_shovel_management" + extraPlugins: "" extraSecrets: load-definition: load_definition.json: | @@ -618,7 +615,7 @@ rabbitmq: "vhosts": [{ "name": "/" }], - "parameters": {{ tpl (.Values.shovelsTemplate) . | fromYamlArray | toJson }}, + "parameters": [], "permissions": [{ "user": {{ .Values.auth.username | quote }}, "vhost": "/", @@ -627,23 +624,11 @@ rabbitmq: "read": ".*" }], "exchanges": [{ - "name": "gitpod.ws", - "vhost": "/", - "type": "topic", - "durable": true, - "auto_delete": false - }, { "name": "gitpod.ws.local", "vhost": "/", "type": "topic", "durable": true, "auto_delete": false - }, { - "name": "wsman", - "vhost": "/", - "type": "topic", - "durable": false, - "auto_delete": false }, { "name": "consensus-leader", "vhost": "/", @@ -651,14 +636,7 @@ rabbitmq: "durable": false, "auto_delete": false }], - "bindings": [{ - "source": "gitpod.ws.local", - "vhost": "/", - "destination": "gitpod.ws", - "destination_type": "exchange", - "routing_key": "#", - "arguments": {} - }], + "bindings": [], "queues": [{ "name": "consensus-peers", "vhost": "/", @@ -694,25 +672,6 @@ rabbitmq: create: true minAvailable: 0 maxUnavailable: 1 - shovelsTemplate: | - {{ $auth := .Values.auth }} - {{- range $index, $shovel := .Values.shovels }} - - name: {{ $shovel.name | default (randAlphaNum 20) | quote }} - vhost: "/" - component: "shovel" - value: - ack-mode: "on-publish" - src-delete-after: "never" - src-exchange: {{ $shovel.srcExchange | default "gitpod.ws.local" | quote }} - src-exchange-key: {{ $shovel.srcExchangeKey | default "#" | quote }} - src-protocol: "amqp091" - src-uri: {{ $shovel.srcUri | replace "$USERNAME" $auth.username | replace "$PASSWORD" $auth.password | quote }} - dest-add-forward-headers: {{ $shovel.destAddForwardHeaders | default true }} - dest-exchange: {{ $shovel.destExchange | default "gitpod.ws" | quote }} - dest-protocol: "amqp091" - dest-uri: {{ $shovel.destUri | default "amqp://" | quote }} - reconnect-delay: {{ $shovel.reconnectDelay | default 5 }} - {{- end }} cert-manager: enabled: false diff --git a/components/gitpod-db/src/typeorm/entity/db-workspace-instance.ts b/components/gitpod-db/src/typeorm/entity/db-workspace-instance.ts index bf88da13a2ab35..770f43f7f9b03c 100644 --- a/components/gitpod-db/src/typeorm/entity/db-workspace-instance.ts +++ b/components/gitpod-db/src/typeorm/entity/db-workspace-instance.ts @@ -13,6 +13,7 @@ import { Transformer } from "../transformer"; @Entity() @Index("ind_find_wsi_ws_in_period", ['workspaceId', 'startedTime', 'stoppedTime']) // findInstancesWithWorkspaceInPeriod +@Index("ind_phasePersisted_region", ['phasePersisted', 'region']) // findInstancesByPhaseAndRegion // on DB but not Typeorm: @Index("ind_lastModified", ["_lastModified"]) // DBSync export class DBWorkspaceInstance implements WorkspaceInstance { @PrimaryColumn(TypeORM.UUID_COLUMN_TYPE) diff --git a/components/gitpod-db/src/typeorm/migration/1645019483643-InstancesByPhaseAndRegion.ts b/components/gitpod-db/src/typeorm/migration/1645019483643-InstancesByPhaseAndRegion.ts new file mode 100644 index 00000000000000..3fdcf69630d389 --- /dev/null +++ b/components/gitpod-db/src/typeorm/migration/1645019483643-InstancesByPhaseAndRegion.ts @@ -0,0 +1,24 @@ +/** + * Copyright (c) 2022 Gitpod GmbH. All rights reserved. + * Licensed under the GNU Affero General Public License (AGPL). + * See License-AGPL.txt in the project root for license information. + */ + +import {MigrationInterface, QueryRunner} from "typeorm"; +import { indexExists } from "./helper/helper"; + +const TABLE_NAME = "d_b_workspace_instance"; +const INDEX_NAME = "ind_phasePersisted_region"; + +export class InstancesByPhaseAndRegion1645019483643 implements MigrationInterface { + + public async up(queryRunner: QueryRunner): Promise { + if(!(await indexExists(queryRunner, TABLE_NAME, INDEX_NAME))) { + await queryRunner.query(`CREATE INDEX ${INDEX_NAME} ON ${TABLE_NAME} (phasePersisted, region)`); + } + } + + public async down(queryRunner: QueryRunner): Promise { + } + +} diff --git a/components/gitpod-db/src/typeorm/workspace-db-impl.ts b/components/gitpod-db/src/typeorm/workspace-db-impl.ts index 0386444bab6ece..76b72ef97f1bc7 100644 --- a/components/gitpod-db/src/typeorm/workspace-db-impl.ts +++ b/components/gitpod-db/src/typeorm/workspace-db-impl.ts @@ -852,6 +852,15 @@ export abstract class AbstractTypeORMWorkspaceDBImpl implements WorkspaceDB { return (res); } + async findInstancesByPhaseAndRegion(phase: string, region: string): Promise { + const repo = await this.getWorkspaceInstanceRepo(); + // uses index: ind_phasePersisted_region + const qb = repo.createQueryBuilder("wsi") + .where("wsi.phasePersisted = :phase", { phase }) + .andWhere("wsi.region = :region", { region }); + return qb.getMany(); + } + async findPrebuiltWorkspacesByProject(projectId: string, branch?: string, limit?: number): Promise { const repo = await this.getPrebuiltWorkspaceRepo(); diff --git a/components/gitpod-db/src/workspace-db.ts b/components/gitpod-db/src/workspace-db.ts index dd93e9d1345fcf..680f31d9bc2293 100644 --- a/components/gitpod-db/src/workspace-db.ts +++ b/components/gitpod-db/src/workspace-db.ts @@ -83,6 +83,7 @@ export interface WorkspaceDB { findAllWorkspaces(offset: number, limit: number, orderBy: keyof Workspace, orderDir: "ASC" | "DESC", ownerId?: string, searchTerm?: string, minCreationTime?: Date, maxCreationDateTime?: Date, type?: WorkspaceType): Promise<{ total: number, rows: Workspace[] }>; findAllWorkspaceAndInstances(offset: number, limit: number, orderBy: keyof WorkspaceAndInstance, orderDir: "ASC" | "DESC", query?: AdminGetWorkspacesQuery, searchTerm?: string): Promise<{ total: number, rows: WorkspaceAndInstance[] }>; findWorkspaceAndInstance(id: string): Promise; + findInstancesByPhaseAndRegion(phase: string, region: string): Promise; findAllWorkspaceInstances(offset: number, limit: number, orderBy: keyof WorkspaceInstance, orderDir: "ASC" | "DESC", ownerId?: string, minCreationTime?: Date, maxCreationTime?: Date, onlyRunning?: boolean, type?: WorkspaceType): Promise<{ total: number, rows: WorkspaceInstance[] }>; diff --git a/components/gitpod-messagebus/src/messagebus.ts b/components/gitpod-messagebus/src/messagebus.ts index 04d7c45943f120..280336a38b18b7 100644 --- a/components/gitpod-messagebus/src/messagebus.ts +++ b/components/gitpod-messagebus/src/messagebus.ts @@ -76,7 +76,7 @@ const ASTERISK = "*"; @injectable() export class MessageBusHelperImpl implements MessageBusHelper { - readonly workspaceExchange = MessageBusHelperImpl.WORKSPACE_EXCHANGE; + readonly workspaceExchange = MessageBusHelperImpl.WORKSPACE_EXCHANGE_LOCAL; /** * Ensures that the gitpod workspace exchange is present @@ -155,7 +155,6 @@ export class MessageBusHelperImpl implements MessageBusHelper { } export namespace MessageBusHelperImpl { - export const WORKSPACE_EXCHANGE = "gitpod.ws"; export const WORKSPACE_EXCHANGE_LOCAL = "gitpod.ws.local"; export const PREBUILD_UPDATABLE_QUEUE = "pwsupdatable"; } diff --git a/components/installer/pkg/components/rabbitmq/helm.go b/components/installer/pkg/components/rabbitmq/helm.go index 1481b5f2655498..003033ff080792 100644 --- a/components/installer/pkg/components/rabbitmq/helm.go +++ b/components/installer/pkg/components/rabbitmq/helm.go @@ -7,10 +7,8 @@ package rabbitmq import ( "encoding/json" "fmt" - "strings" "helm.sh/helm/v3/pkg/cli/values" - "sigs.k8s.io/yaml" "github.com/gitpod-io/gitpod/installer/pkg/cluster" "github.com/gitpod-io/gitpod/installer/pkg/common" @@ -108,68 +106,12 @@ type config struct { Policies []policy `json:"policies"` } -func generateParameters(username string, password string, input []parameter) ([]parameter, error) { - // Ensures this defaults to [] not null when marshalled to JSON - params := make([]parameter, 0) - - for _, item := range input { - // Sort out default values - if item.Name == "" { - name, err := common.RandomString(20) - if err != nil { - return nil, err - } - item.Name = name - } - if item.Values.SrcExchange == "" { - item.Values.SrcExchange = "gitpod.ws.local" - } - if item.Values.SrcExchangeKey == "" { - item.Values.SrcExchangeKey = "#" - } - if item.Values.DestExchange == "" { - item.Values.DestExchange = "gitpod.ws" - } - if item.Values.DestUri == "" { - item.Values.DestUri = "amqp://" - } - - srcUri := strings.Replace(item.Values.SrcUri, "$USERNAME", username, -1) - srcUri = strings.Replace(srcUri, "$PASSWORD", password, -1) - - params = append(params, parameter{ - Name: item.Name, - Vhost: "/", - Component: "shovel", - Values: parameterValues{ - AckMode: "on-publish", - SrcDeleteAfter: "never", - SrcExchange: item.Values.SrcExchange, - SrcExchangeKey: item.Values.SrcExchangeKey, - SrcProtocol: "amqp091", - SrcUri: srcUri, - DestAddForwardHeaders: item.Values.DestAddForwardHeaders, - DestExchange: item.Values.DestExchange, - DestProtocol: "amqp091", - DestUri: item.Values.DestUri, - ReconnectDelay: item.Values.ReconnectDelay, - }, - }) - } - return params, nil -} - var Helm = common.CompositeHelmFunc( helm.ImportTemplate(charts.RabbitMQ(), helm.TemplateConfig{}, func(cfg *common.RenderContext) (*common.HelmConfig, error) { username := "gitpod" password := cfg.Values.MessageBusPassword - parameters, err := generateParameters(username, password, []parameter{}) - if err != nil { - return nil, err - } - loadDefinition, err := json.Marshal(config{ Users: []user{{ Name: username, @@ -177,7 +119,7 @@ var Helm = common.CompositeHelmFunc( Tags: "administrator", }}, Vhosts: []vhost{{Name: "/"}}, - Parameters: parameters, + Parameters: []parameter{}, Permissions: []permission{{ User: username, Vhost: "/", @@ -186,23 +128,11 @@ var Helm = common.CompositeHelmFunc( Read: ".*", }}, Exchanges: []exchange{{ - Name: "gitpod.ws", - Vhost: "/", - Type: "topic", - Durable: true, - AutoDelete: false, - }, { Name: "gitpod.ws.local", Vhost: "/", Type: "topic", Durable: true, AutoDelete: false, - }, { - Name: "wsman", - Vhost: "/", - Type: "topic", - Durable: false, - AutoDelete: false, }, { Name: "consensus-leader", Vhost: "/", @@ -210,14 +140,7 @@ var Helm = common.CompositeHelmFunc( Durable: false, AutoDelete: false, }}, - Bindings: []binding{{ - Source: "gitpod.ws.local", - Vhost: "/", - Destination: "gitpod.ws", - DestinationType: "exchange", - RoutingKey: "#", - Arguments: arguments{}, - }}, + Bindings: []binding{}, Queues: []queue{{ Name: "consensus-peers", Vhost: "/", @@ -251,16 +174,6 @@ var Helm = common.CompositeHelmFunc( return nil, err } - shovelsTemplate, err := yaml.Marshal(parameters) - if err != nil { - return nil, err - } - - shovelsTemplateFileName, err := helm.KeyFileValue("shovelsTemplate", shovelsTemplate) - if err != nil { - return nil, err - } - affinity, err := helm.AffinityYaml(cluster.AffinityLabelMeta) if err != nil { return nil, err @@ -293,7 +206,6 @@ var Helm = common.CompositeHelmFunc( FileValues: []string{ affinityTemplate, loadDefinitionFilename, - shovelsTemplateFileName, }, }, }, nil diff --git a/components/installer/pkg/components/ws-manager-bridge/configmap.go b/components/installer/pkg/components/ws-manager-bridge/configmap.go index 3b832532deb676..3bb7e863ac9334 100644 --- a/components/installer/pkg/components/ws-manager-bridge/configmap.go +++ b/components/installer/pkg/components/ws-manager-bridge/configmap.go @@ -30,7 +30,8 @@ func configmap(ctx *common.RenderContext) ([]runtime.Object, error) { StoppingPhaseSeconds: 3600, UnknownPhaseSeconds: 600, }, - StaticBridges: WSManagerList(), + EmulatePreparingIntervalSeconds: 10, + StaticBridges: WSManagerList(), } fc, err := common.ToJSONString(wsmbcfg) diff --git a/components/installer/pkg/components/ws-manager-bridge/types.go b/components/installer/pkg/components/ws-manager-bridge/types.go index aa8fd34bd386f8..8ea1f36de4ffe4 100644 --- a/components/installer/pkg/components/ws-manager-bridge/types.go +++ b/components/installer/pkg/components/ws-manager-bridge/types.go @@ -13,6 +13,7 @@ type Configuration struct { ControllerIntervalSeconds int32 `json:"controllerIntervalSeconds"` ControllerMaxDisconnectSeconds int32 `json:"controllerMaxDisconnectSeconds"` MaxTimeToRunningPhaseSeconds int32 `json:"maxTimeToRunningPhaseSeconds"` + EmulatePreparingIntervalSeconds int32 `json:"emulatePreparingIntervalSeconds"` Timeouts Timeouts `json:"timeouts"` } diff --git a/components/installer/third_party/charts/rabbitmq/values.yaml b/components/installer/third_party/charts/rabbitmq/values.yaml index eb1c46f4df0e0e..ba0a6506f707d0 100644 --- a/components/installer/third_party/charts/rabbitmq/values.yaml +++ b/components/installer/third_party/charts/rabbitmq/values.yaml @@ -26,7 +26,7 @@ rabbitmq: enabled: true allowExternal: true plugins: "rabbitmq_management rabbitmq_peer_discovery_k8s" - extraPlugins: "rabbitmq_shovel rabbitmq_shovel_management" + extraPlugins: "" loadDefinition: enabled: true existingSecret: load-definition diff --git a/components/server/src/workspace/messagebus-integration.ts b/components/server/src/workspace/messagebus-integration.ts index bf1790e5818f11..0022947500e7e5 100644 --- a/components/server/src/workspace/messagebus-integration.ts +++ b/components/server/src/workspace/messagebus-integration.ts @@ -176,7 +176,7 @@ export class MessageBusIntegration extends AbstractMessageBusIntegration { await this.messageBusHelper.assertWorkspaceExchange(this.channel); // TODO(at) clarify on the exchange level - await super.publish(MessageBusHelperImpl.WORKSPACE_EXCHANGE, topic, Buffer.from(JSON.stringify(prebuildInfo))); + await super.publish(MessageBusHelperImpl.WORKSPACE_EXCHANGE_LOCAL, topic, Buffer.from(JSON.stringify(prebuildInfo))); } async notifyOnInstanceUpdate(userId: string, instance: WorkspaceInstance) { diff --git a/components/ws-manager-bridge/ee/src/bridge.ts b/components/ws-manager-bridge/ee/src/bridge.ts index 8289a7337db6f9..e4857252ec5c6c 100644 --- a/components/ws-manager-bridge/ee/src/bridge.ts +++ b/components/ws-manager-bridge/ee/src/bridge.ts @@ -10,7 +10,7 @@ import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing"; import { WorkspaceStatus, WorkspaceType, WorkspacePhase } from "@gitpod/ws-manager/lib"; import { HeadlessWorkspaceEvent, HeadlessWorkspaceEventType } from "@gitpod/gitpod-protocol/lib/headless-workspace-log"; import { WorkspaceInstance } from "@gitpod/gitpod-protocol"; -import { log } from "@gitpod/gitpod-protocol/lib/util/logging"; +import { log, LogContext } from "@gitpod/gitpod-protocol/lib/util/logging"; @injectable() export class WorkspaceManagerBridgeEE extends WorkspaceManagerBridge { @@ -38,15 +38,14 @@ export class WorkspaceManagerBridgeEE extends WorkspaceManagerBridge { } } - protected async updatePrebuiltWorkspace(ctx: TraceContext, status: WorkspaceStatus.AsObject) { + protected async updatePrebuiltWorkspace(ctx: TraceContext, userId: string, status: WorkspaceStatus.AsObject, writeToDB: boolean) { if (status.spec && status.spec.type != WorkspaceType.PREBUILD) { return; } const instanceId = status.id!; const workspaceId = status.metadata!.metaId!; - const userId = status.metadata!.owner!; - const logCtx = { instanceId, workspaceId, userId }; + const logCtx: LogContext = { instanceId, workspaceId, userId }; const span = TraceContext.startSpan("updatePrebuiltWorkspace", ctx); try { @@ -56,12 +55,15 @@ export class WorkspaceManagerBridgeEE extends WorkspaceManagerBridge { TraceContext.setError({span}, new Error("headless workspace without prebuild")); return } + span.setTag("updatePrebuiltWorkspace.prebuildId", prebuild.id); if (prebuild.state === 'queued') { // We've received an update from ws-man for this workspace, hence it must be running. prebuild.state = "building"; - await this.workspaceDB.trace({span}).storePrebuiltWorkspace(prebuild); + if (writeToDB) { + await this.workspaceDB.trace({span}).storePrebuiltWorkspace(prebuild); + } await this.messagebus.notifyHeadlessUpdate({span}, userId, workspaceId, { type: HeadlessWorkspaceEventType.Started, workspaceID: workspaceId, @@ -92,15 +94,19 @@ export class WorkspaceManagerBridgeEE extends WorkspaceManagerBridge { prebuild.snapshot = status.conditions!.snapshot; headlessUpdateType = HeadlessWorkspaceEventType.FinishedSuccessfully; } - await this.workspaceDB.trace({span}).storePrebuiltWorkspace(prebuild); + if (writeToDB) { + await this.workspaceDB.trace({span}).storePrebuiltWorkspace(prebuild); + } + + // notify updates + // headless update await this.messagebus.notifyHeadlessUpdate({span}, userId, workspaceId, { type: headlessUpdateType, workspaceID: workspaceId, }); - } - { // notify about prebuild updated + // prebuild info const info = (await this.workspaceDB.trace({span}).findPrebuildInfos([prebuild.id]))[0]; if (info) { this.messagebus.notifyOnPrebuildUpdate({ info, status: prebuild.state }); diff --git a/components/ws-manager-bridge/src/bridge-controller.ts b/components/ws-manager-bridge/src/bridge-controller.ts index 91d2299e8d5065..1e3b70dc2defd1 100644 --- a/components/ws-manager-bridge/src/bridge-controller.ts +++ b/components/ws-manager-bridge/src/bridge-controller.ts @@ -61,7 +61,7 @@ export class BridgeController { protected async reconcile() { return this.reconcileQueue.enqueue(async () => { const allClusters = await this.getAllWorkspaceClusters(); - log.info("reconciling clusters...", { allClusters }); + log.info("reconciling clusters...", { allClusters: Array.from(allClusters.values()) }); const toDelete: string[] = []; try { for (const [name, bridge] of this.bridges) { @@ -81,13 +81,13 @@ export class BridgeController { } } - this.metrics.updateClusterMetrics(Array.from(allClusters).map(c => c[1])); + this.metrics.updateClusterMetrics(Array.from(allClusters).map(([_, c]) => c)); for (const [name, newCluster] of allClusters) { log.debug("reconcile: create bridge for new cluster", { name }); const bridge = await this.createAndStartBridge(newCluster); this.bridges.set(newCluster.name, bridge); } - log.info("done reconciling.", { allClusters }); + log.info("done reconciling.", { allClusters: Array.from(allClusters.values()) }); }); } diff --git a/components/ws-manager-bridge/src/bridge.ts b/components/ws-manager-bridge/src/bridge.ts index 64642ca95da7c7..9913af72032701 100644 --- a/components/ws-manager-bridge/src/bridge.ts +++ b/components/ws-manager-bridge/src/bridge.ts @@ -6,7 +6,7 @@ import { inject, injectable } from "inversify"; import { MessageBusIntegration } from "./messagebus-integration"; -import { Disposable, WorkspaceInstance, Queue, WorkspaceInstancePort, PortVisibility, RunningWorkspaceInfo } from "@gitpod/gitpod-protocol"; +import { Disposable, WorkspaceInstance, Queue, WorkspaceInstancePort, PortVisibility, RunningWorkspaceInfo, DisposableCollection } from "@gitpod/gitpod-protocol"; import { WorkspaceStatus, WorkspacePhase, GetWorkspacesRequest, WorkspaceConditionBool, PortVisibility as WsManPortVisibility, WorkspaceType, PromisifiedWorkspaceManagerClient } from "@gitpod/ws-manager/lib"; import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db"; import { UserDB } from "@gitpod/gitpod-db/lib/user-db"; @@ -20,6 +20,7 @@ import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb"; import { Configuration } from "./config"; import { WorkspaceCluster } from "@gitpod/gitpod-protocol/lib/workspace-cluster"; import { repeat } from "@gitpod/gitpod-protocol/lib/util/repeat"; +import { PreparingUpdateEmulator } from "./preparing-update-emulator"; export const WorkspaceManagerBridgeFactory = Symbol("WorkspaceManagerBridgeFactory"); @@ -53,28 +54,43 @@ export class WorkspaceManagerBridge implements Disposable { @inject(IAnalyticsWriter) protected readonly analytics: IAnalyticsWriter; - protected readonly disposables: Disposable[] = []; + protected readonly disposables = new DisposableCollection(); protected readonly queues = new Map(); protected cluster: WorkspaceClusterInfo; public start(cluster: WorkspaceClusterInfo, clientProvider: ClientProvider) { - const logPayload = { name: cluster.name, url: cluster.url }; + const logPayload = { name: cluster.name, url: cluster.url, govern: cluster.govern }; log.info(`starting bridge to cluster...`, logPayload); this.cluster = cluster; - if (cluster.govern) { - log.debug(`starting DB updater: ${cluster.name}`, logPayload); - /* no await */ this.startDatabaseUpdater(clientProvider, logPayload) + const startStatusUpdateHandler = (writeToDB: boolean) => { + log.debug(`starting status update handler: ${cluster.name}`, logPayload); + /* no await */ this.startStatusUpdateHandler(clientProvider, writeToDB, logPayload) // this is a mere safe-guard: we do not expect the code inside to fail - .catch(err => log.error("cannot start database updater", err)); + .catch(err => log.error("cannot start status update handler", err)); + }; + if (cluster.govern) { + // notify servers and _update the DB_ + startStatusUpdateHandler(true); + + // the actual "governing" part const controllerInterval = this.config.controllerIntervalSeconds; if (controllerInterval <= 0) { throw new Error("controllerInterval <= 0!"); } log.debug(`starting controller: ${cluster.name}`, logPayload); this.startController(clientProvider, controllerInterval, this.config.controllerMaxDisconnectSeconds); + } else { + // _DO NOT_ update the DB (another bridge is responsible for that) + // Still, listen to all updates, generate/derive new state and distribute it locally! + startStatusUpdateHandler(false); + + // emulate WorkspaceInstance updates for all Workspaces in the "preparing" phase in this cluster + const updateEmulator = new PreparingUpdateEmulator(); + this.disposables.push(updateEmulator); + updateEmulator.start(cluster.name); } log.info(`started bridge to cluster.`, logPayload); } @@ -83,15 +99,15 @@ export class WorkspaceManagerBridge implements Disposable { this.dispose(); } - protected async startDatabaseUpdater(clientProvider: ClientProvider, logPayload: {}): Promise { + protected async startStatusUpdateHandler(clientProvider: ClientProvider, writeToDB: boolean, logPayload: {}): Promise { const subscriber = new WsmanSubscriber(clientProvider); this.disposables.push(subscriber); const onReconnect = (ctx: TraceContext, s: WorkspaceStatus[]) => { - s.forEach(sx => this.serializeMessagesByInstanceId(ctx, sx, m => m.getId(), (ctx, msg) => this.handleStatusUpdate(ctx, msg))) + s.forEach(sx => this.serializeMessagesByInstanceId(ctx, sx, m => m.getId(), (ctx, msg) => this.handleStatusUpdate(ctx, msg, writeToDB))) }; const onStatusUpdate = (ctx: TraceContext, s: WorkspaceStatus) => { - this.serializeMessagesByInstanceId(ctx, s, msg => msg.getId(), (ctx, s) => this.handleStatusUpdate(ctx, s)) + this.serializeMessagesByInstanceId(ctx, s, msg => msg.getId(), (ctx, s) => this.handleStatusUpdate(ctx, s, writeToDB)) }; await subscriber.subscribe({ onReconnect, onStatusUpdate }, logPayload); } @@ -110,7 +126,7 @@ export class WorkspaceManagerBridge implements Disposable { this.queues.set(instanceId, q); } - protected async handleStatusUpdate(ctx: TraceContext, rawStatus: WorkspaceStatus) { + protected async handleStatusUpdate(ctx: TraceContext, rawStatus: WorkspaceStatus, writeToDB: boolean) { const status = rawStatus.toObject(); if (!status.spec || !status.metadata || !status.conditions) { log.warn("Received invalid status update", status); @@ -123,6 +139,7 @@ export class WorkspaceManagerBridge implements Disposable { const span = TraceContext.startSpan("handleStatusUpdate", ctx); span.setTag("status", JSON.stringify(filterStatus(status))); + span.setTag("writeToDB", writeToDB); try { // Beware of the ID mapping here: What's a workspace to the ws-manager is a workspace instance to the rest of the system. // The workspace ID of ws-manager is the workspace instance ID in the database. @@ -246,20 +263,26 @@ export class WorkspaceManagerBridge implements Disposable { break; } - await this.updatePrebuiltWorkspace({span}, status); - span.setTag("after", JSON.stringify(instance)); - await this.workspaceDB.trace({span}).storeInstance(instance); - await this.messagebus.notifyOnInstanceUpdate({span}, userId, instance); - // important: call this after the DB update - await this.cleanupProbeWorkspace({span}, status); + // now notify all prebuild listeners about updates - and update DB if needed + await this.updatePrebuiltWorkspace({span}, userId, status, writeToDB); - if (!!lifecycleHandler) { - await lifecycleHandler(); + if (writeToDB) { + await this.workspaceDB.trace(ctx).storeInstance(instance); + + // cleanup + // important: call this after the DB update + await this.cleanupProbeWorkspace(ctx, status); + + if (!!lifecycleHandler) { + await lifecycleHandler(); + } } + await this.messagebus.notifyOnInstanceUpdate(ctx, userId, instance); + } catch (e) { - TraceContext.setError({ span }, e); + TraceContext.setError({span}, e); throw e; } finally { span.finish(); @@ -321,7 +344,7 @@ export class WorkspaceManagerBridge implements Disposable { // probes are an EE feature - we just need the hook here } - protected async updatePrebuiltWorkspace(ctx: TraceContext, status: WorkspaceStatus.AsObject) { + protected async updatePrebuiltWorkspace(ctx: TraceContext, userId: string, status: WorkspaceStatus.AsObject, writeToDB: boolean) { // prebuilds are an EE feature - we just need the hook here } @@ -349,7 +372,7 @@ export class WorkspaceManagerBridge implements Disposable { } public dispose() { - this.disposables.forEach(d => d.dispose()); + this.disposables.dispose(); } } diff --git a/components/ws-manager-bridge/src/config.ts b/components/ws-manager-bridge/src/config.ts index ccd40f9cb2289f..d73038ac918b3d 100644 --- a/components/ws-manager-bridge/src/config.ts +++ b/components/ws-manager-bridge/src/config.ts @@ -36,4 +36,7 @@ export interface Configuration { stoppingPhaseSeconds: number; unknownPhaseSeconds: number; } + + // emulatePreparingIntervalSeconds configures how often we check for Workspaces in phase "preparing" for clusters we do not govern + emulatePreparingIntervalSeconds: number; } diff --git a/components/ws-manager-bridge/src/container-module.ts b/components/ws-manager-bridge/src/container-module.ts index c3fee712c3728d..24b889014d49b3 100644 --- a/components/ws-manager-bridge/src/container-module.ts +++ b/components/ws-manager-bridge/src/container-module.ts @@ -25,6 +25,7 @@ import { newAnalyticsWriterFromEnv } from '@gitpod/gitpod-protocol/lib/util/anal import { MetaInstanceController } from './meta-instance-controller'; import { IClientCallMetrics } from '@gitpod/content-service/lib/client-call-metrics'; import { PrometheusClientCallMetrics } from "@gitpod/gitpod-protocol/lib/messaging/client-call-metrics"; +import { PreparingUpdateEmulator } from './preparing-update-emulator'; export const containerModule = new ContainerModule(bind => { @@ -68,4 +69,6 @@ export const containerModule = new ContainerModule(bind => { }).inSingletonScope(); bind(IAnalyticsWriter).toDynamicValue(newAnalyticsWriterFromEnv).inSingletonScope(); + + bind(PreparingUpdateEmulator).toSelf().inSingletonScope(); }); diff --git a/components/ws-manager-bridge/src/messagebus-integration.ts b/components/ws-manager-bridge/src/messagebus-integration.ts index 08ab721797a351..65d6f6afcce1d6 100644 --- a/components/ws-manager-bridge/src/messagebus-integration.ts +++ b/components/ws-manager-bridge/src/messagebus-integration.ts @@ -5,9 +5,7 @@ */ import { injectable, inject } from 'inversify'; -import { MessageBusHelper, AbstractMessageBusIntegration, TopicListener, AbstractTopicListener, MessageBusHelperImpl } from "@gitpod/gitpod-messagebus/lib"; -import { Disposable, CancellationTokenSource } from 'vscode-jsonrpc'; -import { WorkspaceStatus } from '@gitpod/ws-manager/lib'; +import { MessageBusHelper, AbstractMessageBusIntegration, MessageBusHelperImpl } from "@gitpod/gitpod-messagebus/lib"; import { HeadlessWorkspaceEventType, WorkspaceInstance, HeadlessWorkspaceEvent, PrebuildWithStatus } from '@gitpod/gitpod-protocol'; import { TraceContext } from '@gitpod/gitpod-protocol/lib/util/tracing'; @@ -15,25 +13,6 @@ import { TraceContext } from '@gitpod/gitpod-protocol/lib/util/tracing'; export class MessageBusIntegration extends AbstractMessageBusIntegration { @inject(MessageBusHelper) protected readonly messageBusHelper: MessageBusHelper; - async connect(): Promise { - await super.connect(); - if (!this.channel) { - return - } - - await this.channel.assertExchange(MessageBusIntegration.WORKSPACE_EXCHANGE, 'topic', { 'durable': false }); - await this.channel.assertExchange(MessageBusIntegration.LOCAL_WORKSPACE_EXCHANGE, 'topic', { 'durable': false }); - - await this.channel.bindExchange(MessageBusHelperImpl.WORKSPACE_EXCHANGE, MessageBusHelperImpl.WORKSPACE_EXCHANGE_LOCAL, "#"); - } - - listenForWorkspaceStatusUpdates(topic: string, callback: (ctx: TraceContext, status: WorkspaceStatus.AsObject) => void): Disposable { - const listener = new WorkspaceStatusUpdateListener(callback, MessageBusIntegration.WORKSPACE_EXCHANGE, topic); - const cancellationTokenSource = new CancellationTokenSource() - this.listen(listener, cancellationTokenSource.token); - return Disposable.create(() => cancellationTokenSource.cancel()) - } - async notifyOnPrebuildUpdate(prebuildInfo: PrebuildWithStatus) { if (!this.channel) { throw new Error("Not connected to message bus"); @@ -94,19 +73,3 @@ export class MessageBusIntegration extends AbstractMessageBusIntegration { } } - -export namespace MessageBusIntegration { - export const WORKSPACE_EXCHANGE = "wsman"; - export const LOCAL_WORKSPACE_EXCHANGE = "wsman.local"; -} - -class WorkspaceStatusUpdateListener extends AbstractTopicListener { - - constructor(listener: TopicListener, workspaceExchange: string, protected readonly _topic: string) { - super(MessageBusIntegration.WORKSPACE_EXCHANGE, listener); - } - - topic() { - return this._topic; - } -} diff --git a/components/ws-manager-bridge/src/preparing-update-emulator.ts b/components/ws-manager-bridge/src/preparing-update-emulator.ts new file mode 100644 index 00000000000000..53f3424c390ccf --- /dev/null +++ b/components/ws-manager-bridge/src/preparing-update-emulator.ts @@ -0,0 +1,89 @@ +/** + * Copyright (c) 2020 Gitpod GmbH. All rights reserved. + * Licensed under the GNU Affero General Public License (AGPL). + * See License-AGPL.txt in the project root for license information. + */ +import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db"; +import { Disposable, DisposableCollection, WorkspaceInstance } from "@gitpod/gitpod-protocol"; +import { log } from "@gitpod/gitpod-protocol/lib/util/logging"; +import { repeat } from "@gitpod/gitpod-protocol/lib/util/repeat"; +import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing"; +import { inject, injectable } from "inversify"; +import { Configuration } from "./config"; +import { MessageBusIntegration } from "./messagebus-integration"; +import { GarbageCollectedCache } from "@gitpod/gitpod-protocol/lib/util/garbage-collected-cache"; +import * as crypto from 'crypto'; + +interface CacheEntry { + instance: WorkspaceInstance, + userId: string, + hash: string, +} + +/** + * The purpose of this class is to emulate WorkspaceInstance updates for workspaces instances that are not governed by this bridge. + * It does so by polling the DB for the specific region, and if anything changed, push that update into the local messagebus. + * This is a work-around to enable decoupling cross-cluster messagebus instances from each other. + */ +@injectable() +export class PreparingUpdateEmulator implements Disposable { + + @inject(Configuration) protected readonly config: Configuration; + @inject(WorkspaceDB) protected readonly workspaceDb: WorkspaceDB; + @inject(MessageBusIntegration) protected readonly messagebus: MessageBusIntegration; + + + protected readonly cachedResponses = new GarbageCollectedCache(600, 150); + protected readonly disposables = new DisposableCollection(); + + start(region: string) { + this.disposables.push( + repeat(async () => { + const span = TraceContext.startSpan("preparingUpdateEmulatorRun"); + const ctx = {span}; + try { + const instances = await this.workspaceDb.findInstancesByPhaseAndRegion("preparing", region); + span.setTag("preparingUpdateEmulatorRun.nrOfInstances", instances.length); + for (const instance of instances) { + const hash = hasher(instance); + const entry = this.cachedResponses.get(instance.id); + if (entry && entry.hash === hash) { + continue; + } + + let userId = entry?.userId; + if (!userId) { + const ws = await this.workspaceDb.findById(instance.workspaceId); + if (!ws) { + log.debug({ instanceId: instance.id, workspaceId: instance.workspaceId }, "no workspace found for workspace instance"); + continue; + } + userId = ws.ownerId; + } + + await this.messagebus.notifyOnInstanceUpdate(ctx, userId, instance); + this.cachedResponses.set(instance.id, { + instance, + hash, + userId, + }); + } + } catch (err) { + TraceContext.setError(ctx, err); + } finally { + span.finish(); + } + }, this.config.emulatePreparingIntervalSeconds * 1000) + ); + } + + dispose() { + this.disposables.dispose(); + } +} + +function hasher(o: {}): string { + return crypto.createHash('md5') + .update(JSON.stringify(o)) + .digest('hex'); +} \ No newline at end of file diff --git a/components/ws-manager-bridge/src/prometheus-metrics-exporter.ts b/components/ws-manager-bridge/src/prometheus-metrics-exporter.ts index 46a2e4ea187021..6be619e515ef6f 100644 --- a/components/ws-manager-bridge/src/prometheus-metrics-exporter.ts +++ b/components/ws-manager-bridge/src/prometheus-metrics-exporter.ts @@ -17,7 +17,7 @@ export class PrometheusMetricsExporter { protected readonly clusterCordoned: prom.Gauge; protected readonly statusUpdatesTotal: prom.Counter; - protected activeClusterNames: string[] = []; + protected activeClusterNames = new Set(); constructor() { this.workspaceStartupTimeHistogram = new prom.Histogram({ @@ -69,18 +69,18 @@ export class PrometheusMetricsExporter { } updateClusterMetrics(clusters: WorkspaceClusterWoTLS[]): void { - let newActiveClusterNames: string[] = []; + let newActiveClusterNames = new Set(); clusters.forEach(cluster => { this.clusterCordoned.labels(cluster.name).set(cluster.state === 'cordoned' ? 1 : 0); this.clusterScore.labels(cluster.name).set(cluster.score); - newActiveClusterNames.push(cluster.name); + newActiveClusterNames.add(cluster.name); }); - const noLongerActiveCluster = this.activeClusterNames.filter(c => !newActiveClusterNames.includes(c)); - if (noLongerActiveCluster.length > 0) { - this.clusterScore.remove(...noLongerActiveCluster); - this.clusterCordoned.remove(...noLongerActiveCluster); - } + const noLongerActiveCluster = Array.from(this.activeClusterNames).filter(c => !newActiveClusterNames.has(c)); + noLongerActiveCluster.forEach(clusterName => { + this.clusterCordoned.remove(clusterName); + this.clusterScore.remove(clusterName); + }); this.activeClusterNames = newActiveClusterNames; }