Skip to content

Commit

Permalink
[server] Generalize HeadlessLogService
Browse files Browse the repository at this point in the history
  • Loading branch information
geropl committed Feb 1, 2022
1 parent ecf1113 commit 09be181
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 58 deletions.
5 changes: 3 additions & 2 deletions components/server/src/workspace/gitpod-server-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,8 @@ export class GitpodServerImpl implements GitpodServerWithTracing, Disposable {
async getHeadlessLog(ctx: TraceContext, instanceId: string): Promise<HeadlessLogUrls> {
traceAPIParams(ctx, { instanceId });

const user = this.checkAndBlockUser('getHeadlessLog', { instanceId });
this.checkAndBlockUser('getHeadlessLog', { instanceId });
const logCtx: LogContext = { instanceId };

const ws = await this.workspaceDb.trace(ctx).findByInstanceId(instanceId);
if (!ws) {
Expand All @@ -1179,7 +1180,7 @@ export class GitpodServerImpl implements GitpodServerWithTracing, Disposable {
throw new ResponseError(ErrorCodes.NOT_FOUND, `Workspace instance for ${instanceId} not found`);
}

const urls = await this.headlessLogService.getHeadlessLogURLs(user.id, wsi, ws.ownerId);
const urls = await this.headlessLogService.getHeadlessLogURLs(logCtx, wsi, ws.ownerId);
if (!urls || (typeof urls.streams === "object" && Object.keys(urls.streams).length === 0)) {
throw new ResponseError(ErrorCodes.NOT_FOUND, `Headless logs for ${instanceId} not found`);
}
Expand Down
9 changes: 6 additions & 3 deletions components/server/src/workspace/headless-log-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { CompositeResourceAccessGuard, OwnerResourceGuard, TeamMemberResourceGua
import { DBWithTracing, TracedWorkspaceDB } from "@gitpod/gitpod-db/lib/traced-db";
import { WorkspaceDB } from "@gitpod/gitpod-db/lib/workspace-db";
import { TeamDB } from "@gitpod/gitpod-db/lib/team-db";
import { HeadlessLogService } from "./headless-log-service";
import { HeadlessLogService, HeadlessLogEndpoint } from "./headless-log-service";
import * as opentracing from 'opentracing';
import { asyncHandler } from "../express-util";
import { Deferred } from "@gitpod/gitpod-protocol/lib/util/deferred";
Expand Down Expand Up @@ -54,6 +54,7 @@ export class HeadlessLogController {
const logCtx = { userId: user.id, instanceId, workspaceId: workspace!.id };
log.debug(logCtx, HEADLESS_LOGS_PATH_PREFIX);

const aborted = new Deferred<boolean>();
try {
const head = {
'Content-Type': 'text/html; charset=utf-8', // is text/plain, but with that node.js won't stream...
Expand All @@ -62,7 +63,6 @@ export class HeadlessLogController {
};
res.writeHead(200, head)

const aborted = new Deferred<boolean>();
const abort = (err: any) => {
aborted.resolve(true);
log.debug(logCtx, "headless-log: aborted");
Expand All @@ -88,7 +88,8 @@ export class HeadlessLogController {
process.nextTick(resolve);
}
}));
await this.headlessLogService.streamWorkspaceLog(instance, params.terminalId, writeToResponse, aborted);
const logEndpoint = HeadlessLogEndpoint.fromWithOwnerToken(instance);
await this.headlessLogService.streamWorkspaceLogWhileRunning(logCtx, logEndpoint, instanceId, params.terminalId, writeToResponse, aborted);

// In an ideal world, we'd use res.addTrailers()/response.trailer here. But despite being introduced with HTTP/1.1 in 1999, trailers are not supported by popular proxies (nginx, for example).
// So we resort to this hand-written solution
Expand All @@ -100,6 +101,8 @@ export class HeadlessLogController {

res.write(`\n${HEADLESS_LOG_STREAM_STATUS_CODE}: 500`);
res.end();
} finally {
aborted.resolve(true); // ensure that the promise gets resolved eventually!
}
})]);
router.get("/", malformedRequestHandler);
Expand Down
146 changes: 93 additions & 53 deletions components/server/src/workspace/headless-log-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,41 @@ import { WorkspaceInstance } from "@gitpod/gitpod-protocol";
import * as grpc from '@grpc/grpc-js';
import { Config } from "../config";
import * as browserHeaders from "browser-headers";
import { log } from '@gitpod/gitpod-protocol/lib/util/logging';
import { log, LogContext } from '@gitpod/gitpod-protocol/lib/util/logging';
import { TextDecoder } from "util";
import { WebsocketTransport } from "../util/grpc-web-ws-transport";
import { Deferred } from "@gitpod/gitpod-protocol/lib/util/deferred";
import { ListLogsRequest, ListLogsResponse, LogDownloadURLRequest, LogDownloadURLResponse } from '@gitpod/content-service/lib/headless-log_pb';
import { HEADLESS_LOG_DOWNLOAD_PATH_PREFIX } from "./headless-log-controller";
import { CachingHeadlessLogServiceClientProvider } from "@gitpod/content-service/lib/sugar";

export type HeadlessLogEndpoint = {
url: string,
ownerToken?: string,
headers?: { [key: string]: string },
};
export namespace HeadlessLogEndpoint {
export function authHeaders(logCtx: LogContext, logEndpoint: HeadlessLogEndpoint): browserHeaders.BrowserHeaders | undefined {
const headers = new browserHeaders.BrowserHeaders(logEndpoint.headers);
if (logEndpoint.ownerToken) {
headers.set("x-gitpod-owner-token", logEndpoint.ownerToken);
}

if (Object.keys(headers.headersMap).length === 0) {
log.warn(logCtx, "workspace logs: no ownerToken nor headers!");
return undefined;
}

return headers;
}
export function fromWithOwnerToken(wsi: WorkspaceInstance): HeadlessLogEndpoint {
return {
url: wsi.ideUrl,
ownerToken: wsi.status.ownerToken,
}
}
}

@injectable()
export class HeadlessLogService {
static readonly SUPERVISOR_API_PATH = "/_supervisor/v1";
Expand All @@ -32,21 +59,22 @@ export class HeadlessLogService {
@inject(Config) protected readonly config: Config;
@inject(CachingHeadlessLogServiceClientProvider) protected readonly headlessLogClientProvider: CachingHeadlessLogServiceClientProvider;

public async getHeadlessLogURLs(userId: string, wsi: WorkspaceInstance, ownerId: string, maxTimeoutSecs: number = 30): Promise<HeadlessLogUrls | undefined> {
public async getHeadlessLogURLs(logCtx: LogContext, wsi: WorkspaceInstance, ownerId: string, maxTimeoutSecs: number = 30): Promise<HeadlessLogUrls | undefined> {
if (isSupervisorAvailableSoon(wsi)) {
const logEndpoint = HeadlessLogEndpoint.fromWithOwnerToken(wsi);
const aborted = new Deferred<boolean>();
setTimeout(() => aborted.resolve(true), maxTimeoutSecs * 1000);
const streamIds = await this.retryWhileInstanceIsRunning(wsi, () => this.supervisorListHeadlessLogs(wsi), "list headless log streams", aborted);
const streamIds = await this.retryOnError(() => this.supervisorListHeadlessLogs(logCtx, wsi.id, logEndpoint), "list headless log streams", this.continueWhileRunning(wsi.id), aborted);
if (streamIds !== undefined) {
return streamIds;
}
}

// we were unable to get a repsonse from supervisor - let's try content service next
return await this.contentServiceListLogs(userId, wsi, ownerId);
return await this.contentServiceListLogs(wsi, ownerId);
}

protected async contentServiceListLogs(userId: string, wsi: WorkspaceInstance, ownerId: string): Promise<HeadlessLogUrls | undefined> {
protected async contentServiceListLogs(wsi: WorkspaceInstance, ownerId: string): Promise<HeadlessLogUrls | undefined> {
const req = new ListLogsRequest();
req.setOwnerId(ownerId);
req.setWorkspaceId(wsi.workspaceId);
Expand Down Expand Up @@ -74,19 +102,24 @@ export class HeadlessLogService {
};
}

protected async supervisorListHeadlessLogs(wsi: WorkspaceInstance): Promise<HeadlessLogUrls> {
if (wsi.ideUrl === "") {
protected async supervisorListHeadlessLogs(logCtx: LogContext, instanceId: string, logEndpoint: HeadlessLogEndpoint): Promise<HeadlessLogUrls | undefined> {
const tasks = await this.supervisorListTasks(logCtx, logEndpoint);
return this.renderTasksHeadlessLogUrls(logCtx, instanceId, tasks);
}

protected async supervisorListTasks(logCtx: LogContext, logEndpoint: HeadlessLogEndpoint): Promise<TaskStatus[]> {
if (logEndpoint.url === "") {
// if ideUrl is not yet set we're too early and we deem the workspace not ready yet: retry later!
throw new Error(`instance's ${wsi.id} has no ideUrl, yet`);
throw new Error(`instance's ${logCtx.instanceId} has no ideUrl, yet`);
}

const tasks = await new Promise<TaskStatus[]>((resolve, reject) => {
const client = new StatusServiceClient(toSupervisorURL(wsi.ideUrl), {
const client = new StatusServiceClient(toSupervisorURL(logEndpoint.url), {
transport: WebsocketTransport(),
});

const req = new TasksStatusRequest(); // Note: Don't set observe here at all, else it won't work!
const stream = client.tasksStatus(req, authHeaders(wsi));
const stream = client.tasksStatus(req, HeadlessLogEndpoint.authHeaders(logCtx, logEndpoint));
stream.on('data', (resp: TasksStatusResponse) => {
resolve(resp.getTasksList());
stream.cancel();
Expand All @@ -99,7 +132,10 @@ export class HeadlessLogService {
}
});
});
return tasks;
}

protected renderTasksHeadlessLogUrls(logCtx: LogContext, instanceId: string, tasks: TaskStatus[]): HeadlessLogUrls {
// render URLs that point to server's /headless-logs/ endpoint which forwards calls to the running workspaces's supervisor
const streams: { [id: string]: string } = {};
for (const task of tasks) {
Expand All @@ -109,14 +145,14 @@ export class HeadlessLogService {
// this might be the case when there is no terminal for this task, yet.
// if we find any such case, we deem the workspace not ready yet, and try to reconnect later,
// to be sure to get hold of all terminals created.
throw new Error(`instance's ${wsi.id} task ${task.getId()} has no terminal yet`);
throw new Error(`instance's ${instanceId} task ${task.getId()} has no terminal yet`);
}
if (task.getState() === TaskState.CLOSED) {
// if a task has already been closed we can no longer access it's terminal, and have to skip it.
continue;
}
streams[taskId] = this.config.hostUrl.with({
pathname: `/headless-logs/${wsi.id}/${terminalId}`,
pathname: `/headless-logs/${instanceId}/${terminalId}`,
}).toString();
}
return {
Expand Down Expand Up @@ -158,12 +194,29 @@ export class HeadlessLogService {

/**
* For now, simply stream the supervisor data
*
* @param workspace
* @param logCtx
* @param logEndpoint
* @param instanceId
* @param terminalID
* @param sink
* @param doContinue
* @param aborted
*/
async streamWorkspaceLogWhileRunning(logCtx: LogContext, logEndpoint: HeadlessLogEndpoint, instanceId: string, terminalID: string, sink: (chunk: string) => Promise<void>, aborted: Deferred<boolean>): Promise<void> {
await this.streamWorkspaceLog(logCtx, logEndpoint, terminalID, sink, this.continueWhileRunning(instanceId), aborted);
}

/**
* For now, simply stream the supervisor data
* @param logCtx
* @param logEndpoint
* @param terminalID
* @param sink
* @param doContinue
* @param aborted
*/
async streamWorkspaceLog(wsi: WorkspaceInstance, terminalID: string, sink: (chunk: string) => Promise<void>, aborted: Deferred<boolean>): Promise<void> {
const client = new TerminalServiceClient(toSupervisorURL(wsi.ideUrl), {
protected async streamWorkspaceLog(logCtx: LogContext, logEndpoint: HeadlessLogEndpoint, terminalID: string, sink: (chunk: string) => Promise<void>, doContinue: () => Promise<boolean>, aborted: Deferred<boolean>): Promise<void> {
const client = new TerminalServiceClient(toSupervisorURL(logEndpoint.url), {
transport: WebsocketTransport(), // necessary because HTTPTransport causes caching issues
});
const req = new ListenTerminalRequest();
Expand All @@ -172,10 +225,10 @@ export class HeadlessLogService {
let receivedDataYet = false;
let stream: ResponseStream<ListenTerminalResponse> | undefined = undefined;
aborted.promise.then(() => stream?.cancel());
const doStream = (cancelRetry: () => void) => new Promise<void>((resolve, reject) => {
const doStream = (retry: (doRetry?: boolean) => void) => new Promise<void>((resolve, reject) => {
// [gpl] this is the very reason we cannot redirect the frontend to the supervisor URL: currently we only have ownerTokens for authentication
const decoder = new TextDecoder('utf-8')
stream = client.listen(req, authHeaders(wsi));
stream = client.listen(req, HeadlessLogEndpoint.authHeaders(logCtx, logEndpoint));
stream.on('data', (resp: ListenTerminalResponse) => {
receivedDataYet = true;

Expand All @@ -184,7 +237,7 @@ export class HeadlessLogService {
sink(data)
.catch((err) => {
stream?.cancel(); // If downstream reports an error: cancel connection to upstream
log.debug({ instanceId: wsi.id }, "stream cancelled", err);
log.debug(logCtx, "stream cancelled", err);
});
});
stream.on('end', (status?: Status) => {
Expand All @@ -201,58 +254,56 @@ export class HeadlessLogService {
return;
}

cancelRetry();
retry(false);
reject(err);
});
});
await this.retryWhileInstanceIsRunning(wsi, doStream, "stream workspace logs", aborted);
await this.retryOnError(doStream, "stream workspace logs", doContinue, aborted);
}

/**
* Retries op while the passed WorkspaceInstance is still starting. Retries are stopped if either:
* - `op` calls `cancel()` and an err is thrown, it is re-thrown by this method
* - `op` calls `retry(false)` and an err is thrown, it is re-thrown by this method
* - `aborted` resolves to `true`: `undefined` is returned
* - if the instance enters the either STOPPING/STOPPED phases, we stop retrying, and return `undefined`
* @param wsi
* - `(await while()) === true`: `undefined` is returned
* @param op
* @param description
* @param doContinue
* @param aborted
* @returns
*/
protected async retryWhileInstanceIsRunning<T>(wsi: WorkspaceInstance, op: (cancel: () => void) => Promise<T>, description: string, aborted: Deferred<boolean>): Promise<T | undefined> {
let cancelled = false;
const cancel = () => { cancelled = true; };
protected async retryOnError<T>(op: (cancel: () => void) => Promise<T>, description: string, doContinue: () => Promise<boolean>, aborted: Deferred<boolean>): Promise<T | undefined> {
let retry = true;
const retryFunction = (doRetry: boolean = true) => { retry = doRetry };

let instance = wsi;
while (!cancelled && !(aborted.isResolved && (await aborted.promise)) ) {
while (retry && !(aborted.isResolved && (await aborted.promise)) ) {
try {
return await op(cancel);
return await op(retryFunction);
} catch (err) {
if (cancelled) {
if (!retry) {
throw err;
}

log.debug(`unable to ${description}`, err);
const maybeInstance = await this.db.findInstanceById(instance.id);
if (!maybeInstance) {
const shouldContinue = await doContinue();
if (!shouldContinue) {
return undefined;
}
instance = maybeInstance;

if (!this.shouldRetry(instance)) {
return undefined;
}
log.debug(`re-trying ${description}...`);
log.debug(`unable to ${description}, retrying...`, err);
await new Promise((resolve) => setTimeout(resolve, 2000));
continue;
}
}
return undefined;
}

protected shouldRetry(wsi: WorkspaceInstance): boolean {
return isSupervisorAvailableSoon(wsi);
}
protected continueWhileRunning(instanceId: string): () => Promise<boolean> {
const db = this.db;
return async () => {
const maybeInstance = await db.findInstanceById(instanceId);
return !!maybeInstance && isSupervisorAvailableSoon(maybeInstance);
}
};
}

function isSupervisorAvailableSoon(wsi: WorkspaceInstance): boolean {
Expand All @@ -273,14 +324,3 @@ function toSupervisorURL(ideUrl: string): string {
u.pathname = HeadlessLogService.SUPERVISOR_API_PATH;
return u.toString();
}

function authHeaders(wsi: WorkspaceInstance): browserHeaders.BrowserHeaders | undefined {
const ownerToken = wsi.status.ownerToken;
if (!ownerToken) {
log.warn({ instanceId: wsi.id }, "workspace logs: owner token not found");
return undefined;
}
const headers = new browserHeaders.BrowserHeaders();
headers.set("x-gitpod-owner-token", ownerToken);
return headers;
}

0 comments on commit 09be181

Please sign in to comment.