Skip to content

Commit

Permalink
Update notifications on updates to subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexTugarev committed Sep 6, 2022
1 parent d2064a8 commit 872b62e
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 12 deletions.
21 changes: 14 additions & 7 deletions components/dashboard/src/AppNotifications.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@ export function AppNotifications() {
setNotifications(localState);
return;
}
(async () => {
const serverState = await getGitpodService().server.getNotifications();
setNotifications(serverState);
if (serverState.length > 0) {
setLocalStorageObject(KEY_APP_NOTIFICATIONS, serverState, /* expires in */ 60 /* seconds */);
}
})();
reloadNotifications().catch(console.error);

getGitpodService().registerClient({
onSubscriptionUpdate: () => reloadNotifications().catch(console.error),
});
}, []);

const reloadNotifications = async () => {
const serverState = await getGitpodService().server.getNotifications();
setNotifications(serverState);
removeLocalStorageObject(KEY_APP_NOTIFICATIONS);
if (serverState.length > 0) {
setLocalStorageObject(KEY_APP_NOTIFICATIONS, serverState, /* expires in */ 300 /* seconds */);
}
};

const topNotification = notifications[0];
if (topNotification === undefined) {
return null;
Expand Down
6 changes: 6 additions & 0 deletions components/gitpod-messagebus/src/messagebus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ export interface MessageBusHelper {
* @param topic the topic to parse
*/
getWsInformationFromTopic(topic: string): WorkspaceTopic | undefined;

getSubscriptionUpdateTopic(attributionId?: string): string;
}

export const WorkspaceTopic = Symbol("WorkspaceTopic");
Expand Down Expand Up @@ -89,6 +91,10 @@ export class MessageBusHelperImpl implements MessageBusHelper {
await ch.assertExchange(this.workspaceExchange, "topic", { durable: true });
}

getSubscriptionUpdateTopic(attributionId: string | undefined): string {
return `subscription.${attributionId || "*"}.update`;
}

/**
* Computes the topic name of for listening to a workspace.
*
Expand Down
14 changes: 14 additions & 0 deletions components/gitpod-protocol/src/gitpod-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ export interface GitpodClient {

onPrebuildUpdate(update: PrebuildWithStatus): void;

onSubscriptionUpdate(attributionId: string): void;

onCreditAlert(creditAlert: CreditAlert): void;

//#region propagating reconnection to iframe
Expand Down Expand Up @@ -573,6 +575,18 @@ export class GitpodCompositeClient<Client extends GitpodClient> implements Gitpo
}
}
}

onSubscriptionUpdate(attributionId: string): void {
for (const client of this.clients) {
if (client.onSubscriptionUpdate) {
try {
client.onSubscriptionUpdate(attributionId);
} catch (error) {
console.error(error);
}
}
}
}
}

export type GitpodService = GitpodServiceImpl<GitpodClient, GitpodServer>;
Expand Down
47 changes: 42 additions & 5 deletions components/server/ee/src/workspace/gitpod-server-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ import { BillingMode } from "@gitpod/gitpod-protocol/lib/billing-mode";
import { BillingModes } from "../billing/billing-mode";
import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/experiments/configcat-server";
import { BillingService } from "../billing/billing-service";
import { MessageBusIntegration } from "../../../src/workspace/messagebus-integration";

@injectable()
export class GitpodServerEEImpl extends GitpodServerImpl {
Expand Down Expand Up @@ -167,6 +168,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
@inject(BillingModes) protected readonly billingModes: BillingModes;
@inject(BillingService) protected readonly billingService: BillingService;

@inject(MessageBusIntegration) protected readonly messageBus: MessageBusIntegration;

initialize(
client: GitpodClient | undefined,
user: User | undefined,
Expand All @@ -179,6 +182,7 @@ export class GitpodServerEEImpl extends GitpodServerImpl {

this.listenToCreditAlerts();
this.listenForPrebuildUpdates().catch((err) => log.error("error registering for prebuild updates", err));
this.listenForSubscriptionUpdates().catch((err) => log.error("error registering for prebuild updates", err));
}

protected async listenForPrebuildUpdates() {
Expand Down Expand Up @@ -206,6 +210,32 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
// TODO(at) we need to keep the list of accessible project up to date
}

protected async listenForSubscriptionUpdates() {
if (!this.user) {
return;
}
const teamIds = (await this.teamDB.findTeamsByUser(this.user.id)).map(({ id }) =>
AttributionId.render({ kind: "team", teamId: id }),
);
for (const attributionId of [AttributionId.render({ kind: "user", userId: this.user.id }), ...teamIds]) {
this.disposables.push(
this.localMessageBroker.listenForSubscriptionUpdates(
attributionId,
(ctx: TraceContext, attributionId: string) =>
TraceContext.withSpan(
"forwardSubscriptionUpdateToClient",
(ctx) => {
traceClientMetadata(ctx, this.clientMetadata);
TraceContext.setJsonRPCMetadata(ctx, "onSubscriptionUpdate");
this.client?.onSubscriptionUpdate(attributionId);
},
ctx,
),
),
);
}
}

protected async getAccessibleProjects() {
if (!this.user) {
return [];
Expand Down Expand Up @@ -2104,13 +2134,16 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
await this.stripeService.setDefaultPaymentMethodForCustomer(customer, setupIntentId);
await this.stripeService.createSubscriptionForCustomer(customer);

const attributionId = AttributionId.render({ kind: "team", teamId });
const attributionId: AttributionId = { kind: "team", teamId };
const attributionIdString = AttributionId.render(attributionId);

// Creating a cost center for this team
await this.costCenterDB.storeEntry({
id: attributionId,
id: attributionIdString,
spendingLimit: this.defaultSpendingLimit,
});

this.messageBus.notifyOnSubscriptionUpdate(ctx, attributionId).catch();
} catch (error) {
log.error(`Failed to subscribe team '${teamId}' to Stripe`, error);
throw new ResponseError(ErrorCodes.INTERNAL_SERVER_ERROR, `Failed to subscribe team '${teamId}' to Stripe`);
Expand Down Expand Up @@ -2155,13 +2188,17 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
if (typeof usageLimit !== "number" || usageLimit < 0) {
throw new ResponseError(ErrorCodes.BAD_REQUEST, "Unexpected `usageLimit` value.");
}
const attributionId = AttributionId.render({ kind: "team", teamId });
await this.guardCostCenterAccess(ctx, user.id, attributionId, "update");

const attributionId: AttributionId = { kind: "team", teamId };
const attributionIdString = AttributionId.render(attributionId);
await this.guardCostCenterAccess(ctx, user.id, attributionIdString, "update");

await this.costCenterDB.storeEntry({
id: AttributionId.render({ kind: "team", teamId }),
id: attributionIdString,
spendingLimit: usageLimit,
});

this.messageBus.notifyOnSubscriptionUpdate(ctx, attributionId).catch();
}

async getNotifications(ctx: TraceContext): Promise<string[]> {
Expand Down
25 changes: 25 additions & 0 deletions components/server/src/messaging/local-message-broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import { MessageBusIntegration } from "../workspace/messagebus-integration";
export interface PrebuildUpdateListener {
(ctx: TraceContext, evt: PrebuildWithStatus): void;
}
export interface SubscriptionUpdateListener {
(ctx: TraceContext, attributionId: string): void;
}
export interface CreditAlertListener {
(ctx: TraceContext, alert: CreditAlert): void;
}
Expand All @@ -38,6 +41,8 @@ export interface LocalMessageBroker {

listenForPrebuildUpdates(projectId: string, listener: PrebuildUpdateListener): Disposable;

listenForSubscriptionUpdates(attributionId: string, listener: SubscriptionUpdateListener): Disposable;

listenToCreditAlerts(userId: string, listener: CreditAlertListener): Disposable;

listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable;
Expand Down Expand Up @@ -69,6 +74,7 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
protected creditAlertsListeners: Map<string, CreditAlertListener[]> = new Map();
protected headlessWorkspaceEventListeners: Map<string, HeadlessWorkspaceEventListener[]> = new Map();
protected workspaceInstanceUpdateListeners: Map<string, WorkspaceInstanceUpdateListener[]> = new Map();
protected subscriptionUpdateListeners: Map<string, SubscriptionUpdateListener[]> = new Map();

protected readonly disposables = new DisposableCollection();

Expand Down Expand Up @@ -151,6 +157,21 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
},
),
);
this.disposables.push(
this.messageBusIntegration.listenToSubscriptionUpdates((ctx: TraceContext, attributionId: string) => {
TraceContext.setOWI(ctx, {});

const listeners = this.subscriptionUpdateListeners.get(attributionId) || [];
for (const l of listeners) {
try {
l(ctx, attributionId);
} catch (err) {
TraceContext.setError(ctx, err);
log.error("listenForWorkspaceInstanceUpdates", err, { attributionId });
}
}
}),
);
}

async stop() {
Expand All @@ -165,6 +186,10 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
return this.doRegister(userId, listener, this.creditAlertsListeners);
}

listenForSubscriptionUpdates(attributionId: string, listener: SubscriptionUpdateListener): Disposable {
return this.doRegister(attributionId, listener, this.subscriptionUpdateListeners);
}

listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable {
// we're being cheap here in re-using a map where it just needs to be a plain array.
return this.doRegister(
Expand Down
32 changes: 32 additions & 0 deletions components/server/src/workspace/messagebus-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import * as opentracing from "opentracing";
import { CancellationTokenSource } from "vscode-ws-jsonrpc";
import { increaseMessagebusTopicReads } from "../prometheus-metrics";
import { CreditAlert } from "@gitpod/gitpod-protocol/lib/accounting-protocol";
import { AttributionId } from "@gitpod/gitpod-protocol/lib/attribution";

interface WorkspaceInstanceUpdateCallback {
(ctx: TraceContext, instance: WorkspaceInstance, ownerId: string | undefined): void;
Expand Down Expand Up @@ -72,6 +73,16 @@ export class CreditAlertListener extends AbstractTopicListener<CreditAlert> {
}
}

export class SubscriptionUpdateListener extends AbstractTopicListener<string> {
constructor(protected messageBusHelper: MessageBusHelper, listener: TopicListener<string>) {
super(messageBusHelper.workspaceExchange, listener);
}

topic() {
return this.messageBusHelper.getSubscriptionUpdateTopic();
}
}

export class PrebuildUpdatableQueueListener implements MessagebusListener {
protected channel: Channel | undefined;
protected consumerTag: string | undefined;
Expand Down Expand Up @@ -208,6 +219,27 @@ export class MessageBusIntegration extends AbstractMessageBusIntegration {
return Disposable.create(() => cancellationTokenSource.cancel());
}

async notifyOnSubscriptionUpdate(ctx: TraceContext, attributionId: AttributionId) {
if (!this.channel) {
throw new Error("Not connected to message bus");
}
const topic = this.messageBusHelper.getSubscriptionUpdateTopic(AttributionId.render(attributionId));
const msg = Buffer.from(JSON.stringify({}));
await this.messageBusHelper.assertWorkspaceExchange(this.channel);
await super.publish(MessageBusHelperImpl.WORKSPACE_EXCHANGE_LOCAL, topic, msg, {
trace: ctx,
});
}

listenToSubscriptionUpdates(callback: (ctx: TraceContext, attributionId: string) => void): Disposable {
const listener = new SubscriptionUpdateListener(this.messageBusHelper, callback);
const cancellationTokenSource = new CancellationTokenSource();
this.listen(listener, cancellationTokenSource.token).catch((err) => {
/** ignore */
});
return Disposable.create(() => cancellationTokenSource.cancel());
}

async notifyOnPrebuildUpdate(prebuildInfo: PrebuildWithStatus) {
if (!this.channel) {
throw new Error("Not connected to message bus");
Expand Down

0 comments on commit 872b62e

Please sign in to comment.