From 09be181d8d260d77bae6da4041ae5fd5e802849c Mon Sep 17 00:00:00 2001 From: Gero Posmyk-Leinemann Date: Mon, 31 Jan 2022 08:12:55 +0000 Subject: [PATCH] [server] Generalize HeadlessLogService --- .../src/workspace/gitpod-server-impl.ts | 5 +- .../src/workspace/headless-log-controller.ts | 9 +- .../src/workspace/headless-log-service.ts | 146 +++++++++++------- 3 files changed, 102 insertions(+), 58 deletions(-) diff --git a/components/server/src/workspace/gitpod-server-impl.ts b/components/server/src/workspace/gitpod-server-impl.ts index 714b72b0215242..0fa8d2c5e646ac 100644 --- a/components/server/src/workspace/gitpod-server-impl.ts +++ b/components/server/src/workspace/gitpod-server-impl.ts @@ -1162,7 +1162,8 @@ export class GitpodServerImpl implements GitpodServerWithTracing, Disposable { async getHeadlessLog(ctx: TraceContext, instanceId: string): Promise { traceAPIParams(ctx, { instanceId }); - const user = this.checkAndBlockUser('getHeadlessLog', { instanceId }); + this.checkAndBlockUser('getHeadlessLog', { instanceId }); + const logCtx: LogContext = { instanceId }; const ws = await this.workspaceDb.trace(ctx).findByInstanceId(instanceId); if (!ws) { @@ -1179,7 +1180,7 @@ export class GitpodServerImpl implements GitpodServerWithTracing, Disposable { throw new ResponseError(ErrorCodes.NOT_FOUND, `Workspace instance for ${instanceId} not found`); } - const urls = await this.headlessLogService.getHeadlessLogURLs(user.id, wsi, ws.ownerId); + const urls = await this.headlessLogService.getHeadlessLogURLs(logCtx, wsi, ws.ownerId); if (!urls || (typeof urls.streams === "object" && Object.keys(urls.streams).length === 0)) { throw new ResponseError(ErrorCodes.NOT_FOUND, `Headless logs for ${instanceId} not found`); } diff --git a/components/server/src/workspace/headless-log-controller.ts b/components/server/src/workspace/headless-log-controller.ts index 2a0c11b96fbb3c..669ab18c4c2d50 100644 --- a/components/server/src/workspace/headless-log-controller.ts +++ 
b/components/server/src/workspace/headless-log-controller.ts @@ -12,7 +12,7 @@ import { CompositeResourceAccessGuard, OwnerResourceGuard, TeamMemberResourceGua import { DBWithTracing, TracedWorkspaceDB } from "@gitpod/gitpod-db/lib/traced-db"; import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db"; import { TeamDB } from "@gitpod/gitpod-db/lib/team-db"; -import { HeadlessLogService } from "./headless-log-service"; +import { HeadlessLogService, HeadlessLogEndpoint } from "./headless-log-service"; import * as opentracing from 'opentracing'; import { asyncHandler } from "../express-util"; import { Deferred } from "@gitpod/gitpod-protocol/lib/util/deferred"; @@ -54,6 +54,7 @@ export class HeadlessLogController { const logCtx = { userId: user.id, instanceId, workspaceId: workspace!.id }; log.debug(logCtx, HEADLESS_LOGS_PATH_PREFIX); + const aborted = new Deferred(); try { const head = { 'Content-Type': 'text/html; charset=utf-8', // is text/plain, but with that node.js won't stream... @@ -62,7 +63,6 @@ export class HeadlessLogController { }; res.writeHead(200, head) - const aborted = new Deferred(); const abort = (err: any) => { aborted.resolve(true); log.debug(logCtx, "headless-log: aborted"); @@ -88,7 +88,8 @@ export class HeadlessLogController { process.nextTick(resolve); } })); - await this.headlessLogService.streamWorkspaceLog(instance, params.terminalId, writeToResponse, aborted); + const logEndpoint = HeadlessLogEndpoint.fromWithOwnerToken(instance); + await this.headlessLogService.streamWorkspaceLogWhileRunning(logCtx, logEndpoint, instanceId, params.terminalId, writeToResponse, aborted); // In an ideal world, we'd use res.addTrailers()/response.trailer here. But despite being introduced with HTTP/1.1 in 1999, trailers are not supported by popular proxies (nginx, for example). 
// So we resort to this hand-written solution @@ -100,6 +101,8 @@ export class HeadlessLogController { res.write(`\n${HEADLESS_LOG_STREAM_STATUS_CODE}: 500`); res.end(); + } finally { + aborted.resolve(true); // ensure that the promise gets resolved eventually! } })]); router.get("/", malformedRequestHandler); diff --git a/components/server/src/workspace/headless-log-service.ts b/components/server/src/workspace/headless-log-service.ts index 5cd6e11444638a..95919c9ebb91ad 100644 --- a/components/server/src/workspace/headless-log-service.ts +++ b/components/server/src/workspace/headless-log-service.ts @@ -16,7 +16,7 @@ import { WorkspaceInstance } from "@gitpod/gitpod-protocol"; import * as grpc from '@grpc/grpc-js'; import { Config } from "../config"; import * as browserHeaders from "browser-headers"; -import { log } from '@gitpod/gitpod-protocol/lib/util/logging'; +import { log, LogContext } from '@gitpod/gitpod-protocol/lib/util/logging'; import { TextDecoder } from "util"; import { WebsocketTransport } from "../util/grpc-web-ws-transport"; import { Deferred } from "@gitpod/gitpod-protocol/lib/util/deferred"; @@ -24,6 +24,33 @@ import { ListLogsRequest, ListLogsResponse, LogDownloadURLRequest, LogDownloadUR import { HEADLESS_LOG_DOWNLOAD_PATH_PREFIX } from "./headless-log-controller"; import { CachingHeadlessLogServiceClientProvider } from "@gitpod/content-service/lib/sugar"; +export type HeadlessLogEndpoint = { + url: string, + ownerToken?: string, + headers?: { [key: string]: string }, +}; +export namespace HeadlessLogEndpoint { + export function authHeaders(logCtx: LogContext, logEndpoint: HeadlessLogEndpoint): browserHeaders.BrowserHeaders | undefined { + const headers = new browserHeaders.BrowserHeaders(logEndpoint.headers); + if (logEndpoint.ownerToken) { + headers.set("x-gitpod-owner-token", logEndpoint.ownerToken); + } + + if (Object.keys(headers.headersMap).length === 0) { + log.warn(logCtx, "workspace logs: no ownerToken nor headers!"); + return 
undefined; + } + + return headers; + } + export function fromWithOwnerToken(wsi: WorkspaceInstance): HeadlessLogEndpoint { + return { + url: wsi.ideUrl, + ownerToken: wsi.status.ownerToken, + } + } +} + @injectable() export class HeadlessLogService { static readonly SUPERVISOR_API_PATH = "/_supervisor/v1"; @@ -32,21 +59,22 @@ export class HeadlessLogService { @inject(Config) protected readonly config: Config; @inject(CachingHeadlessLogServiceClientProvider) protected readonly headlessLogClientProvider: CachingHeadlessLogServiceClientProvider; - public async getHeadlessLogURLs(userId: string, wsi: WorkspaceInstance, ownerId: string, maxTimeoutSecs: number = 30): Promise { + public async getHeadlessLogURLs(logCtx: LogContext, wsi: WorkspaceInstance, ownerId: string, maxTimeoutSecs: number = 30): Promise { if (isSupervisorAvailableSoon(wsi)) { + const logEndpoint = HeadlessLogEndpoint.fromWithOwnerToken(wsi); const aborted = new Deferred(); setTimeout(() => aborted.resolve(true), maxTimeoutSecs * 1000); - const streamIds = await this.retryWhileInstanceIsRunning(wsi, () => this.supervisorListHeadlessLogs(wsi), "list headless log streams", aborted); + const streamIds = await this.retryOnError(() => this.supervisorListHeadlessLogs(logCtx, wsi.id, logEndpoint), "list headless log streams", this.continueWhileRunning(wsi.id), aborted); if (streamIds !== undefined) { return streamIds; } } // we were unable to get a repsonse from supervisor - let's try content service next - return await this.contentServiceListLogs(userId, wsi, ownerId); + return await this.contentServiceListLogs(wsi, ownerId); } - protected async contentServiceListLogs(userId: string, wsi: WorkspaceInstance, ownerId: string): Promise { + protected async contentServiceListLogs(wsi: WorkspaceInstance, ownerId: string): Promise { const req = new ListLogsRequest(); req.setOwnerId(ownerId); req.setWorkspaceId(wsi.workspaceId); @@ -74,19 +102,24 @@ export class HeadlessLogService { }; } - protected async 
supervisorListHeadlessLogs(wsi: WorkspaceInstance): Promise { - if (wsi.ideUrl === "") { + protected async supervisorListHeadlessLogs(logCtx: LogContext, instanceId: string, logEndpoint: HeadlessLogEndpoint): Promise { + const tasks = await this.supervisorListTasks(logCtx, logEndpoint); + return this.renderTasksHeadlessLogUrls(logCtx, instanceId, tasks); + } + + protected async supervisorListTasks(logCtx: LogContext, logEndpoint: HeadlessLogEndpoint): Promise { + if (logEndpoint.url === "") { // if ideUrl is not yet set we're too early and we deem the workspace not ready yet: retry later! - throw new Error(`instance's ${wsi.id} has no ideUrl, yet`); + throw new Error(`instance's ${logCtx.instanceId} has no ideUrl, yet`); } const tasks = await new Promise((resolve, reject) => { - const client = new StatusServiceClient(toSupervisorURL(wsi.ideUrl), { + const client = new StatusServiceClient(toSupervisorURL(logEndpoint.url), { transport: WebsocketTransport(), }); const req = new TasksStatusRequest(); // Note: Don't set observe here at all, else it won't work! - const stream = client.tasksStatus(req, authHeaders(wsi)); + const stream = client.tasksStatus(req, HeadlessLogEndpoint.authHeaders(logCtx, logEndpoint)); stream.on('data', (resp: TasksStatusResponse) => { resolve(resp.getTasksList()); stream.cancel(); @@ -99,7 +132,10 @@ export class HeadlessLogService { } }); }); + return tasks; + } + protected renderTasksHeadlessLogUrls(logCtx: LogContext, instanceId: string, tasks: TaskStatus[]): HeadlessLogUrls { // render URLs that point to server's /headless-logs/ endpoint which forwards calls to the running workspaces's supervisor const streams: { [id: string]: string } = {}; for (const task of tasks) { @@ -109,14 +145,14 @@ export class HeadlessLogService { // this might be the case when there is no terminal for this task, yet. // if we find any such case, we deem the workspace not ready yet, and try to reconnect later, // to be sure to get hold of all terminals created. 
- throw new Error(`instance's ${wsi.id} task ${task.getId()} has no terminal yet`); + throw new Error(`instance's ${instanceId} task ${task.getId()} has no terminal yet`); } if (task.getState() === TaskState.CLOSED) { // if a task has already been closed we can no longer access it's terminal, and have to skip it. continue; } streams[taskId] = this.config.hostUrl.with({ - pathname: `/headless-logs/${wsi.id}/${terminalId}`, + pathname: `/headless-logs/${instanceId}/${terminalId}`, }).toString(); } return { @@ -158,12 +194,29 @@ export class HeadlessLogService { /** * For now, simply stream the supervisor data - * - * @param workspace + * @param logCtx + * @param logEndpoint + * @param instanceId + * @param terminalID + * @param sink + * @param doContinue + * @param aborted + */ + async streamWorkspaceLogWhileRunning(logCtx: LogContext, logEndpoint: HeadlessLogEndpoint, instanceId: string, terminalID: string, sink: (chunk: string) => Promise, aborted: Deferred): Promise { + await this.streamWorkspaceLog(logCtx, logEndpoint, terminalID, sink, this.continueWhileRunning(instanceId), aborted); + } + + /** + * For now, simply stream the supervisor data + * @param logCtx + * @param logEndpoint * @param terminalID + * @param sink + * @param doContinue + * @param aborted */ - async streamWorkspaceLog(wsi: WorkspaceInstance, terminalID: string, sink: (chunk: string) => Promise, aborted: Deferred): Promise { - const client = new TerminalServiceClient(toSupervisorURL(wsi.ideUrl), { + protected async streamWorkspaceLog(logCtx: LogContext, logEndpoint: HeadlessLogEndpoint, terminalID: string, sink: (chunk: string) => Promise, doContinue: () => Promise, aborted: Deferred): Promise { + const client = new TerminalServiceClient(toSupervisorURL(logEndpoint.url), { transport: WebsocketTransport(), // necessary because HTTPTransport causes caching issues }); const req = new ListenTerminalRequest(); @@ -172,10 +225,10 @@ export class HeadlessLogService { let receivedDataYet = false; let 
stream: ResponseStream | undefined = undefined; aborted.promise.then(() => stream?.cancel()); - const doStream = (cancelRetry: () => void) => new Promise((resolve, reject) => { + const doStream = (retry: (doRetry?: boolean) => void) => new Promise((resolve, reject) => { // [gpl] this is the very reason we cannot redirect the frontend to the supervisor URL: currently we only have ownerTokens for authentication const decoder = new TextDecoder('utf-8') - stream = client.listen(req, authHeaders(wsi)); + stream = client.listen(req, HeadlessLogEndpoint.authHeaders(logCtx, logEndpoint)); stream.on('data', (resp: ListenTerminalResponse) => { receivedDataYet = true; @@ -184,7 +237,7 @@ export class HeadlessLogService { sink(data) .catch((err) => { stream?.cancel(); // If downstream reports an error: cancel connection to upstream - log.debug({ instanceId: wsi.id }, "stream cancelled", err); + log.debug(logCtx, "stream cancelled", err); }); }); stream.on('end', (status?: Status) => { @@ -201,48 +254,42 @@ export class HeadlessLogService { return; } - cancelRetry(); + retry(false); reject(err); }); }); - await this.retryWhileInstanceIsRunning(wsi, doStream, "stream workspace logs", aborted); + await this.retryOnError(doStream, "stream workspace logs", doContinue, aborted); } /** * Retries op while the passed WorkspaceInstance is still starting. 
Retries are stopped if either: - * - `op` calls `cancel()` and an err is thrown, it is re-thrown by this method + * - `op` calls `retry(false)` and an err is thrown, it is re-thrown by this method * - `aborted` resolves to `true`: `undefined` is returned - * - if the instance enters the either STOPPING/STOPPED phases, we stop retrying, and return `undefined` - * @param wsi + * - `(await doContinue()) === false`: `undefined` is returned * @param op * @param description + * @param doContinue * @param aborted * @returns */ - protected async retryWhileInstanceIsRunning(wsi: WorkspaceInstance, op: (cancel: () => void) => Promise, description: string, aborted: Deferred): Promise { - let cancelled = false; - const cancel = () => { cancelled = true; }; + protected async retryOnError(op: (cancel: () => void) => Promise, description: string, doContinue: () => Promise, aborted: Deferred): Promise { + let retry = true; + const retryFunction = (doRetry: boolean = true) => { retry = doRetry }; - let instance = wsi; - while (!cancelled && !(aborted.isResolved && (await aborted.promise)) ) { + while (retry && !(aborted.isResolved && (await aborted.promise)) ) { try { - return await op(cancel); + return await op(retryFunction); } catch (err) { - if (cancelled) { + if (!retry) { throw err; } - log.debug(`unable to ${description}`, err); - const maybeInstance = await this.db.findInstanceById(instance.id); - if (!maybeInstance) { + const shouldContinue = await doContinue(); + if (!shouldContinue) { return undefined; } - instance = maybeInstance; - if (!this.shouldRetry(instance)) { - return undefined; - } - log.debug(`re-trying ${description}...`); + log.debug(`unable to ${description}, retrying...`, err); await new Promise((resolve) => setTimeout(resolve, 2000)); continue; } @@ -250,9 +297,13 @@ export class HeadlessLogService { return undefined; } - protected shouldRetry(wsi: WorkspaceInstance): boolean { - return isSupervisorAvailableSoon(wsi); - } + protected
continueWhileRunning(instanceId: string): () => Promise { + const db = this.db; + return async () => { + const maybeInstance = await db.findInstanceById(instanceId); + return !!maybeInstance && isSupervisorAvailableSoon(maybeInstance); + } + }; } function isSupervisorAvailableSoon(wsi: WorkspaceInstance): boolean { @@ -273,14 +324,3 @@ function toSupervisorURL(ideUrl: string): string { u.pathname = HeadlessLogService.SUPERVISOR_API_PATH; return u.toString(); } - -function authHeaders(wsi: WorkspaceInstance): browserHeaders.BrowserHeaders | undefined { - const ownerToken = wsi.status.ownerToken; - if (!ownerToken) { - log.warn({ instanceId: wsi.id }, "workspace logs: owner token not found"); - return undefined; - } - const headers = new browserHeaders.BrowserHeaders(); - headers.set("x-gitpod-owner-token", ownerToken); - return headers; -}