From 58014362c7bd9d4cd678f868db80b7f403f7b70c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20SZKIBA?= Date: Mon, 12 Feb 2024 17:46:16 +0100 Subject: [PATCH] feat: SSE stream optimization On the SSE stream, the snapshot and cumulative events are displayed as JSON objects, where the metric names are the property names. The property values are also JSON objects, in which the property names are the names of the aggregation function (avg, min, max, etc.). This two-level object hierarchy has not caused any problems so far, the amount of data has been acceptable. Currently, aggregate counting per tag is still supported, but by default it is only allowed for the group tag. It would be necessary to turn on the aggregate calculation for the url and name tags as well (per URL graphs, summary, etc.). With the current SSE stream format, this would significantly increase the amount of data. This would have a direct impact on the k6 memory requirement, so it is a problem. Based on measurements, it can be concluded that using a two-level array structure instead of the two-level object structure used in snapshot and cumulative events significantly reduces the size of the events and thus also the memory requirement. For example, for the scripts/demo/demo.js test script, the size of the SSE stream is reduced by 65%. With url and name tags enabled, the new format results in a 10% size increase compared to the original size (without these tags). That is, the new format enables aggregate calculation based on url and name tags, which is a prerequisite for several requested features. Therefore, instead of the two-level object structure, the use of a two-level array structure must be implemented in the snapshot and cumulative events. The packages/model package can hide SSE stream changes from the rest of the dashboard UI. --- dashboard/aggregate.go | 7 +- .../assets/packages/model/dist/index.d.ts | 75 +++++++-- dashboard/assets/packages/model/dist/index.js | 152 ++++++++++-------- dashboard/assets/packages/model/src/Digest.ts | 56 +++---- dashboard/assets/packages/model/src/Event.ts | 60 +++++++ .../assets/packages/model/src/Metrics.ts | 62 ++++--- dashboard/assets/packages/model/src/index.ts | 3 +- .../assets/packages/report/dist/index.html | 10 +- .../packages/report/src/utils/digest.ts | 4 +- .../{index-5828d283.js => index-5a43f090.js} | 10 +- dashboard/assets/packages/ui/dist/index.html | 2 +- dashboard/extension.go | 15 +- dashboard/extension_test.go | 13 +- dashboard/meter.go | 111 ++++++++++--- dashboard/meter_test.go | 72 ++++----- dashboard/registry.go | 34 +++- dashboard/replay.go | 4 +- dashboard/testdata/result.ndjson | 47 +++--- dashboard/testdata/result.ndjson.gz | Bin 3888 -> 3455 bytes scripts/demo/demo-grpc.js | 4 +- scripts/demo/demo-http.js | 2 +- scripts/demo/demo-ws.js | 2 +- scripts/demo/demo.ndjson.gz | Bin 113924 -> 0 bytes 23 files changed, 478 insertions(+), 267 deletions(-) create mode 100644 dashboard/assets/packages/model/src/Event.ts rename dashboard/assets/packages/ui/dist/assets/{index-5828d283.js => index-5a43f090.js} (74%) delete mode 100644 scripts/demo/demo.ndjson.gz diff --git a/dashboard/aggregate.go b/dashboard/aggregate.go index fcefe09..f4b5782 100644 --- a/dashboard/aggregate.go +++ b/dashboard/aggregate.go @@ -42,7 +42,7 @@ type aggregator struct { once sync.Once - seenMetrics map[string]struct{} + seenMetrics []string } func closer(what io.Closer, logger logrus.FieldLogger) { @@ -57,7 +57,7 @@ func aggregate(input, output string, opts *options, proc *process) error { agg.registry = newRegistry() agg.options = opts agg.logger = proc.logger - agg.seenMetrics = make(map[string]struct{}) + agg.seenMetrics = make([]string, 0) var inputFile, outputFile afero.File var err error @@ -176,8 +176,9 @@ func (agg *aggregator) updateAndSend( return } - newbies := met.newbies(agg.seenMetrics) + newbies, updated := met.newbies(agg.seenMetrics) if len(newbies) != 0 { + agg.seenMetrics = updated agg.fireEvent(metricEvent, newbies) } diff --git a/dashboard/assets/packages/model/dist/index.d.ts b/dashboard/assets/packages/model/dist/index.d.ts index 2512bab..05c4a5c 100644 --- a/dashboard/assets/packages/model/dist/index.d.ts +++ b/dashboard/assets/packages/model/dist/index.d.ts @@ -38,25 +38,72 @@ type Metric = { name: string; contains?: ValueType; type?: MetricType; + custom?: boolean; }; declare class Query { name: string; aggregate?: AggregateType; - tags?: Record; - group?: string; - scenario?: string; constructor(query: string); } declare class Metrics { values: Record; - constructor({ values }?: { + names: Array; + _aggregates: Record>; + constructor({ values, names }?: { values?: {} | undefined; + names?: never[] | undefined; }); + set aggregates(value: Record>); onEvent(data: Record): void; + toAggregate(data: Array>): Record; find(query: string): Metric | undefined; unit(name: string, aggregate?: AggregateType): UnitType; } +declare enum EventType { + config = "config", + param = "param", + start = "start", + stop = "stop", + metric = "metric", + snapshot = "snapshot", + cumulative = "cumulative", + threshold = "threshold" +} +type ConfigEvent = { + type: EventType.config; + data: Record; +}; +type ParamEvent = { + type: EventType.param; + data: Record; +}; +type StartEvent = { + type: EventType.start; + data: Array>; +}; +type StopEvent = { + type: EventType.stop; + data: Array>; +}; +type MetricEvent = { + type: EventType.metric; + data: Record>; +}; +type SnapshotEvent = { + type: EventType.snapshot; + data: Array>; +}; +type CumulativeEvent = { + type: EventType.cumulative; + data: Array>; +}; +type ThresholdEvent = { + type: EventType.threshold; + data: Record>; +}; +type DashboardEvent = ConfigEvent | ParamEvent | StartEvent | StopEvent | MetricEvent | SnapshotEvent | CumulativeEvent | ThresholdEvent; + type SampleVectorInit = { length: number; capacity: number; @@ -155,14 +202,9 @@ declare class Param implements Record { constructor(from?: Record); [x: string]: unknown; } -declare enum EventType { - config = "config", - param = "param", - start = "start", - stop = "stop", - metric = "metric", - snapshot = "snapshot", - cumulative = "cumulative" +declare class Thresholds implements Record> { + constructor(from?: Record); + [x: string]: Array; } declare class Digest implements EventListenerObject { config: Config; @@ -172,7 +214,8 @@ declare class Digest implements EventListenerObject { metrics: Metrics; samples: Samples; summary: Summary; - constructor({ config, param, start, stop, metrics, samples, summary }?: { + thresholds: Thresholds; + constructor({ config, param, start, stop, metrics, samples, summary, thresholds }?: { config?: Config | undefined; param?: Param | undefined; start?: Date | undefined; @@ -180,9 +223,10 @@ declare class Digest implements EventListenerObject { metrics?: Metrics | undefined; samples?: Samples | undefined; summary?: Summary | undefined; + thresholds?: Thresholds | undefined; }); handleEvent(event: MessageEvent): void; - onEvent(type: EventType, data: Record): void; + onEvent(event: DashboardEvent): void; private onConfig; private onParam; private onStart; @@ -190,6 +234,7 @@ declare class Digest implements EventListenerObject { private onMetric; private onSnapshot; private onCumulative; + private onThreshold; } -export { Aggregate, AggregateType, Config, Digest, EventType, Metric, MetricType, Metrics, Param, Query, SampleVector, SampleVectorInit, Samples, SamplesView, Summary, SummaryRow, SummaryView, UnitType, ValueType }; +export { Aggregate, AggregateType, Config, DashboardEvent, Digest, EventType, Metric, MetricType, Metrics, Param, Query, SampleVector, SampleVectorInit, Samples, SamplesView, Summary, SummaryRow, SummaryView, UnitType, ValueType }; diff --git a/dashboard/assets/packages/model/dist/index.js b/dashboard/assets/packages/model/dist/index.js index 4cc4f11..7df5b3f 100644 --- a/dashboard/assets/packages/model/dist/index.js +++ b/dashboard/assets/packages/model/dist/index.js @@ -11,18 +11,18 @@ var UnitType = /* @__PURE__ */ ((UnitType2) => { })(UnitType || {}); // src/Metrics.ts -var AggregateType = /* @__PURE__ */ ((AggregateType4) => { - AggregateType4["value"] = "value"; - AggregateType4["count"] = "count"; - AggregateType4["rate"] = "rate"; - AggregateType4["avg"] = "avg"; - AggregateType4["min"] = "min"; - AggregateType4["max"] = "max"; - AggregateType4["med"] = "med"; - AggregateType4["p90"] = "p90"; - AggregateType4["p95"] = "p95"; - AggregateType4["p99"] = "p99"; - return AggregateType4; +var AggregateType = /* @__PURE__ */ ((AggregateType3) => { + AggregateType3["value"] = "value"; + AggregateType3["count"] = "count"; + AggregateType3["rate"] = "rate"; + AggregateType3["avg"] = "avg"; + AggregateType3["min"] = "min"; + AggregateType3["max"] = "max"; + AggregateType3["med"] = "med"; + AggregateType3["p90"] = "p90"; + AggregateType3["p95"] = "p95"; + AggregateType3["p99"] = "p99"; + return AggregateType3; })(AggregateType || {}); var ValueType = /* @__PURE__ */ ((ValueType2) => { ValueType2["time"] = "time"; @@ -30,49 +30,60 @@ var ValueType = /* @__PURE__ */ ((ValueType2) => { ValueType2["default"] = "default"; return ValueType2; })(ValueType || {}); -var MetricType = /* @__PURE__ */ ((MetricType2) => { - MetricType2["gauge"] = "gauge"; - MetricType2["counter"] = "counter"; - MetricType2["rate"] = "rate"; - MetricType2["trend"] = "trend"; - return MetricType2; +var MetricType = /* @__PURE__ */ ((MetricType3) => { + MetricType3["gauge"] = "gauge"; + MetricType3["counter"] = "counter"; + MetricType3["rate"] = "rate"; + MetricType3["trend"] = "trend"; + return MetricType3; })(MetricType || {}); var Query = class { name; aggregate; - tags; - group; - scenario; constructor(query) { const [name, aggregate] = query.split(".", 2); this.aggregate = aggregate; this.name = name; - let sub = ""; - const idx = name.indexOf("{"); - if (idx && idx > 0) { - sub = name.substring(idx); - sub = sub.substring(1, sub.length - 1); - const cidx = sub.indexOf(":"); - const tname = sub.substring(0, cidx); - const tvalue = sub.substring(cidx + 1); - this.tags = { [tname]: tvalue }; - if (tname == "group") { - this.group = tvalue.substring(2); - } - this.name = name.substring(0, idx); - } } }; var propTime = "time"; var Metrics = class { values; - constructor({ values = {} } = {}) { + names; + _aggregates; + constructor({ values = {}, names = [] } = {}) { this.values = values; + this.names = names; + this._aggregates = {}; + } + set aggregates(value) { + for (const mname in value) { + const mtype = mname; + this._aggregates[mtype] = value[mtype].map((atype) => atype.replaceAll("(", "").replaceAll(")", "")); + } } onEvent(data) { for (const name in data) { this.values[name] = { ...data[name], name }; } + this.names = Object.keys(this.values); + this.names.sort(); + } + toAggregate(data) { + const out = {}; + for (let i = 0; i < data.length && i < this.names.length; i++) { + const metric = this.values[this.names[i]]; + if (!metric) { + continue; + } + const agg = {}; + const names = metric.type ? this._aggregates[metric.type] : []; + for (let j = 0; j < data[i].length && j < names.length; j++) { + agg[names[j]] = data[i][j]; + } + out[metric.name] = agg; + } + return out; } find(query) { const q = new Query(query); @@ -125,6 +136,19 @@ var Metrics = class { } }; +// src/Event.ts +var EventType = /* @__PURE__ */ ((EventType2) => { + EventType2["config"] = "config"; + EventType2["param"] = "param"; + EventType2["start"] = "start"; + EventType2["stop"] = "stop"; + EventType2["metric"] = "metric"; + EventType2["snapshot"] = "snapshot"; + EventType2["cumulative"] = "cumulative"; + EventType2["threshold"] = "threshold"; + return EventType2; +})(EventType || {}); + // src/Samples.ts import jmesspath from "jmespath"; var propTime2 = "time"; @@ -476,16 +500,11 @@ var Param = class { Object.assign(this, from); } }; -var EventType = /* @__PURE__ */ ((EventType2) => { - EventType2["config"] = "config"; - EventType2["param"] = "param"; - EventType2["start"] = "start"; - EventType2["stop"] = "stop"; - EventType2["metric"] = "metric"; - EventType2["snapshot"] = "snapshot"; - EventType2["cumulative"] = "cumulative"; - return EventType2; -})(EventType || {}); +var Thresholds = class { + constructor(from = {}) { + Object.assign(this, from); + } +}; var Digest = class { config; param; @@ -494,6 +513,7 @@ var Digest = class { metrics; samples; summary; + thresholds; constructor({ config = {}, param = {}, @@ -501,7 +521,8 @@ var Digest = class { stop = void 0, metrics = new Metrics(), samples = new Samples(), - summary = new Summary() + summary = new Summary(), + thresholds = new Thresholds() } = {}) { this.config = config; this.param = param; @@ -510,43 +531,38 @@ var Digest = class { this.metrics = metrics; this.samples = samples; this.summary = summary; + this.thresholds = thresholds; } handleEvent(event) { const type = event.type; const data = JSON.parse(event.data); - this.onEvent(type, data); + this.onEvent({ type, data }); } - onEvent(type, data) { - for (const name in data) { - for (const prop in data[name]) { - if (prop.indexOf("(") >= 0) { - const pname = prop.replaceAll("(", "").replaceAll(")", ""); - data[name][pname] = data[name][prop]; - delete data[name][prop]; - } - } - } - switch (type) { + onEvent(event) { + switch (event.type) { case "config" /* config */: - this.onConfig(data); + this.onConfig(event.data); break; case "param" /* param */: - this.onParam(data); + this.onParam(event.data); break; case "start" /* start */: - this.onStart(data); + this.onStart(this.metrics.toAggregate(event.data)); break; case "stop" /* stop */: - this.onStop(data); + this.onStop(this.metrics.toAggregate(event.data)); break; case "metric" /* metric */: - this.onMetric(data); + this.onMetric(event.data); break; case "snapshot" /* snapshot */: - this.onSnapshot(data); + this.onSnapshot(this.metrics.toAggregate(event.data)); break; case "cumulative" /* cumulative */: - this.onCumulative(data); + this.onCumulative(this.metrics.toAggregate(event.data)); + break; + case "threshold" /* threshold */: + this.onThreshold(event.data); break; } } @@ -555,6 +571,7 @@ var Digest = class { } onParam(data) { Object.assign(this.param, data); + this.metrics.aggregates = data["aggregates"]; } onStart(data) { if (data["time"] && data["time"].value) { @@ -579,6 +596,9 @@ var Digest = class { this.summary.onEvent(data); this.summary.annotate(this.metrics); } + onThreshold(data) { + Object.assign(this.thresholds, data); + } }; export { AggregateType, diff --git a/dashboard/assets/packages/model/src/Digest.ts b/dashboard/assets/packages/model/src/Digest.ts index dd80dc4..19ce1d4 100644 --- a/dashboard/assets/packages/model/src/Digest.ts +++ b/dashboard/assets/packages/model/src/Digest.ts @@ -2,9 +2,10 @@ // // SPDX-License-Identifier: AGPL-3.0-only -import { Metrics, Aggregate, AggregateType } from "./Metrics.ts" +import { Metrics, Aggregate, MetricType } from "./Metrics.ts" import { Samples } from "./Samples.ts" import { Summary } from "./Summary.ts" +import { EventType, DashboardEvent } from "./Event.ts" export class Config implements Record { constructor(from = {} as Record) { @@ -28,17 +29,6 @@ export class Thresholds implements Record> { [x: string]: Array } -export enum EventType { - config = "config", - param = "param", - start = "start", - stop = "stop", - metric = "metric", - snapshot = "snapshot", - cumulative = "cumulative", - threshold = "threshold" -} - export class Digest implements EventListenerObject { config: Config param: Param @@ -73,55 +63,45 @@ export class Digest implements EventListenerObject { const type = event.type as EventType const data = JSON.parse(event.data) - this.onEvent(type, data) + this.onEvent({ type, data } as DashboardEvent) } - onEvent(type: EventType, data: Record): void { - // rename p(XX) to pXX - for (const name in data) { - for (const prop in data[name]) { - if (prop.indexOf("(") >= 0) { - const pname = prop.replaceAll("(", "").replaceAll(")", "") as AggregateType - data[name][pname] = data[name][prop as AggregateType] - delete data[name][prop as AggregateType] - } - } - } - - switch (type) { + onEvent(event: DashboardEvent): void { + switch (event.type) { case EventType.config: - this.onConfig(data) + this.onConfig(event.data) break case EventType.param: - this.onParam(data) + this.onParam(event.data) break case EventType.start: - this.onStart(data) + this.onStart(this.metrics.toAggregate(event.data)) break case EventType.stop: - this.onStop(data) + this.onStop(this.metrics.toAggregate(event.data)) break case EventType.metric: - this.onMetric(data) + this.onMetric(event.data) break case EventType.snapshot: - this.onSnapshot(data) + this.onSnapshot(this.metrics.toAggregate(event.data)) break case EventType.cumulative: - this.onCumulative(data) + this.onCumulative(this.metrics.toAggregate(event.data)) break case EventType.threshold: - this.onThreshold(data) + this.onThreshold(event.data) break } } - private onConfig(data: Record): void { + private onConfig(data: Record): void { Object.assign(this.config, data) } - private onParam(data: Record): void { + private onParam(data: Record): void { Object.assign(this.param, data) + this.metrics.aggregates = data["aggregates"] as Record> } private onStart(data: Record): void { @@ -136,7 +116,7 @@ export class Digest implements EventListenerObject { } } - private onMetric(data: Record): void { + private onMetric(data: Record>): void { this.metrics.onEvent(data) this.samples.annotate(this.metrics) this.summary.annotate(this.metrics) @@ -152,7 +132,7 @@ export class Digest implements EventListenerObject { this.summary.annotate(this.metrics) } - private onThreshold(data: Record): void { + private onThreshold(data: Record>): void { Object.assign(this.thresholds, data) } } diff --git a/dashboard/assets/packages/model/src/Event.ts b/dashboard/assets/packages/model/src/Event.ts new file mode 100644 index 0000000..e3a8d18 --- /dev/null +++ b/dashboard/assets/packages/model/src/Event.ts @@ -0,0 +1,60 @@ +export enum EventType { + config = "config", + param = "param", + start = "start", + stop = "stop", + metric = "metric", + snapshot = "snapshot", + cumulative = "cumulative", + threshold = "threshold" +} + +export type ConfigEvent = { + type: EventType.config + data: Record +} + +export type ParamEvent = { + type: EventType.param + data: Record +} + +export type StartEvent = { + type: EventType.start + data: Array> +} + +export type StopEvent = { + type: EventType.stop + data: Array> +} + +export type MetricEvent = { + type: EventType.metric + data: Record> +} + +export type SnapshotEvent = { + type: EventType.snapshot + data: Array> +} + +export type CumulativeEvent = { + type: EventType.cumulative + data: Array> +} + +export type ThresholdEvent = { + type: EventType.threshold + data: Record> +} + +export type DashboardEvent = + | ConfigEvent + | ParamEvent + | StartEvent + | StopEvent + | MetricEvent + | SnapshotEvent + | CumulativeEvent + | ThresholdEvent diff --git a/dashboard/assets/packages/model/src/Metrics.ts b/dashboard/assets/packages/model/src/Metrics.ts index c0b77ff..9745960 100644 --- a/dashboard/assets/packages/model/src/Metrics.ts +++ b/dashboard/assets/packages/model/src/Metrics.ts @@ -38,43 +38,18 @@ export type Metric = { name: string contains?: ValueType type?: MetricType - thresholds?: Array custom?: boolean } export class Query { name: string aggregate?: AggregateType - tags?: Record - group?: string - scenario?: string constructor(query: string) { const [name, aggregate] = query.split(".", 2) this.aggregate = aggregate as AggregateType this.name = name - - let sub = "" - - const idx = name.indexOf("{") - if (idx && idx > 0) { - sub = name.substring(idx) - sub = sub.substring(1, sub.length - 1) - - const cidx = sub.indexOf(":") - - const tname = sub.substring(0, cidx) - const tvalue = sub.substring(cidx + 1) - - this.tags = { [tname]: tvalue } - - if (tname == "group") { - this.group = tvalue.substring(2) - } - - this.name = name.substring(0, idx) - } } } @@ -82,14 +57,49 @@ const propTime = "time" export class Metrics { values: Record - constructor({ values = {} } = {}) { + names: Array + _aggregates: Record> + constructor({ values = {}, names = [] } = {}) { this.values = values + this.names = names + this._aggregates = {} as Record> + } + + set aggregates(value: Record>) { + for (const mname in value) { + const mtype = mname as MetricType + this._aggregates[mtype] = value[mtype].map((atype) => atype.replaceAll("(", "").replaceAll(")", "") as AggregateType) + } } onEvent(data: Record): void { for (const name in data) { this.values[name] = { ...data[name], name } } + + this.names = Object.keys(this.values) + this.names.sort() + } + + toAggregate(data: Array>): Record { + const out = {} as Record + + for (let i = 0; i < data.length && i < this.names.length; i++) { + const metric = this.values[this.names[i]] + if (!metric) { + continue + } + const agg = {} as Aggregate + const names = metric.type ? this._aggregates[metric.type] : [] + + for (let j = 0; j < data[i].length && j < names.length; j++) { + agg[names[j]] = data[i][j] + } + + out[metric.name] = agg + } + + return out } find(query: string): Metric | undefined { diff --git a/dashboard/assets/packages/model/src/index.ts b/dashboard/assets/packages/model/src/index.ts index 25d4f5d..7db38cb 100644 --- a/dashboard/assets/packages/model/src/index.ts +++ b/dashboard/assets/packages/model/src/index.ts @@ -5,6 +5,7 @@ export { UnitType } from "./UnitType.ts" export { Metric, Metrics, MetricType, ValueType, Query, Aggregate, AggregateType } from "./Metrics.ts" -export { Config, Param, Digest, EventType } from "./Digest.ts" +export { EventType, DashboardEvent } from "./Event.ts" +export { Config, Param, Digest } from "./Digest.ts" export { Samples, SamplesView, SampleVector, SampleVectorInit } from "./Samples.ts" export { Summary, SummaryView, SummaryRow } from "./Summary.ts" diff --git a/dashboard/assets/packages/report/dist/index.html b/dashboard/assets/packages/report/dist/index.html index 69b844d..be4f0b4 100644 --- a/dashboard/assets/packages/report/dist/index.html +++ b/dashboard/assets/packages/report/dist/index.html @@ -15,17 +15,17 @@ k6 report