Skip to content

Commit

Permalink
feat: SSE stream optimization
Browse files Browse the repository at this point in the history
On the SSE stream, the snapshot and cumulative events are displayed as JSON objects, where the metric names are the property names. The property values are also JSON objects, in which the property names are the names of the aggregation function (avg, min, max, etc.). This two-level object hierarchy has not caused any problems so far, the amount of data has been acceptable.

Currently, aggregate counting per tag is still supported, but by default it is only allowed for the group tag. It would be necessary to turn on the aggregate calculation for the url and name tags as well (per URL graphs, summary, etc.). With the current SSE stream format, this would significantly increase the amount of data. This would have a direct impact on the k6 memory requirement, so it is a problem.

Based on measurements, it can be concluded that using a two-level array structure instead of the two-level object structure used in snapshot and cumulative events significantly reduces the size of the events and thus also the memory requirement.

For example, for the scripts/demo/demo.js test script, the size of the SSE stream is reduced by 65%. With url and name tags enabled, the new format results in a 10% size increase compared to the original size (without these tags). That is, the new format enables aggregate calculation based on url and name tags, which is a prerequisite for several requested features.

Therefore, instead of the two-level object structure, the use of a two-level array structure must be implemented in the snapshot and cumulative events. The packages/model package can hide SSE stream changes from the rest of the dashboard UI.
  • Loading branch information
szkiba committed Feb 12, 2024
1 parent 2fdaa94 commit 5801436
Show file tree
Hide file tree
Showing 23 changed files with 478 additions and 267 deletions.
7 changes: 4 additions & 3 deletions dashboard/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type aggregator struct {

once sync.Once

seenMetrics map[string]struct{}
seenMetrics []string
}

func closer(what io.Closer, logger logrus.FieldLogger) {
Expand All @@ -57,7 +57,7 @@ func aggregate(input, output string, opts *options, proc *process) error {
agg.registry = newRegistry()
agg.options = opts
agg.logger = proc.logger
agg.seenMetrics = make(map[string]struct{})
agg.seenMetrics = make([]string, 0)

var inputFile, outputFile afero.File
var err error
Expand Down Expand Up @@ -176,8 +176,9 @@ func (agg *aggregator) updateAndSend(
return
}

newbies := met.newbies(agg.seenMetrics)
newbies, updated := met.newbies(agg.seenMetrics)
if len(newbies) != 0 {
agg.seenMetrics = updated
agg.fireEvent(metricEvent, newbies)
}

Expand Down
75 changes: 60 additions & 15 deletions dashboard/assets/packages/model/dist/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,72 @@ type Metric = {
name: string;
contains?: ValueType;
type?: MetricType;
custom?: boolean;
};
declare class Query {
name: string;
aggregate?: AggregateType;
tags?: Record<string, string>;
group?: string;
scenario?: string;
constructor(query: string);
}
declare class Metrics {
values: Record<string, Metric>;
constructor({ values }?: {
names: Array<string>;
_aggregates: Record<MetricType, Array<AggregateType>>;
constructor({ values, names }?: {
values?: {} | undefined;
names?: never[] | undefined;
});
set aggregates(value: Record<MetricType, Array<string>>);
onEvent(data: Record<string, object>): void;
toAggregate(data: Array<Array<number>>): Record<string, Aggregate>;
find(query: string): Metric | undefined;
unit(name: string, aggregate?: AggregateType): UnitType;
}

declare enum EventType {
config = "config",
param = "param",
start = "start",
stop = "stop",
metric = "metric",
snapshot = "snapshot",
cumulative = "cumulative",
threshold = "threshold"
}
type ConfigEvent = {
type: EventType.config;
data: Record<string, unknown>;
};
type ParamEvent = {
type: EventType.param;
data: Record<string, unknown>;
};
type StartEvent = {
type: EventType.start;
data: Array<Array<number>>;
};
type StopEvent = {
type: EventType.stop;
data: Array<Array<number>>;
};
type MetricEvent = {
type: EventType.metric;
data: Record<string, Record<string, object>>;
};
type SnapshotEvent = {
type: EventType.snapshot;
data: Array<Array<number>>;
};
type CumulativeEvent = {
type: EventType.cumulative;
data: Array<Array<number>>;
};
type ThresholdEvent = {
type: EventType.threshold;
data: Record<string, Array<string>>;
};
type DashboardEvent = ConfigEvent | ParamEvent | StartEvent | StopEvent | MetricEvent | SnapshotEvent | CumulativeEvent | ThresholdEvent;

type SampleVectorInit = {
length: number;
capacity: number;
Expand Down Expand Up @@ -155,14 +202,9 @@ declare class Param implements Record<string, unknown> {
constructor(from?: Record<string, unknown>);
[x: string]: unknown;
}
declare enum EventType {
config = "config",
param = "param",
start = "start",
stop = "stop",
metric = "metric",
snapshot = "snapshot",
cumulative = "cumulative"
declare class Thresholds implements Record<string, Array<string>> {
constructor(from?: Record<string, string[]>);
[x: string]: Array<string>;
}
declare class Digest implements EventListenerObject {
config: Config;
Expand All @@ -172,24 +214,27 @@ declare class Digest implements EventListenerObject {
metrics: Metrics;
samples: Samples;
summary: Summary;
constructor({ config, param, start, stop, metrics, samples, summary }?: {
thresholds: Thresholds;
constructor({ config, param, start, stop, metrics, samples, summary, thresholds }?: {
config?: Config | undefined;
param?: Param | undefined;
start?: Date | undefined;
stop?: Date | undefined;
metrics?: Metrics | undefined;
samples?: Samples | undefined;
summary?: Summary | undefined;
thresholds?: Thresholds | undefined;
});
handleEvent(event: MessageEvent): void;
onEvent(type: EventType, data: Record<string, Aggregate>): void;
onEvent(event: DashboardEvent): void;
private onConfig;
private onParam;
private onStart;
private onStop;
private onMetric;
private onSnapshot;
private onCumulative;
private onThreshold;
}

export { Aggregate, AggregateType, Config, Digest, EventType, Metric, MetricType, Metrics, Param, Query, SampleVector, SampleVectorInit, Samples, SamplesView, Summary, SummaryRow, SummaryView, UnitType, ValueType };
export { Aggregate, AggregateType, Config, DashboardEvent, Digest, EventType, Metric, MetricType, Metrics, Param, Query, SampleVector, SampleVectorInit, Samples, SamplesView, Summary, SummaryRow, SummaryView, UnitType, ValueType };
Loading

0 comments on commit 5801436

Please sign in to comment.