Skip to content

Commit

Permalink
keyvisualizer: pre-aggregate ranges
Browse files Browse the repository at this point in the history
Previously, there was no bound on the number of ranges that
could be propagated to the collectors. After collection,
data was downsampled using a simple heuristic to decide if
a bucket was worth keeping or if it should be aggregated with
its neighbor.

In this commit, I've introduced a function, `maybeAggregateBoundaries`,
to prevent more than `keyvisualizer.max_buckets` boundaries from being propagated
to collectors. This pre-aggregation takes the place of the post-collection
downsampling. For the first stable release of the key visualizer,
I am intentionally sacrificing dynamic resolution and prioritizing boundary
stability instead. This trade-off means that the key visualizer will
demand less network, memory, and storage resources from the cluster
while operating.

Additionally, this PR drops the sample retention time from 14 days to 7 days,
and ensures that `keyvisualizer.max_buckets` is bounded between [1, 1024].

Resolves: #96740
Epic: None
Release note: None
  • Loading branch information
zachlite committed Mar 15, 2023
1 parent a36d88b commit f8ba475
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 229 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ ALL_TESTS = [
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys_test",
"//pkg/keyvisualizer/spanstatsconsumer:spanstatsconsumer_test",
"//pkg/kv/bulk:bulk_test",
"//pkg/kv/kvclient/kvcoord:kvcoord_disallowed_imports_test",
"//pkg/kv/kvclient/kvcoord:kvcoord_test",
Expand Down Expand Up @@ -1173,6 +1174,7 @@ GO_TARGETS = [
"//pkg/keyvisualizer/keyvissubscriber:keyvissubscriber",
"//pkg/keyvisualizer/spanstatscollector:spanstatscollector",
"//pkg/keyvisualizer/spanstatsconsumer:spanstatsconsumer",
"//pkg/keyvisualizer/spanstatsconsumer:spanstatsconsumer_test",
"//pkg/keyvisualizer/spanstatskvaccessor:spanstatskvaccessor",
"//pkg/keyvisualizer:keyvisualizer",
"//pkg/kv/bulk/bulkpb:bulkpb",
Expand Down
8 changes: 7 additions & 1 deletion pkg/keyvisualizer/keyvissettings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package keyvissettings

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -39,5 +40,10 @@ var MaxBuckets = settings.RegisterIntSetting(
"keyvisualizer.max_buckets",
"the maximum number of buckets in a sample",
256,
settings.NonNegativeIntWithMaximum(1024),
func(i int64) error {
if i < 1 || i > 1024 {
return fmt.Errorf("expected max_buckets to be in range [1, 1024], got %d", i)
}
return nil
},
)
19 changes: 13 additions & 6 deletions pkg/keyvisualizer/spanstatsconsumer/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "spanstatsconsumer",
srcs = [
"downsample.go",
"span_stats_consumer.go",
],
srcs = ["span_stats_consumer.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/keyvisualizer/spanstatsconsumer",
visibility = ["//visibility:public"],
deps = [
"//pkg/keyvisualizer/keyvispb",
"//pkg/keyvisualizer/keyvissettings",
"//pkg/keyvisualizer/keyvisstorage",
"//pkg/keyvisualizer/spanstatskvaccessor",
Expand All @@ -23,4 +19,15 @@ go_library(
],
)

go_test(
name = "spanstatsconsumer_test",
srcs = ["span_stats_consumer_test.go"],
args = ["-test.timeout=295s"],
embed = [":spanstatsconsumer"],
deps = [
"//pkg/roachpb",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
208 changes: 0 additions & 208 deletions pkg/keyvisualizer/spanstatsconsumer/downsample.go

This file was deleted.

43 changes: 35 additions & 8 deletions pkg/keyvisualizer/spanstatsconsumer/span_stats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package spanstatsconsumer

import (
"context"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvissettings"
Expand Down Expand Up @@ -73,11 +74,6 @@ func (s *SpanStatsConsumer) GetSamples(ctx context.Context) error {
return err
}

maxBuckets := keyvissettings.MaxBuckets.Get(&s.settings.SV)
for i, sample := range samplesRes.Samples {
samplesRes.Samples[i].SpanStats = downsample(sample.SpanStats, int(maxBuckets))
}

if err := keyvisstorage.WriteSamples(ctx, s.ie, samplesRes.Samples); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(
err, "write samples failed"))
Expand All @@ -86,6 +82,36 @@ func (s *SpanStatsConsumer) GetSamples(ctx context.Context) error {
return nil
}

// maybeAggregateBoundaries aggregates boundaries if len(boundaries) > max, so
// that at most max spans are returned. When len(boundaries) <= max, the input
// slice is returned unchanged.
//
// Aggregation partitions boundaries into consecutive groups of combineFactor
// spans and replaces each group with the result of calling Combine on the
// group's first and last span. The final group may hold fewer than
// combineFactor spans; it is still combined through its last span so that no
// trailing boundary is dropped.
//
// An error is returned if aggregation is required but max < 1, because no
// valid grouping exists in that case.
func maybeAggregateBoundaries(boundaries []roachpb.Span, max int) ([]roachpb.Span, error) {
	if len(boundaries) <= max {
		return boundaries, nil
	}
	// Guard the division below: a zero or negative max would produce an
	// infinite or negative combineFactor.
	if max < 1 {
		return nil, errors.New("could not aggregate boundaries: max must be at least 1")
	}

	// combineFactor is the factor that the length of the boundaries slice is
	// reduced by. For example,
	// if len(boundaries) == 1000, and max == 100, combineFactor == 10.
	// if len(boundaries) == 1001, and max == 100, combineFactor == 11.
	combineFactor := int(math.Ceil(float64(len(boundaries)) / float64(max)))
	// combinedLength is the number of groups, i.e. the number of loop
	// iterations below; it is used to size the result exactly once.
	combinedLength := int(math.Ceil(float64(len(boundaries)) / float64(combineFactor)))
	combined := make([]roachpb.Span, 0, combinedLength)

	// Iterate through boundaries, incrementing by combineFactor.
	for startIdx := 0; startIdx < len(boundaries); startIdx += combineFactor {
		// Clamp the end of the group to the last boundary so that a short
		// final group still covers every remaining span.
		endIdx := startIdx + combineFactor - 1
		if endIdx >= len(boundaries) {
			endIdx = len(boundaries) - 1
		}

		span := boundaries[startIdx]
		// A single-span group is kept as is; there is nothing to combine.
		if endIdx > startIdx {
			span = span.Combine(boundaries[endIdx])
		}
		combined = append(combined, span)
	}
	return combined, nil
}

// decideBoundaries decides the key spans that we want statistics
// for. For now, it will tell KV to collect statistics for all
// ranges from [Min, Max).
Expand Down Expand Up @@ -116,13 +142,14 @@ func (s *SpanStatsConsumer) decideBoundaries(ctx context.Context) ([]roachpb.Spa
s.ri.Next(ctx)
}

return boundaries, nil
maxBuckets := keyvissettings.MaxBuckets.Get(&s.settings.SV)
return maybeAggregateBoundaries(boundaries, int(maxBuckets))
}

// DeleteExpiredSamples deletes historical samples older than 1 week.
func (s *SpanStatsConsumer) DeleteExpiredSamples(ctx context.Context) error {
twoWeeksAgo := timeutil.Now().AddDate(0, 0, -14)
if err := keyvisstorage.DeleteSamplesBeforeTime(ctx, s.ie, twoWeeksAgo); err != nil {
oneWeekAgo := timeutil.Now().AddDate(0, 0, -7)
if err := keyvisstorage.DeleteSamplesBeforeTime(ctx, s.ie, oneWeekAgo); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(
err, "delete expired samples failed"))
}
Expand Down
Loading

0 comments on commit f8ba475

Please sign in to comment.