Skip to content

Commit

Permalink
implement HTML log viewer using event source to do streaming (electro…
Browse files Browse the repository at this point in the history
  • Loading branch information
jchip authored and ImgBotApp committed Aug 20, 2020
1 parent 4ba0888 commit ea4c06a
Show file tree
Hide file tree
Showing 7 changed files with 618 additions and 291 deletions.
216 changes: 184 additions & 32 deletions packages/xarc-app-dev/src/lib/dev-admin/admin-http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,83 +4,217 @@ import * as http from "http";
import * as Path from "path";
import * as Fs from "fs";
import * as Url from "url";
import * as AnsiConvert from "ansi-to-html";
import { AdminServer } from "./admin-server";
import { getLogEventAsHtml } from "./log-parser";
import * as QS from "querystring";
import * as _ from "lodash";

export type AdminHttpOptions = {
port?: number;
admin: any;
getLogs?: Function;
};

/**
* Track each log entry with timestamp from Date.now() as ID.
*
* If two entry has the same timestamp, then it's expected they are consecutively
* generated, and add a second part tx that's an incrementing number starting at 1.
*
* When client request logs, it's expected to send a query param `entryId` that's
* the stringified version in the format of "<tx>,<ts>"
*/
type LogEntryId = {
ts: number;
tx?: number;
};

type EventClient = {
lastEntryId: LogEntryId;
res: http.ServerResponse;
};

function parseEntryId(str: string): LogEntryId {
if (str.indexOf(",") > 0) {
const parts = str.split(",");
return {
ts: parseInt(parts[0]),
tx: parseInt(parts[1])
};
}

return { ts: parseInt(str) };
}

function compareEntryId(a: LogEntryId, b: LogEntryId) {
if (a.ts === b.ts) {
return (a.tx || 0) - (b.tx || 0);
}
return a.ts - b.ts;
}

function stringifyEntryId(entryId) {
return entryId.tx ? `${entryId.ts},${entryId.tx}` : `${entryId.ts}`;
}

export class AdminHttp {
_server: http.Server;
_admin: AdminServer;
_getLogs: Function;
_logHtml: string;
_adminHtml: string;
_port: number;
_instanceId: number;
_eventClientId: number;
_eventClients: Record<number, EventClient>;
_sendStreamTimer: NodeJS.Timeout;

constructor(options: AdminHttpOptions) {
this._server = http.createServer(this.requestListener.bind(this));
this._admin = options.admin;
this._getLogs = options.getLogs;
this._port = options.port || 9001;
this._server.listen(this._port);
this._instanceId = Date.now();
this._eventClientId = 0;
this._eventClients = {};
}

_readAsset(filename: string, memoize: string): string {
_readAsset(filename: string, memoize: string, processor?: Function): string {
if (!this[memoize]) {
this[memoize] = Fs.readFileSync(filename).toString();
let content = Fs.readFileSync(filename).toString();
if (processor) {
content = processor(content);
}
this[memoize] = content;
}

return this[memoize];
}

_serveHtml(res: http.ServerResponse, content: string) {
res.setHeader("content-type", "text/html; charset=UTF-8");
res.writeHead(200);
res.writeHead(200, {
"content-type": "text/html; charset=UTF-8"
});
res.end(content);
}

_prepareLogs(lastEntryId: LogEntryId, name = "app") {
const logs = this._getLogs(name);

const htmlLogs = logs
.filter(event => {
return compareEntryId(event as LogEntryId, lastEntryId) > 0;
})
.map(event => {
const message = getLogEventAsHtml(event);
const record: any = {
level: event.level,
ts: event.ts,
message
};
if (event.jsonData) {
record.json = true;
}
if (event.tx) {
record.tx = event.tx;
}
return record;
});

return htmlLogs;
}

_serveLogs(url: Url.UrlWithStringQuery, res: http.ServerResponse) {
const logs = this._admin.getLogs("app");
const query: any = QS.parse(url.query);
const id = parseInt(query.id, 10);

let start = 0;
let entryIdStr = "0";

if (id === this._instanceId) {
start = parseInt(query.start, 10) || 0;
}

const htmlLogs = [];
for (let ix = start; ix < logs.length; ix++) {
const event: any = logs[ix];
const message = getLogEventAsHtml(event);
const record: any = {
level: event.level,
message
};
if (event.jsonData) {
record.json = true;
}
htmlLogs.push(record);
entryIdStr = query.entryId || "0";
}

const data = {
start,
logs: htmlLogs,
total: logs.length,
logs: this._prepareLogs(parseEntryId(entryIdStr)),
instanceId: this._instanceId
};

res.setHeader("content-type", "application/json");
res.writeHead(200);
res.writeHead(200, {
"content-type": "application/json"
});
res.end(JSON.stringify(data));
}

_streamLogsHandler(url: Url.UrlWithStringQuery, res: http.ServerResponse) {
const query: any = QS.parse(url.query);
res.writeHead(200, {
"content-type": "text/event-stream",
"cache-control": "no-cache",
"access-control-allow-origin": "*"
});

res.socket.setKeepAlive(true);

let entryId = "0";
if (query.id === this._instanceId) {
entryId = query.entryId || "0";
}

const eventClientId = this._eventClientId++;
this._eventClients[eventClientId] = { res, lastEntryId: parseEntryId(entryId) };

res.connection.on("close", () => {
delete this._eventClients[eventClientId];
});

this.sendLogsToStreamClients();
}

sendLogsToStreamClients() {
if (this._sendStreamTimer) {
clearTimeout(this._sendStreamTimer);
}

this._sendStreamTimer = setTimeout(() => this._sendLogsToStreamClients(), 100);
}

_sendLogsToStreamClients() {
this._sendStreamTimer = null;
const logs = this._getLogs("app").filter(x => x);
const lastLog = _.last(logs) || { ts: 0, tx: 0 };
const clients = Object.entries(this._eventClients)
.map(e => e[1])
.filter(client => {
return client.lastEntryId && lastLog
? compareEntryId(client.lastEntryId, lastLog as LogEntryId) < 0
: true;
})
.sort((a, b) => {
return compareEntryId(a.lastEntryId, b.lastEntryId);
});

if (clients.length === 0) {
return;
}

const minTsTx = clients[0].lastEntryId;
const htmlLogs = this._prepareLogs(minTsTx);

clients.forEach(client => {
const startIx = htmlLogs.findIndex(l => {
//l.ts > client.start;
return compareEntryId(l as LogEntryId, client.lastEntryId) > 0;
});
const data = {
logs: htmlLogs.slice(startIx),
instanceId: this._instanceId
};

client.res.write(`event: log-stream\n`);
client.res.write(`data: ${JSON.stringify(data)}`);
client.res.write(`\n\n`);

client.lastEntryId = { ts: lastLog.ts, tx: lastLog.tx };
});
}

requestListener(req: http.IncomingMessage, res: http.ServerResponse) {
const url = Url.parse(req.url);
switch (url.pathname) {
Expand All @@ -95,7 +229,22 @@ export class AdminHttp {
return this._serveLogs(url, res);
case "/log":
case "/__electrode_dev/log":
return this._serveHtml(res, this._readAsset(Path.join(__dirname, "log.html"), "_logHtml"));
return this._serveHtml(
res,
this._readAsset(
Path.join(__dirname, "log.html"),
"_logHtml",
(content: string): string => {
return content.replace(
"/*{{LOG_VIEW_JS}}*/",
Fs.readFileSync(Path.join(__dirname, "log-view.js")).toString()
);
}
)
);
case "/stream-logs":
case "/__electrode_dev/stream-logs":
return this._streamLogsHandler(url, res);
default:
res.writeHead(404);
return res.end("dev admin http 404 " + req.url);
Expand All @@ -108,6 +257,9 @@ export class AdminHttp {
shutdown() {
const server = this._server;
this._server = null;
Object.entries(this._eventClients).forEach(([, client]) => {
client.res.end("shutdown", () => client.res.destroy());
});
if (server) {
server.close();
}
Expand Down
25 changes: 20 additions & 5 deletions packages/xarc-app-dev/src/lib/dev-admin/admin-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ export class AdminServer {

this._shutdown = false;
this._fullAppLogUrl = formUrl({ ...fullDevServer, path: controlPaths.appLog });
this._adminHttp = new AdminHttp({ admin: this, port: this._opts.port });
this._adminHttp = new AdminHttp({
getLogs: (app: string) => this.getLogs(app),
port: this._opts.port
});
}

async start() {
Expand Down Expand Up @@ -503,9 +506,15 @@ ${instruction}`
deferLogsOutput(context, showFullLink = true, delay = 250) {
const { tag, store } = context;

const now = Date.now();
if (context._deferTimer) {
clearTimeout(context._deferTimer);
// avoid starving defer output
if (now - context._initialDefer < 150) {
clearTimeout(context._deferTimer);
context._deferTimer = null;
}
} else {
context._initialDefer = now;
context._deferIx = [];
}

Expand All @@ -517,9 +526,13 @@ ${instruction}`
context._showFullLink = showFullLink;
}

if (context._deferTimer) {
return;
}

context._deferTimestamp = Date.now();
context._deferTimer = setTimeout(() => {
context._deferTimer = undefined;
context._deferTimer = null;
let currentIx = 0;
const logsToShow = context._deferIx.reduce((logs, deferIx) => {
if (currentIx > deferIx) {
Expand Down Expand Up @@ -572,9 +585,9 @@ ${instruction}`
const str = data.toString();
context.checkLine && context.checkLine(str);
if (!str.trim()) {
store.push({ level: "info", message: "" });
store.push({ ts: Date.now(), level: "info", message: "" });
} else {
const entry = parseLog(str.trimRight());
const entry = parseLog(str.trimRight(), _.last(store));
// consider lines with at least two leading white spaces to be potential
// continuation of previous error/warning messages.
if (continuation && str.startsWith(" ")) {
Expand All @@ -588,6 +601,8 @@ ${instruction}`
this.deferLogsOutput(context, entry.show > 1);
}
}

this._adminHttp.sendLogsToStreamClients();
};

inputs.forEach(input => {
Expand Down
11 changes: 9 additions & 2 deletions packages/xarc-app-dev/src/lib/dev-admin/log-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const tagLevelMap = {
"debugger listening on": "silly"
};

export function parse(str) {
export function parse(str: string, last: any) {
let jsonData;
let show;

Expand Down Expand Up @@ -59,12 +59,19 @@ export function parse(str) {
}
}

return {
const entry: any = {
level: level || "info",
ts: Date.now(),
message: message || str,
json: jsonData,
show
};

if (last && entry.ts === last.ts) {
entry.tx = (last.tx || 0) + 1;
}

return entry;
}

const Levels = {
Expand Down
Loading

0 comments on commit ea4c06a

Please sign in to comment.