diff --git a/apps/zui/src/core/loader/types.ts b/apps/zui/src/core/loader/types.ts index 8ebc3a84f0..797f8d7a33 100644 --- a/apps/zui/src/core/loader/types.ts +++ b/apps/zui/src/core/loader/types.ts @@ -1,5 +1,4 @@ import {LoadFormat} from "@brimdata/zed-js" -import {LoadContext} from "../../domain/loads/load-context" export type LoadOptions = { windowId: string @@ -14,7 +13,7 @@ export type LoadOptions = { } export interface Loader { - when(context: LoadContext): PromiseLike | boolean - run(context: LoadContext): PromiseLike | void - rollback(context: LoadContext): PromiseLike | void + when(): PromiseLike | boolean + run(): PromiseLike | void + rollback?(): PromiseLike | void } diff --git a/apps/zui/src/domain/loads/default-loader.ts b/apps/zui/src/domain/loads/default-loader.ts index 00efd71f76..89c20ce1d1 100644 --- a/apps/zui/src/domain/loads/default-loader.ts +++ b/apps/zui/src/domain/loads/default-loader.ts @@ -6,12 +6,15 @@ import {createReadableStream} from "src/core/zq" import {throttle} from "lodash" import {errorToString} from "src/util/error-to-string" -export const defaultLoader: Loader = { +export class DefaultLoader implements Loader { + constructor(private ctx: LoadContext) {} + when() { return true - }, + } - async run(ctx: LoadContext) { + async run() { + const {ctx} = this const client = await ctx.createClient() const progress = createProgressTracker(ctx) const shaper = createShaper(ctx) @@ -26,7 +29,7 @@ export const defaultLoader: Loader = { let res try { - ctx.onProgress(0) + ctx.setProgress(0) res = await client.load(body, { pool: ctx.poolId, branch: ctx.branch, @@ -38,16 +41,16 @@ export const defaultLoader: Loader = { }) } catch (e) { const error = streamError ? new Error(streamError) : e - ctx.onWarning(errorToString(error)) - ctx.onProgress(null) + ctx.addError(errorToString(error)) + ctx.setProgress(null) throw error } - for (const warning of res?.warnings ?? []) ctx.onWarning(warning) + for (const warning of res?.warnings ?? []) ctx.addError(warning) await ctx.onPoolChanged() - ctx.onProgress(1) - }, + ctx.setProgress(1) + } - rollback() {}, + rollback() {} } function getFileSize(path: string) { @@ -64,7 +67,7 @@ function createShaper(ctx) { } function createProgressTracker(ctx) { - const onProgress = throttle((n) => ctx.onProgress(n), 500) + const onProgress = throttle((n) => ctx.setProgress(n), 500) let total = ctx.files.reduce((sum, file) => sum + getFileSize(file), 0) let bytes = 0 diff --git a/apps/zui/src/domain/loads/handlers/quick-load-files.ts b/apps/zui/src/domain/loads/handlers/quick-load-files.ts index 9897458a5e..bd796a463b 100644 --- a/apps/zui/src/domain/loads/handlers/quick-load-files.ts +++ b/apps/zui/src/domain/loads/handlers/quick-load-files.ts @@ -7,7 +7,7 @@ export const quickLoadFiles = createHandler( return } - ctx.invoke("loads.create", { + return ctx.invoke("loads.create", { windowId: globalThis.windowId, poolId: args.poolId || "new", files: args.files, diff --git a/apps/zui/src/domain/loads/load-context.ts b/apps/zui/src/domain/loads/load-context.ts index 77a24e7c09..a764262dfd 100644 --- a/apps/zui/src/domain/loads/load-context.ts +++ b/apps/zui/src/domain/loads/load-context.ts @@ -8,6 +8,7 @@ import {createLoadRef} from "./load-ref" import {select} from "src/core/main/select" export class LoadContext { + abortMsg = undefined as string private ctl = new AbortController() private id = nanoid() private window: SearchWindow @@ -41,13 +42,13 @@ export class LoadContext { ) } - onProgress(progress: number) { + setProgress(progress: number) { this.main.dispatch(Loads.update({id: this.id, changes: {progress}})) } - onWarning(warning: string) { + addError(error: string) { const load = Loads.find(this.main.store.getState(), this.id) - const errors = [...load.errors, warning] + const errors = [...load.errors, error] this.main.dispatch(Loads.update({id: this.id, changes: {errors}})) } @@ -55,10 +56,15 @@ export class LoadContext { await syncPoolOp(this.opts.lakeId, this.opts.poolId) } - abort() { + abort(msg?: string) { + this.abortMsg = msg this.ctl.abort() } + get abortError() { + if (this.abortMsg) return new Error(this.abortMsg) + } + get ref() { return select((s) => Loads.find(s, this.id)) } diff --git a/apps/zui/src/domain/loads/operations/create.ts b/apps/zui/src/domain/loads/operations/create.ts index 131e2d0d83..72353fc354 100644 --- a/apps/zui/src/domain/loads/operations/create.ts +++ b/apps/zui/src/domain/loads/operations/create.ts @@ -15,7 +15,7 @@ export const submit = createOperation( const pool = await createPool(data) const script = new ZedScript(data.shaper || "") // Async so that we can return this and subscribe to updates on the load. - zui.pools + const promise = zui.pools .load({ windowId: data.windowId, format: data.format, @@ -36,6 +36,8 @@ export const submit = createOperation( }) zui.window.openTab(poolPath(pool.id)) + + return promise } ) diff --git a/apps/zui/src/domain/loads/plugin-api.ts b/apps/zui/src/domain/loads/plugin-api.ts index 7f561e04f4..8f66bc6b0d 100644 --- a/apps/zui/src/domain/loads/plugin-api.ts +++ b/apps/zui/src/domain/loads/plugin-api.ts @@ -1,4 +1,4 @@ -import {defaultLoader} from "./default-loader" +import {DefaultLoader} from "./default-loader" import {LoadContext} from "./load-context" import {Loader} from "src/core/loader/types" import Loads from "src/js/state/Loads" @@ -12,38 +12,24 @@ type Events = { error: (load: LoadReference) => void } +type LoaderRef = {name: string; initialize: (ctx: LoadContext) => Loader} + export class LoadsApi extends TypedEmitter { - private list: LoaderApi[] = [] + private list: LoaderRef[] = [] - // Don't use this...or rename to addLoader - create(name: string, impl: Loader) { - this.list.push(new LoaderApi(name, impl)) + addLoader(name: string, initialize: (ctx: LoadContext) => Loader) { + this.list.push({name, initialize}) } - async getMatch(context: LoadContext) { - let loader = defaultLoader - for (const pluginLoader of this.list) { - if (await pluginLoader.when(context)) { - loader = pluginLoader - break - } + async initialize(context: LoadContext) { + for (const ref of this.list) { + const customLoader = ref.initialize(context) + if (await customLoader.when()) return customLoader } - return loader + return new DefaultLoader(context) } get all() { return select(Loads.all) } } - -class LoaderApi { - when: Loader["when"] - run: Loader["run"] - rollback: Loader["rollback"] - - constructor(public name: string, impl: Loader) { - this.when = impl.when - this.run = impl.run - this.rollback = impl.rollback - } -} diff --git a/apps/zui/src/domain/pools/plugin-api.ts b/apps/zui/src/domain/pools/plugin-api.ts index f0b8241292..96474f9c58 100644 --- a/apps/zui/src/domain/pools/plugin-api.ts +++ b/apps/zui/src/domain/pools/plugin-api.ts @@ -9,6 +9,7 @@ import {syncPoolOp} from "src/electron/ops/sync-pool-op" import {LoadOptions} from "src/core/loader/types" import {getMainObject} from "src/core/main" import {TypedEmitter} from "src/util/typed-emitter" +import {call} from "src/util/call" type Events = { create: (event: {pool: Pool}) => void @@ -39,16 +40,16 @@ export class PoolsApi extends TypedEmitter { async load(opts: LoadOptions) { const main = getMainObject() const context = new LoadContext(main, opts) - const loader = await loads.getMatch(context) + const loader = await loads.initialize(context) try { await context.setup() - await loader.run(context) + await loader.run() await waitForPoolStats(context) loads.emit("success", context.ref) } catch (e) { - await loader.rollback(context) + await call(loader.rollback) loads.emit("error", context.ref) - throw e + throw context.abortError || e } finally { context.teardown() } diff --git a/apps/zui/src/plugins/brimcap/analyze.ts b/apps/zui/src/plugins/brimcap/analyze.ts new file mode 100644 index 0000000000..be493df486 --- /dev/null +++ b/apps/zui/src/plugins/brimcap/analyze.ts @@ -0,0 +1,27 @@ +import {compact} from "lodash" +import {configurations} from "src/zui" +import {pluginNamespace, yamlConfigPropName} from "./config" +import {AnalyzeOptions, createCli} from "./cli" + +function getAnalyzeOptions(): AnalyzeOptions { + return { + json: true, + config: configurations.get(pluginNamespace, yamlConfigPropName) || "", + } +} + +export function createAnalyzeProcess(signal) { + const cli = createCli() + const sub = cli.analyze("-", getAnalyzeOptions(), signal) + return sub +} + +export function monitorAnalyzeProgress(analyzeProc, callback) { + analyzeProc.stderr + .once("data", () => analyzeProc.stdout.emit("start")) + .on("data", (data) => { + const lines = compact(data.toString().split("\n")) as string[] + const stats = lines.map((line) => JSON.parse(line)) + stats.forEach(callback) + }) +} diff --git a/apps/zui/src/plugins/brimcap/cli.ts b/apps/zui/src/plugins/brimcap/cli.ts index 02fcfaf2de..9a7209a681 100644 --- a/apps/zui/src/plugins/brimcap/cli.ts +++ b/apps/zui/src/plugins/brimcap/cli.ts @@ -109,21 +109,11 @@ export default class BrimcapCLI { return this.execSpawnSync("search", [...toCliOpts(opts)]) } - private execSpawn( - subCommand: string, - optsAndArgs: string[], - signal?: AbortSignal - ) { - // don't detach if is windows - const p = spawn(this.binPath, [subCommand, ...optsAndArgs], this.spawnOpts) - signal?.addEventListener("abort", () => { - if (this.isWin) { - spawnSync("taskkill", ["/pid", p.pid.toString(), "/f", "/t"]) - } else { - process.kill(-p.pid, "SIGINT") - } + private execSpawn(subCommand: string, optsAndArgs: string[], signal) { + return spawn(this.binPath, [subCommand, ...optsAndArgs], { + ...this.spawnOpts, + signal, }) - return p } private execSpawnSync(subCommand: string, opts: string[]) { diff --git a/apps/zui/src/plugins/brimcap/configure-zeek-pool.ts b/apps/zui/src/plugins/brimcap/configure-zeek-pool.ts new file mode 100644 index 0000000000..9469e420fe --- /dev/null +++ b/apps/zui/src/plugins/brimcap/configure-zeek-pool.ts @@ -0,0 +1,10 @@ +import {zeekColorMap} from "./zeek/colors" +import {pools} from "src/zui" + +export function configureZeekPool(poolId: string) { + pools + .configure(poolId) + .set("timeField", "ts") + .set("colorField", "_path") + .set("colorMap", zeekColorMap) +} diff --git a/apps/zui/src/plugins/brimcap/loader.ts b/apps/zui/src/plugins/brimcap/loader.ts index 9db0392ea2..1b0f07832d 100644 --- a/apps/zui/src/plugins/brimcap/loader.ts +++ b/apps/zui/src/plugins/brimcap/loader.ts @@ -1,133 +1,97 @@ -import {AnalyzeOptions, createCli} from "./cli" import fs from "fs" -import {compact} from "lodash" -import errors from "src/js/errors" -import {pluginNamespace, yamlConfigPropName} from "./config" -import {ChildProcess} from "child_process" import {Loader} from "src/core/loader/types" import {LoadContext} from "src/domain/loads/load-context" import {isPcap} from "./packets/is-pcap" -import {configurations, loads, pools} from "src/zui" -import {zeekColorMap} from "./zeek/colors" +import {loads} from "src/zui" +import {pipeline} from "stream" +import {createTransform} from "./transform-stream" +import {configureZeekPool} from "./configure-zeek-pool" +import {createAnalyzeProcess, monitorAnalyzeProgress} from "./analyze" +import {createCli} from "./cli" +import errors from "src/js/errors" +import {errorToString} from "src/util/error-to-string" -function createLoader(root: string): Loader { - const processes: Record = {} +class BrimcapLoader implements Loader { + constructor(private ctx: LoadContext, private root: string) {} - async function when(ctx: LoadContext): Promise { - const file = ctx.files[0] + async when() { + const file = this.ctx.files[0] return file && (await isPcap(file)) } - async function run(ctx: LoadContext) { - if (ctx.files.length > 1) { + + async run() { + if (this.ctx.files.length > 1) { throw new Error("Only one PCAP can be loaded at a time") } + this.ctx.setProgress(0) + await this.load(this.startPipeline()) + this.index() + configureZeekPool(this.ctx.poolId) + this.ctx.setProgress(1) + } - const main = ctx.main - const cliOpts: AnalyzeOptions = {json: true} - const yamlConfig = configurations.get(pluginNamespace, yamlConfigPropName) - cliOpts.config = yamlConfig || "" - const pcap = ctx.files[0] - const pcapTotalSize = fs.statSync(pcap).size - const pcapStream = fs.createReadStream(pcap) - - ctx.onProgress(0) - const cli = createCli() - const analyzeP = cli.analyze("-", cliOpts, ctx.signal) - processes[analyzeP.pid] = analyzeP - pcapStream.pipe(analyzeP.stdin) - - analyzeP.on("close", () => { - delete processes[analyzeP.pid] - }) - let analyzeErr - analyzeP.on("error", (err) => { - analyzeErr = err + startPipeline() { + return pipeline(fs.createReadStream(this.pcap), this.analyze(), (err) => { + if (err) { + this.ctx.abort() + this.ctx.addError(errorToString(err)) + } }) + } - const handleRespMsg = async (jsonMsg) => { - const {type, ...status} = jsonMsg + analyze() { + const process = createAnalyzeProcess(this.ctx.signal) + const stream = createTransform(process) + const totalSize = fs.statSync(this.pcap).size + monitorAnalyzeProgress(process, ({type, ...status}) => { switch (type) { case "status": - ctx.onProgress(statusToPercent(status, pcapTotalSize)) - await ctx.onPoolChanged() + this.ctx.setProgress(status.pcap_read_size / totalSize || 0) + this.ctx.onPoolChanged() break case "warning": - if (status.warning) ctx.onWarning(status.warning) + if (status.warning) this.ctx.addError(status.warning) break case "error": - if (status.error) analyzeErr = status.error + if (status.error) stream.destroy(new Error(status.error)) break } - } - - // on first data, emit a 'start' on stdout so zealot knows not to timeout the request - analyzeP.stderr.once("data", () => analyzeP.stdout.emit("start")) - analyzeP.stderr.on("data", (d) => { - try { - const msgs: string[] = compact(d.toString().split("\n")) - const jsonMsgs = msgs.map((msg) => JSON.parse(msg)) - jsonMsgs.forEach(handleRespMsg) - } catch (e) { - console.error(e) - analyzeErr = d.toString() - } - }) - analyzeP.on("close", () => { - delete processes[analyzeP.pid] }) + return stream + } - // stream analyze output to pool - const client = await main.createClient(ctx.lakeId) + index() { + const cli = createCli() try { - await client.load(analyzeP.stdout, { - pool: ctx.poolId, - branch: ctx.branch, - message: { - author: "zui", - body: "automatic import with brimcap analyze", - }, - signal: ctx.signal, - }) + cli.index({root: this.root, pcap: this.pcap}) } catch (e) { - if (analyzeErr) - // if load failed because analyze did, report the analyzeErr - throw errors.pcapIngest(analyzeErr) - // otherwise report the loadErr + console.error(e) throw errors.pcapIngest(e) } - - // generate pcap index - // in tests we may not have the pcapPath, so skip indexing for now - if (pcap) { - try { - cli.index({root, pcap}) - } catch (e) { - console.error(e) - throw errors.pcapIngest(e) - } - } - - await ctx.onPoolChanged() - ctx.onProgress(1) - - // update this pool's settings with zeek specific properties - pools - .configure(ctx.poolId) - .set("timeField", "ts") - .set("colorField", "_path") - .set("colorMap", zeekColorMap) } - function rollback() {} - - return {when, run, rollback} -} + async load(streamBody: NodeJS.ReadableStream) { + const client = await this.ctx.createClient() + const {ctx} = this + const author = "zui" + const body = "Automatic Import with brimcap analyze" + const pool = ctx.poolId + const {branch, signal} = ctx + return client.load(streamBody, { + message: {author, body}, + pool, + branch, + signal, + }) + } -// helpers -function statusToPercent(status, totalSize): number { - return status.pcap_read_size / totalSize || 0 + get pcap() { + return this.ctx.files[0] + } } export function activateBrimcapLoader(root: string) { - loads.create("brimcap-loader", createLoader(root)) + loads.addLoader("brimcap-loader", (ctx: LoadContext) => { + return new BrimcapLoader(ctx, root) + }) } diff --git a/apps/zui/src/plugins/brimcap/transform-stream.ts b/apps/zui/src/plugins/brimcap/transform-stream.ts new file mode 100644 index 0000000000..bab619a86d --- /dev/null +++ b/apps/zui/src/plugins/brimcap/transform-stream.ts @@ -0,0 +1,45 @@ +import {ChildProcessWithoutNullStreams} from "node:child_process" +import {Stream} from "node:stream" +import {isAbortError} from "src/util/is-abort-error" + +export function createTransform(sub: ChildProcessWithoutNullStreams) { + const stream = new Stream.Transform({ + transform(chunk, encoding, callback) { + if (!sub.stdin.write(chunk, encoding)) { + sub.stdin.once("drain", callback) + } else { + process.nextTick(callback) + } + }, + + flush(callback) { + sub.stdin.end() + if (sub.stdout.destroyed) callback() + else sub.stdout.on("close", () => callback()) + }, + }) + // Transform stream handlers + stream.on("error", () => { + sub.kill("SIGKILL") + }) + + // Sub Hanlers + sub.on("error", (e) => { + if (isAbortError(e)) return + stream.destroy(e) + }) + + // STDIN HANLDERS + sub.stdin.on("error", (e: Error & {code: string}) => { + e.code === "EPIPE" ? stream.push(null) : stream.destroy(e) + }) + + // STDOUT HANLDERS + sub.stdout + .on("data", (data) => stream.readable && stream.push(data)) + .on("error", (e) => stream.destroy(e)) + + // STDERR HANLDERS + sub.stderr.on("error", (e) => stream.destroy(e)) + return stream +} diff --git a/apps/zui/src/util/call.ts b/apps/zui/src/util/call.ts index db34e604cd..b647918d20 100644 --- a/apps/zui/src/util/call.ts +++ b/apps/zui/src/util/call.ts @@ -6,5 +6,5 @@ export function call any>( fn: Fn, ...args: Parameters ) { - if (fn) fn(...args) + if (fn) return fn(...args) } diff --git a/packages/zui-player/tests/bad-pcap.spec.ts b/packages/zui-player/tests/bad-pcap.spec.ts new file mode 100644 index 0000000000..4f12075b65 --- /dev/null +++ b/packages/zui-player/tests/bad-pcap.spec.ts @@ -0,0 +1,12 @@ +import { play } from 'zui-player'; +import { getPath } from 'zui-test-data'; + +play('bad_pcap_spec', (app, test) => { + test.setTimeout(5 * 60_000); + + test('Displays zeek error message', async () => { + app.page.setDefaultTimeout(5 * 60_000); + await app.dropFile(getPath('vanspy.pcapng')); + await app.attached(/with 1 error/); + }); +}); diff --git a/packages/zui-test-data/data/vanspy.pcapng b/packages/zui-test-data/data/vanspy.pcapng new file mode 100644 index 0000000000..e6284d55aa Binary files /dev/null and b/packages/zui-test-data/data/vanspy.pcapng differ