Skip to content

Commit

Permalink
Improve brimcap error handling (#2955)
Browse files Browse the repository at this point in the history
  • Loading branch information
nwt authored Jan 23, 2024
1 parent eec89c3 commit a5ee8af
Show file tree
Hide file tree
Showing 15 changed files with 209 additions and 164 deletions.
7 changes: 3 additions & 4 deletions apps/zui/src/core/loader/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {LoadFormat} from "@brimdata/zed-js"
import {LoadContext} from "../../domain/loads/load-context"

export type LoadOptions = {
windowId: string
Expand All @@ -14,7 +13,7 @@ export type LoadOptions = {
}

export interface Loader {
when(context: LoadContext): PromiseLike<boolean> | boolean
run(context: LoadContext): PromiseLike<void> | void
rollback(context: LoadContext): PromiseLike<void> | void
when(): PromiseLike<boolean> | boolean
run(): PromiseLike<void> | void
rollback?(): PromiseLike<void> | void
}
25 changes: 14 additions & 11 deletions apps/zui/src/domain/loads/default-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import {createReadableStream} from "src/core/zq"
import {throttle} from "lodash"
import {errorToString} from "src/util/error-to-string"

export const defaultLoader: Loader = {
export class DefaultLoader implements Loader {
constructor(private ctx: LoadContext) {}

when() {
return true
},
}

async run(ctx: LoadContext) {
async run() {
const {ctx} = this
const client = await ctx.createClient()
const progress = createProgressTracker(ctx)
const shaper = createShaper(ctx)
Expand All @@ -26,7 +29,7 @@ export const defaultLoader: Loader = {

let res
try {
ctx.onProgress(0)
ctx.setProgress(0)
res = await client.load(body, {
pool: ctx.poolId,
branch: ctx.branch,
Expand All @@ -38,16 +41,16 @@ export const defaultLoader: Loader = {
})
} catch (e) {
const error = streamError ? new Error(streamError) : e
ctx.onWarning(errorToString(error))
ctx.onProgress(null)
ctx.addError(errorToString(error))
ctx.setProgress(null)
throw error
}
for (const warning of res?.warnings ?? []) ctx.onWarning(warning)
for (const warning of res?.warnings ?? []) ctx.addError(warning)
await ctx.onPoolChanged()
ctx.onProgress(1)
},
ctx.setProgress(1)
}

rollback() {},
rollback() {}
}

function getFileSize(path: string) {
Expand All @@ -64,7 +67,7 @@ function createShaper(ctx) {
}

function createProgressTracker(ctx) {
const onProgress = throttle((n) => ctx.onProgress(n), 500)
const onProgress = throttle((n) => ctx.setProgress(n), 500)

let total = ctx.files.reduce((sum, file) => sum + getFileSize(file), 0)
let bytes = 0
Expand Down
2 changes: 1 addition & 1 deletion apps/zui/src/domain/loads/handlers/quick-load-files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export const quickLoadFiles = createHandler(
return
}

ctx.invoke("loads.create", {
return ctx.invoke("loads.create", {
windowId: globalThis.windowId,
poolId: args.poolId || "new",
files: args.files,
Expand Down
14 changes: 10 additions & 4 deletions apps/zui/src/domain/loads/load-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {createLoadRef} from "./load-ref"
import {select} from "src/core/main/select"

export class LoadContext {
abortMsg = undefined as string
private ctl = new AbortController()
private id = nanoid()
private window: SearchWindow
Expand Down Expand Up @@ -41,24 +42,29 @@ export class LoadContext {
)
}

onProgress(progress: number) {
setProgress(progress: number) {
this.main.dispatch(Loads.update({id: this.id, changes: {progress}}))
}

onWarning(warning: string) {
addError(error: string) {
const load = Loads.find(this.main.store.getState(), this.id)
const errors = [...load.errors, warning]
const errors = [...load.errors, error]
this.main.dispatch(Loads.update({id: this.id, changes: {errors}}))
}

async onPoolChanged() {
await syncPoolOp(this.opts.lakeId, this.opts.poolId)
}

abort() {
abort(msg?: string) {
this.abortMsg = msg
this.ctl.abort()
}

get abortError() {
if (this.abortMsg) return new Error(this.abortMsg)
}

get ref() {
return select((s) => Loads.find(s, this.id))
}
Expand Down
4 changes: 3 additions & 1 deletion apps/zui/src/domain/loads/operations/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export const submit = createOperation(
const pool = await createPool(data)
const script = new ZedScript(data.shaper || "")
// Async so that we can return this and subscribe to updates on the load.
zui.pools
const promise = zui.pools
.load({
windowId: data.windowId,
format: data.format,
Expand All @@ -36,6 +36,8 @@ export const submit = createOperation(
})

zui.window.openTab(poolPath(pool.id))

return promise
}
)

Expand Down
36 changes: 11 additions & 25 deletions apps/zui/src/domain/loads/plugin-api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {defaultLoader} from "./default-loader"
import {DefaultLoader} from "./default-loader"
import {LoadContext} from "./load-context"
import {Loader} from "src/core/loader/types"
import Loads from "src/js/state/Loads"
Expand All @@ -12,38 +12,24 @@ type Events = {
error: (load: LoadReference) => void
}

type LoaderRef = {name: string; initialize: (ctx: LoadContext) => Loader}

export class LoadsApi extends TypedEmitter<Events> {
private list: LoaderApi[] = []
private list: LoaderRef[] = []

// Don't use this...or rename to addLoader
create(name: string, impl: Loader) {
this.list.push(new LoaderApi(name, impl))
addLoader(name: string, initialize: (ctx: LoadContext) => Loader) {
this.list.push({name, initialize})
}

async getMatch(context: LoadContext) {
let loader = defaultLoader
for (const pluginLoader of this.list) {
if (await pluginLoader.when(context)) {
loader = pluginLoader
break
}
async initialize(context: LoadContext) {
for (const ref of this.list) {
const customLoader = ref.initialize(context)
if (await customLoader.when()) return customLoader
}
return loader
return new DefaultLoader(context)
}

get all() {
return select(Loads.all)
}
}

class LoaderApi {
when: Loader["when"]
run: Loader["run"]
rollback: Loader["rollback"]

constructor(public name: string, impl: Loader) {
this.when = impl.when
this.run = impl.run
this.rollback = impl.rollback
}
}
9 changes: 5 additions & 4 deletions apps/zui/src/domain/pools/plugin-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {syncPoolOp} from "src/electron/ops/sync-pool-op"
import {LoadOptions} from "src/core/loader/types"
import {getMainObject} from "src/core/main"
import {TypedEmitter} from "src/util/typed-emitter"
import {call} from "src/util/call"

type Events = {
create: (event: {pool: Pool}) => void
Expand Down Expand Up @@ -39,16 +40,16 @@ export class PoolsApi extends TypedEmitter<Events> {
async load(opts: LoadOptions) {
const main = getMainObject()
const context = new LoadContext(main, opts)
const loader = await loads.getMatch(context)
const loader = await loads.initialize(context)
try {
await context.setup()
await loader.run(context)
await loader.run()
await waitForPoolStats(context)
loads.emit("success", context.ref)
} catch (e) {
await loader.rollback(context)
await call(loader.rollback)
loads.emit("error", context.ref)
throw e
throw context.abortError || e
} finally {
context.teardown()
}
Expand Down
27 changes: 27 additions & 0 deletions apps/zui/src/plugins/brimcap/analyze.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import {compact} from "lodash"
import {configurations} from "src/zui"
import {pluginNamespace, yamlConfigPropName} from "./config"
import {AnalyzeOptions, createCli} from "./cli"

function getAnalyzeOptions(): AnalyzeOptions {
return {
json: true,
config: configurations.get(pluginNamespace, yamlConfigPropName) || "",
}
}

export function createAnalyzeProcess(signal) {
const cli = createCli()
const sub = cli.analyze("-", getAnalyzeOptions(), signal)
return sub
}

export function monitorAnalyzeProgress(analyzeProc, callback) {
analyzeProc.stderr
.once("data", () => analyzeProc.stdout.emit("start"))
.on("data", (data) => {
const lines = compact(data.toString().split("\n")) as string[]
const stats = lines.map((line) => JSON.parse(line))
stats.forEach(callback)
})
}
18 changes: 4 additions & 14 deletions apps/zui/src/plugins/brimcap/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,11 @@ export default class BrimcapCLI {
return this.execSpawnSync("search", [...toCliOpts(opts)])
}

private execSpawn(
subCommand: string,
optsAndArgs: string[],
signal?: AbortSignal
) {
// don't detach if is windows
const p = spawn(this.binPath, [subCommand, ...optsAndArgs], this.spawnOpts)
signal?.addEventListener("abort", () => {
if (this.isWin) {
spawnSync("taskkill", ["/pid", p.pid.toString(), "/f", "/t"])
} else {
process.kill(-p.pid, "SIGINT")
}
private execSpawn(subCommand: string, optsAndArgs: string[], signal) {
return spawn(this.binPath, [subCommand, ...optsAndArgs], {
...this.spawnOpts,
signal,
})
return p
}

private execSpawnSync(subCommand: string, opts: string[]) {
Expand Down
10 changes: 10 additions & 0 deletions apps/zui/src/plugins/brimcap/configure-zeek-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import {zeekColorMap} from "./zeek/colors"
import {pools} from "src/zui"

export function configureZeekPool(poolId: string) {
pools
.configure(poolId)
.set("timeField", "ts")
.set("colorField", "_path")
.set("colorMap", zeekColorMap)
}
Loading

0 comments on commit a5ee8af

Please sign in to comment.