Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[messagebus, bridge] Remove messagebus cross-cluster dependency #7523

Merged
merged 4 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .werft/values.dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,6 @@ mysql:
memory: 350Mi

rabbitmq:
# ensure shovels are configured on boot
shovels:
- name: messagebus-0
srcUri: "amqp://$USERNAME:$PASSWORD@messagebus-0"
auth:
username: override-me
password: override-me
Expand Down
1 change: 1 addition & 0 deletions chart/templates/ws-manager-bridge-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ data:
"stoppingPhaseSeconds": 3600,
"unknownPhaseSeconds": 600
},
"emulatePreparingIntervalSeconds": "10",
"staticBridges": {{ index (include "ws-manager-list" (dict "root" . "gp" $.Values "comp" .Values.components.server) | fromYaml) "manager" | default list | toJson }}
}
{{- end -}}
Expand Down
47 changes: 3 additions & 44 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,6 @@ mysql:
rabbitmq:
enabled: true
fullnameOverride: "messagebus"
# non-standard configuration
# defined by gitpod.io
shovels: []
persistence:
enabled: false
replicaCount: 1
Expand All @@ -605,7 +602,7 @@ rabbitmq:
enabled: true
allowExternal: true
plugins: "rabbitmq_management rabbitmq_peer_discovery_k8s"
extraPlugins: "rabbitmq_shovel rabbitmq_shovel_management"
extraPlugins: ""
extraSecrets:
load-definition:
load_definition.json: |
Expand All @@ -618,7 +615,7 @@ rabbitmq:
"vhosts": [{
"name": "/"
}],
"parameters": {{ tpl (.Values.shovelsTemplate) . | fromYamlArray | toJson }},
"parameters": [],
"permissions": [{
"user": {{ .Values.auth.username | quote }},
"vhost": "/",
Expand All @@ -627,38 +624,19 @@ rabbitmq:
"read": ".*"
}],
"exchanges": [{
"name": "gitpod.ws",
"vhost": "/",
"type": "topic",
"durable": true,
"auto_delete": false
}, {
"name": "gitpod.ws.local",
"vhost": "/",
"type": "topic",
"durable": true,
"auto_delete": false
}, {
"name": "wsman",
"vhost": "/",
"type": "topic",
"durable": false,
"auto_delete": false
}, {
"name": "consensus-leader",
"vhost": "/",
"type": "fanout",
"durable": false,
"auto_delete": false
}],
"bindings": [{
"source": "gitpod.ws.local",
"vhost": "/",
"destination": "gitpod.ws",
"destination_type": "exchange",
"routing_key": "#",
"arguments": {}
}],
"bindings": [],
"queues": [{
"name": "consensus-peers",
"vhost": "/",
Expand Down Expand Up @@ -694,25 +672,6 @@ rabbitmq:
create: true
minAvailable: 0
maxUnavailable: 1
shovelsTemplate: |
{{ $auth := .Values.auth }}
{{- range $index, $shovel := .Values.shovels }}
- name: {{ $shovel.name | default (randAlphaNum 20) | quote }}
vhost: "/"
component: "shovel"
value:
ack-mode: "on-publish"
src-delete-after: "never"
src-exchange: {{ $shovel.srcExchange | default "gitpod.ws.local" | quote }}
src-exchange-key: {{ $shovel.srcExchangeKey | default "#" | quote }}
src-protocol: "amqp091"
src-uri: {{ $shovel.srcUri | replace "$USERNAME" $auth.username | replace "$PASSWORD" $auth.password | quote }}
dest-add-forward-headers: {{ $shovel.destAddForwardHeaders | default true }}
dest-exchange: {{ $shovel.destExchange | default "gitpod.ws" | quote }}
dest-protocol: "amqp091"
dest-uri: {{ $shovel.destUri | default "amqp://" | quote }}
reconnect-delay: {{ $shovel.reconnectDelay | default 5 }}
{{- end }}

cert-manager:
enabled: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { Transformer } from "../transformer";

@Entity()
@Index("ind_find_wsi_ws_in_period", ['workspaceId', 'startedTime', 'stoppedTime']) // findInstancesWithWorkspaceInPeriod
@Index("ind_phasePersisted_region", ['phasePersisted', 'region']) // findInstancesByPhaseAndRegion
// on DB but not Typeorm: @Index("ind_lastModified", ["_lastModified"]) // DBSync
export class DBWorkspaceInstance implements WorkspaceInstance {
@PrimaryColumn(TypeORM.UUID_COLUMN_TYPE)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* Copyright (c) 2022 Gitpod GmbH. All rights reserved.
* Licensed under the GNU Affero General Public License (AGPL).
* See License-AGPL.txt in the project root for license information.
*/

import {MigrationInterface, QueryRunner} from "typeorm";
import { indexExists } from "./helper/helper";

const TABLE_NAME = "d_b_workspace_instance";
const INDEX_NAME = "ind_phasePersisted_region";

export class InstancesByPhaseAndRegion1645019483643 implements MigrationInterface {

public async up(queryRunner: QueryRunner): Promise<void> {
if(!(await indexExists(queryRunner, TABLE_NAME, INDEX_NAME))) {
await queryRunner.query(`CREATE INDEX ${INDEX_NAME} ON ${TABLE_NAME} (phasePersisted, region)`);
}
}

public async down(queryRunner: QueryRunner): Promise<void> {
}

}
9 changes: 9 additions & 0 deletions components/gitpod-db/src/typeorm/workspace-db-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,15 @@ export abstract class AbstractTypeORMWorkspaceDBImpl implements WorkspaceDB {
return <WorkspaceAndInstance>(res);
}

async findInstancesByPhaseAndRegion(phase: string, region: string): Promise<WorkspaceInstance[]> {
const repo = await this.getWorkspaceInstanceRepo();
// uses index: ind_phasePersisted_region
const qb = repo.createQueryBuilder("wsi")
.where("wsi.phasePersisted = :phase", { phase })
.andWhere("wsi.region = :region", { region });
return qb.getMany();
}

async findPrebuiltWorkspacesByProject(projectId: string, branch?: string, limit?: number): Promise<PrebuiltWorkspace[]> {
const repo = await this.getPrebuiltWorkspaceRepo();

Expand Down
1 change: 1 addition & 0 deletions components/gitpod-db/src/workspace-db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export interface WorkspaceDB {
findAllWorkspaces(offset: number, limit: number, orderBy: keyof Workspace, orderDir: "ASC" | "DESC", ownerId?: string, searchTerm?: string, minCreationTime?: Date, maxCreationDateTime?: Date, type?: WorkspaceType): Promise<{ total: number, rows: Workspace[] }>;
findAllWorkspaceAndInstances(offset: number, limit: number, orderBy: keyof WorkspaceAndInstance, orderDir: "ASC" | "DESC", query?: AdminGetWorkspacesQuery, searchTerm?: string): Promise<{ total: number, rows: WorkspaceAndInstance[] }>;
findWorkspaceAndInstance(id: string): Promise<WorkspaceAndInstance | undefined>;
findInstancesByPhaseAndRegion(phase: string, region: string): Promise<WorkspaceInstance[]>;

findAllWorkspaceInstances(offset: number, limit: number, orderBy: keyof WorkspaceInstance, orderDir: "ASC" | "DESC", ownerId?: string, minCreationTime?: Date, maxCreationTime?: Date, onlyRunning?: boolean, type?: WorkspaceType): Promise<{ total: number, rows: WorkspaceInstance[] }>;

Expand Down
3 changes: 1 addition & 2 deletions components/gitpod-messagebus/src/messagebus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ const ASTERISK = "*";

@injectable()
export class MessageBusHelperImpl implements MessageBusHelper {
readonly workspaceExchange = MessageBusHelperImpl.WORKSPACE_EXCHANGE;
readonly workspaceExchange = MessageBusHelperImpl.WORKSPACE_EXCHANGE_LOCAL;

/**
* Ensures that the gitpod workspace exchange is present
Expand Down Expand Up @@ -155,7 +155,6 @@ export class MessageBusHelperImpl implements MessageBusHelper {
}

export namespace MessageBusHelperImpl {
export const WORKSPACE_EXCHANGE = "gitpod.ws";
export const WORKSPACE_EXCHANGE_LOCAL = "gitpod.ws.local";
export const PREBUILD_UPDATABLE_QUEUE = "pwsupdatable";
}
Expand Down
92 changes: 2 additions & 90 deletions components/installer/pkg/components/rabbitmq/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ package rabbitmq
import (
"encoding/json"
"fmt"
"strings"

"helm.sh/helm/v3/pkg/cli/values"
"sigs.k8s.io/yaml"

"github.com/gitpod-io/gitpod/installer/pkg/cluster"
"github.com/gitpod-io/gitpod/installer/pkg/common"
Expand Down Expand Up @@ -108,76 +106,20 @@ type config struct {
Policies []policy `json:"policies"`
}

func generateParameters(username string, password string, input []parameter) ([]parameter, error) {
// Ensures this defaults to [] not null when marshalled to JSON
params := make([]parameter, 0)

for _, item := range input {
// Sort out default values
if item.Name == "" {
name, err := common.RandomString(20)
if err != nil {
return nil, err
}
item.Name = name
}
if item.Values.SrcExchange == "" {
item.Values.SrcExchange = "gitpod.ws.local"
}
if item.Values.SrcExchangeKey == "" {
item.Values.SrcExchangeKey = "#"
}
if item.Values.DestExchange == "" {
item.Values.DestExchange = "gitpod.ws"
}
if item.Values.DestUri == "" {
item.Values.DestUri = "amqp://"
}

srcUri := strings.Replace(item.Values.SrcUri, "$USERNAME", username, -1)
srcUri = strings.Replace(srcUri, "$PASSWORD", password, -1)

params = append(params, parameter{
Name: item.Name,
Vhost: "/",
Component: "shovel",
Values: parameterValues{
AckMode: "on-publish",
SrcDeleteAfter: "never",
SrcExchange: item.Values.SrcExchange,
SrcExchangeKey: item.Values.SrcExchangeKey,
SrcProtocol: "amqp091",
SrcUri: srcUri,
DestAddForwardHeaders: item.Values.DestAddForwardHeaders,
DestExchange: item.Values.DestExchange,
DestProtocol: "amqp091",
DestUri: item.Values.DestUri,
ReconnectDelay: item.Values.ReconnectDelay,
},
})
}
return params, nil
}

var Helm = common.CompositeHelmFunc(
helm.ImportTemplate(charts.RabbitMQ(), helm.TemplateConfig{}, func(cfg *common.RenderContext) (*common.HelmConfig, error) {
username := "gitpod"

password := cfg.Values.MessageBusPassword

parameters, err := generateParameters(username, password, []parameter{})
if err != nil {
return nil, err
}

loadDefinition, err := json.Marshal(config{
Users: []user{{
Name: username,
Password: password,
Tags: "administrator",
}},
Vhosts: []vhost{{Name: "/"}},
Parameters: parameters,
Parameters: []parameter{},
Permissions: []permission{{
User: username,
Vhost: "/",
Expand All @@ -186,38 +128,19 @@ var Helm = common.CompositeHelmFunc(
Read: ".*",
}},
Exchanges: []exchange{{
Name: "gitpod.ws",
Vhost: "/",
Type: "topic",
Durable: true,
AutoDelete: false,
}, {
Name: "gitpod.ws.local",
Vhost: "/",
Type: "topic",
Durable: true,
AutoDelete: false,
}, {
Name: "wsman",
Vhost: "/",
Type: "topic",
Durable: false,
AutoDelete: false,
}, {
Name: "consensus-leader",
Vhost: "/",
Type: "fanout",
Durable: false,
AutoDelete: false,
}},
Bindings: []binding{{
Source: "gitpod.ws.local",
Vhost: "/",
Destination: "gitpod.ws",
DestinationType: "exchange",
RoutingKey: "#",
Arguments: arguments{},
}},
Bindings: []binding{},
Queues: []queue{{
Name: "consensus-peers",
Vhost: "/",
Expand Down Expand Up @@ -251,16 +174,6 @@ var Helm = common.CompositeHelmFunc(
return nil, err
}

shovelsTemplate, err := yaml.Marshal(parameters)
if err != nil {
return nil, err
}

shovelsTemplateFileName, err := helm.KeyFileValue("shovelsTemplate", shovelsTemplate)
if err != nil {
return nil, err
}

affinity, err := helm.AffinityYaml(cluster.AffinityLabelMeta)
if err != nil {
return nil, err
Expand Down Expand Up @@ -293,7 +206,6 @@ var Helm = common.CompositeHelmFunc(
FileValues: []string{
affinityTemplate,
loadDefinitionFilename,
shovelsTemplateFileName,
},
},
}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func configmap(ctx *common.RenderContext) ([]runtime.Object, error) {
StoppingPhaseSeconds: 3600,
UnknownPhaseSeconds: 600,
},
StaticBridges: WSManagerList(),
EmulatePreparingIntervalSeconds: 10,
StaticBridges: WSManagerList(),
}

fc, err := common.ToJSONString(wsmbcfg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Configuration struct {
ControllerIntervalSeconds int32 `json:"controllerIntervalSeconds"`
ControllerMaxDisconnectSeconds int32 `json:"controllerMaxDisconnectSeconds"`
MaxTimeToRunningPhaseSeconds int32 `json:"maxTimeToRunningPhaseSeconds"`
EmulatePreparingIntervalSeconds int32 `json:"emulatePreparingIntervalSeconds"`
Timeouts Timeouts `json:"timeouts"`
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ rabbitmq:
enabled: true
allowExternal: true
plugins: "rabbitmq_management rabbitmq_peer_discovery_k8s"
extraPlugins: "rabbitmq_shovel rabbitmq_shovel_management"
extraPlugins: ""
loadDefinition:
enabled: true
existingSecret: load-definition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ export class MessageBusIntegration extends AbstractMessageBusIntegration {
await this.messageBusHelper.assertWorkspaceExchange(this.channel);

// TODO(at) clarify on the exchange level
await super.publish(MessageBusHelperImpl.WORKSPACE_EXCHANGE, topic, Buffer.from(JSON.stringify(prebuildInfo)));
await super.publish(MessageBusHelperImpl.WORKSPACE_EXCHANGE_LOCAL, topic, Buffer.from(JSON.stringify(prebuildInfo)));
}

async notifyOnInstanceUpdate(userId: string, instance: WorkspaceInstance) {
Expand Down
Loading