Skip to content

Commit

Permalink
[server] Finish spans
Browse files Browse the repository at this point in the history
  • Loading branch information
geropl authored and roboquat committed Mar 21, 2022
1 parent 4fa1859 commit 6043cc0
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 86 deletions.
130 changes: 69 additions & 61 deletions components/server/src/workspace/headless-log-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { accessHeadlessLogs } from "../auth/rate-limiter";
import { BearerAuth } from "../auth/bearer-authenticator";
import { ProjectsService } from "../projects/projects-service";
import { HostContextProvider } from "../auth/host-context-provider";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";

export const HEADLESS_LOGS_PATH_PREFIX = "/headless-logs";
export const HEADLESS_LOG_DOWNLOAD_PATH_PREFIX = "/headless-log-download";
Expand All @@ -54,79 +55,86 @@ export class HeadlessLogController {
authenticateAndAuthorize,
asyncHandler(async (req: express.Request, res: express.Response) => {
const span = opentracing.globalTracer().startSpan(HEADLESS_LOGS_PATH_PREFIX);
const params = { instanceId: req.params.instanceId, terminalId: req.params.terminalId };
const user = req.user as User; // verified by authenticateAndAuthorize

const instanceId = params.instanceId;
const ws = await this.authorizeHeadlessLogAccess(span, user, instanceId, res);
if (!ws) {
return;
}
const { workspace, instance } = ws;
try {
const params = { instanceId: req.params.instanceId, terminalId: req.params.terminalId };
const user = req.user as User; // verified by authenticateAndAuthorize

const logCtx = { userId: user.id, instanceId, workspaceId: workspace!.id };
log.debug(logCtx, HEADLESS_LOGS_PATH_PREFIX);
const instanceId = params.instanceId;
const ws = await this.authorizeHeadlessLogAccess(span, user, instanceId, res);
if (!ws) {
return;
}
const { workspace, instance } = ws;

const aborted = new Deferred<boolean>();
try {
const head = {
"Content-Type": "text/html; charset=utf-8", // is text/plain, but with that node.js won't stream...
"Transfer-Encoding": "chunked",
"Cache-Control": "no-cache, no-store, must-revalidate", // make sure stream are not re-used on reconnect
};
res.writeHead(200, head);
const logCtx = { userId: user.id, instanceId, workspaceId: workspace!.id };
log.debug(logCtx, HEADLESS_LOGS_PATH_PREFIX);

const abort = (err: any) => {
aborted.resolve(true);
log.debug(logCtx, "headless-log: aborted");
};
req.on("abort", abort); // abort handling if the reqeuest was aborted
const aborted = new Deferred<boolean>();
try {
const head = {
"Content-Type": "text/html; charset=utf-8", // is text/plain, but with that node.js won't stream...
"Transfer-Encoding": "chunked",
"Cache-Control": "no-cache, no-store, must-revalidate", // make sure stream are not re-used on reconnect
};
res.writeHead(200, head);

const queue = new Queue(); // Make sure we forward in the correct order
const writeToResponse = async (chunk: string) =>
queue.enqueue(
() =>
new Promise<void>(async (resolve, reject) => {
if (aborted.isResolved && (await aborted.promise)) {
return;
}
const abort = (err: any) => {
aborted.resolve(true);
log.debug(logCtx, "headless-log: aborted");
};
req.on("abort", abort); // abort handling if the reqeuest was aborted

const done = res.write(chunk, "utf-8", (err?: Error | null) => {
if (err) {
reject(err); // propagate write error to upstream
const queue = new Queue(); // Make sure we forward in the correct order
const writeToResponse = async (chunk: string) =>
queue.enqueue(
() =>
new Promise<void>(async (resolve, reject) => {
if (aborted.isResolved && (await aborted.promise)) {
return;
}
});
// handle as per doc: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback
if (!done) {
res.once("drain", resolve);
} else {
setImmediate(resolve);
}
}),

const done = res.write(chunk, "utf-8", (err?: Error | null) => {
if (err) {
reject(err); // propagate write error to upstream
return;
}
});
// handle as per doc: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback
if (!done) {
res.once("drain", resolve);
} else {
setImmediate(resolve);
}
}),
);
const logEndpoint = HeadlessLogEndpoint.fromWithOwnerToken(instance);
await this.headlessLogService.streamWorkspaceLogWhileRunning(
logCtx,
logEndpoint,
instanceId,
params.terminalId,
writeToResponse,
aborted,
);
const logEndpoint = HeadlessLogEndpoint.fromWithOwnerToken(instance);
await this.headlessLogService.streamWorkspaceLogWhileRunning(
logCtx,
logEndpoint,
instanceId,
params.terminalId,
writeToResponse,
aborted,
);

// In an ideal world, we'd use res.addTrailers()/response.trailer here. But despite being introduced with HTTP/1.1 in 1999, trailers are not supported by popular proxies (nginx, for example).
// So we resort to this hand-written solution
res.write(`\n${HEADLESS_LOG_STREAM_STATUS_CODE}: 200`);
// In an ideal world, we'd use res.addTrailers()/response.trailer here. But despite being introduced with HTTP/1.1 in 1999, trailers are not supported by popular proxies (nginx, for example).
// So we resort to this hand-written solution
res.write(`\n${HEADLESS_LOG_STREAM_STATUS_CODE}: 200`);

res.end();
} catch (err) {
log.debug(logCtx, "error streaming headless logs", err);
res.end();
} catch (err) {
log.debug(logCtx, "error streaming headless logs", err);

res.write(`\n${HEADLESS_LOG_STREAM_STATUS_CODE}: 500`);
res.end();
res.write(`\n${HEADLESS_LOG_STREAM_STATUS_CODE}: 500`);
res.end();
} finally {
aborted.resolve(true); // ensure that the promise gets resolved eventually!
}
} catch (e) {
TraceContext.setError({ span }, e);
throw e;
} finally {
aborted.resolve(true); // ensure that the promise gets resolved eventually!
span.finish();
}
}),
]);
Expand Down
66 changes: 41 additions & 25 deletions components/server/src/workspace/workspace-starter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,12 +451,19 @@ export class WorkspaceStarter {

protected async notifyOnPrebuildQueued(ctx: TraceContext, workspaceId: string) {
const span = TraceContext.startSpan("notifyOnPrebuildQueued", ctx);
const prebuild = await this.workspaceDb.trace({ span }).findPrebuildByWorkspaceID(workspaceId);
if (prebuild) {
const info = (await this.workspaceDb.trace({ span }).findPrebuildInfos([prebuild.id]))[0];
if (info) {
await this.messageBus.notifyOnPrebuildUpdate({ info, status: "queued" });
try {
const prebuild = await this.workspaceDb.trace({ span }).findPrebuildByWorkspaceID(workspaceId);
if (prebuild) {
const info = (await this.workspaceDb.trace({ span }).findPrebuildInfos([prebuild.id]))[0];
if (info) {
await this.messageBus.notifyOnPrebuildUpdate({ info, status: "queued" });
}
}
} catch (e) {
TraceContext.setError({ span }, e);
throw e;
} finally {
span.finish();
}
}

Expand Down Expand Up @@ -507,6 +514,8 @@ export class WorkspaceStarter {
"cannot properly fail workspace instance during start",
err,
);
} finally {
span.finish();
}
}

Expand Down Expand Up @@ -1389,27 +1398,34 @@ export class WorkspaceStarter {
user: User,
): Promise<{ initializer: GitInitializer | CompositeInitializer; disposable: Disposable }> {
const span = TraceContext.startSpan("createInitializerForCommit", ctx);
const mainGit = this.createGitInitializer({ span }, workspace, context, user);
if (!context.additionalRepositoryCheckoutInfo || context.additionalRepositoryCheckoutInfo.length === 0) {
return mainGit;
}
const subRepoInitializers = [mainGit];
for (const subRepo of context.additionalRepositoryCheckoutInfo) {
subRepoInitializers.push(this.createGitInitializer({ span }, workspace, subRepo, user));
}
const inits = await Promise.all(subRepoInitializers);
const compositeInit = new CompositeInitializer();
const compositeDisposable = new DisposableCollection();
for (const r of inits) {
const wsinit = new WorkspaceInitializer();
wsinit.setGit(r.initializer);
compositeInit.addInitializer(wsinit);
compositeDisposable.push(r.disposable);
try {
const mainGit = this.createGitInitializer({ span }, workspace, context, user);
if (!context.additionalRepositoryCheckoutInfo || context.additionalRepositoryCheckoutInfo.length === 0) {
return mainGit;
}
const subRepoInitializers = [mainGit];
for (const subRepo of context.additionalRepositoryCheckoutInfo) {
subRepoInitializers.push(this.createGitInitializer({ span }, workspace, subRepo, user));
}
const inits = await Promise.all(subRepoInitializers);
const compositeInit = new CompositeInitializer();
const compositeDisposable = new DisposableCollection();
for (const r of inits) {
const wsinit = new WorkspaceInitializer();
wsinit.setGit(r.initializer);
compositeInit.addInitializer(wsinit);
compositeDisposable.push(r.disposable);
}
return {
initializer: compositeInit,
disposable: compositeDisposable,
};
} catch (e) {
TraceContext.setError({ span }, e);
throw e;
} finally {
span.finish();
}
return {
initializer: compositeInit,
disposable: compositeDisposable,
};
}

protected async createGitInitializer(
Expand Down

0 comments on commit 6043cc0

Please sign in to comment.