sqlstats: improve GetPercentileValues performance
Previously, the function used to query the percentile values
also flushed the stream. The flush was required because the
Insights detector runs a query immediately after every time data
is added, so a flush had to happen to guarantee precision.

When `GetPercentileValues` started using that same Query function,
it inherited the flush even though it was not necessary there: no
new data had been added, and if it had been, the detector would
take care of the flush. The extra flush could also cause
contention on the stream.

Since no flush is needed when retrieving the information for sql
stats, this commit updates the Query function to make the flush
optional, so it can be skipped in that case. Other improvements
can probably be made on the existing path, but this PR focuses on
the performance degradation caused by `GetPercentileValues`; no
behavior change is made to other existing flows.
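
For illustration, a minimal sketch of the two call patterns after
this change (names abbreviated; the real call sites are in the
diff below):

    // Insights detector: a sample was just inserted, so pay for a
    // flush to get a precise read of the stream.
    latencySummary.Insert(stmt.LatencyInSeconds)
    p99 := latencySummary.Query(0.99, true /* shouldFlush */)

    // SQL stats read path: nothing new was inserted here, so the
    // flush can be skipped and slightly stale percentiles accepted.
    latencies := s.latencyInformation.GetPercentileValues(stmtFingerprintID, false /* shouldFlush */)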

Fixes #102208

Release note: None
maryliag committed Apr 26, 2023
1 parent 4751d63 commit c18d0af
Showing 5 changed files with 55 additions and 17 deletions.
14 changes: 8 additions & 6 deletions pkg/sql/sqlstats/insights/detector.go
@@ -81,8 +81,8 @@ func (d *anomalyDetector) isSlow(stmt *Statement) (decision bool) {

 	d.withFingerprintLatencySummary(stmt, func(latencySummary *quantile.Stream) {
 		latencySummary.Insert(stmt.LatencyInSeconds)
-		p50 := latencySummary.Query(0.5)
-		p99 := latencySummary.Query(0.99)
+		p50 := latencySummary.Query(0.5, true)
+		p99 := latencySummary.Query(0.99, true)
 		decision = stmt.LatencyInSeconds >= p99 &&
 			stmt.LatencyInSeconds >= 2*p50 &&
 			stmt.LatencyInSeconds >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds()
@@ -91,7 +91,9 @@ func (d *anomalyDetector) isSlow(stmt *Statement) (decision bool) {
 	return
 }
 
-func (d *anomalyDetector) GetPercentileValues(id appstatspb.StmtFingerprintID) PercentileValues {
+func (d *anomalyDetector) GetPercentileValues(
+	id appstatspb.StmtFingerprintID, shouldFlush bool,
+) PercentileValues {
 	// latencySummary.Query might modify its own state (Stream.flush), so a read-write lock is necessary.
 	d.mu.Lock()
 	defer d.mu.Unlock()
@@ -100,9 +102,9 @@ func (d *anomalyDetector) GetPercentileValues(id appstatspb.StmtFingerprintID) PercentileValues {
 		latencySummary := entry.Value.(latencySummaryEntry).value
 		// If more percentiles are added, update the value of `desiredQuantiles` above
 		// to include the new keys.
-		latencies.P50 = latencySummary.Query(0.5)
-		latencies.P90 = latencySummary.Query(0.9)
-		latencies.P99 = latencySummary.Query(0.99)
+		latencies.P50 = latencySummary.Query(0.5, shouldFlush)
+		latencies.P90 = latencySummary.Query(0.9, shouldFlush)
+		latencies.P99 = latencySummary.Query(0.99, shouldFlush)
 	}
 	return latencies
 }
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/insights/insights.go
@@ -155,7 +155,7 @@ type Reader interface {
 }
 
 type LatencyInformation interface {
-	GetPercentileValues(fingerprintID appstatspb.StmtFingerprintID) PercentileValues
+	GetPercentileValues(fingerprintID appstatspb.StmtFingerprintID, shouldFlush bool) PercentileValues
 }
 
 type PercentileValues struct {
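Since LatencyInformation is a one-method interface, tests that need a
stand-in can satisfy it with a tiny stub. A hypothetical sketch
(fakeLatencyInfo and its fixed values are made up here, not part of
the commit):

    // fakeLatencyInfo is a test double for LatencyInformation.
    type fakeLatencyInfo struct{}

    func (fakeLatencyInfo) GetPercentileValues(
    	_ appstatspb.StmtFingerprintID, _ bool,
    ) PercentileValues {
    	// Fixed values, for illustration only.
    	return PercentileValues{P50: 0.01, P90: 0.05, P99: 0.1}
    }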
4 changes: 3 additions & 1 deletion pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
@@ -144,7 +144,9 @@ func (s *Container) RecordStatement(

 	// Percentile latencies are only being sampled if the latency was above the
 	// AnomalyDetectionLatencyThreshold.
-	latencies := s.latencyInformation.GetPercentileValues(stmtFingerprintID)
+	// The Insights detector already does a flush when checking for anomalous
+	// latency, so there is no need to force a flush when retrieving the data here.
+	latencies := s.latencyInformation.GetPercentileValues(stmtFingerprintID, false)
 	latencyInfo := appstatspb.LatencyInfo{
 		Min: value.ServiceLatency,
 		Max: value.ServiceLatency,
10 changes: 8 additions & 2 deletions pkg/util/quantile/stream.go
@@ -166,7 +166,11 @@ func (s *Stream) insert(sample Sample) {
 // Query returns the computed qth percentile value. If s was created with
 // NewTargeted, and q is not in the set of quantiles provided a priori, Query
 // will return an unspecified result.
-func (s *Stream) Query(q float64) float64 {
+// A flush can be a heavy operation, but it is required when high precision
+// is needed because new data was just added (shouldFlush = true).
+// If some delay in the data is acceptable, or no new data was added, the
+// flush can be skipped (shouldFlush = false).
+func (s *Stream) Query(q float64, shouldFlush bool) float64 {
 	if !s.flushed() {
 		// Fast path when there hasn't been enough data for a flush;
 		// this also yields better accuracy for small sets of data.
@@ -181,7 +185,9 @@ func (s *Stream) Query(q float64) float64 {
 		s.maybeSort()
 		return s.b[i].Value
 	}
-	s.flush()
+	if shouldFlush {
+		s.flush()
+	}
 	return s.stream.query(q)
}

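As a usage note, here is a self-contained sketch of the new Query
contract, using the same targeted constructor the tests below use.
The import path is assumed from this repository's layout, and the
sample counts are made up for illustration:

    package main

    import (
    	"fmt"

    	"github.com/cockroachdb/cockroach/pkg/util/quantile"
    )

    func main() {
    	// Track p99 with a 0.001 target error, as in the tests below.
    	s := quantile.NewTargeted(map[float64]float64{0.99: 0.001})
    	for i := 1; i <= 10000; i++ {
    		s.Insert(float64(i))
    	}
    	// Freshly inserted samples may still sit in an unflushed
    	// buffer; shouldFlush=true folds them in before answering.
    	fmt.Println(s.Query(0.99, true))
    	// If slightly stale data is acceptable, or nothing new was
    	// inserted since the last flush, skip the heavier flush.
    	fmt.Println(s.Query(0.99, false))
    }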
42 changes: 35 additions & 7 deletions pkg/util/quantile/stream_test.go
@@ -61,7 +61,7 @@ func verifyPercsWithAbsoluteEpsilon(t *testing.T, a []float64, s *Stream) {
 			upper = len(a)
 		}
 		w, min, max := a[k-1], a[lower-1], a[upper-1]
-		if g := s.Query(quantile); g < min || g > max {
+		if g := s.Query(quantile, true); g < min || g > max {
 			t.Errorf("q=%f: want %v [%f,%f], got %v", quantile, w, min, max, g)
 		}
 	}
@@ -76,7 +76,7 @@ func verifyLowPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) {
 		lowerRank := int((1 - RelativeEpsilon) * qu * n)
 		upperRank := int(math.Ceil((1 + RelativeEpsilon) * qu * n))
 		w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1]
-		if g := s.Query(qu); g < min || g > max {
+		if g := s.Query(qu, true); g < min || g > max {
 			t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g)
 		}
 	}
@@ -91,7 +91,7 @@ func verifyHighPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) {
 		lowerRank := int((1 - (1+RelativeEpsilon)*(1-qu)) * n)
 		upperRank := int(math.Ceil((1 - (1-RelativeEpsilon)*(1-qu)) * n))
 		w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1]
-		if g := s.Query(qu); g < min || g > max {
+		if g := s.Query(qu, true); g < min || g > max {
 			t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g)
 		}
 	}
@@ -135,7 +135,7 @@ func TestTargetedQuerySmallSampleSize(t *testing.T) {
 		0.90: 5,
 		0.99: 5,
 	} {
-		if got := s.Query(φ); got != want {
+		if got := s.Query(φ, true); got != want {
 			t.Errorf("want %f for φ=%f, got %f", want, φ, got)
 		}
 	}
@@ -200,7 +200,7 @@ func TestUncompressed(t *testing.T) {
 	// Before compression, Query should have 100% accuracy.
 	for quantile := range Targets {
 		w := quantile * 100
-		if g := q.Query(quantile); g != w {
+		if g := q.Query(quantile, true); g != w {
 			t.Errorf("want %f, got %f", w, g)
 		}
 	}
@@ -219,17 +219,45 @@ func TestUncompressedSamples(t *testing.T) {
 func TestUncompressedOne(t *testing.T) {
 	q := NewTargeted(map[float64]float64{0.99: 0.01})
 	q.Insert(3.14)
-	if g := q.Query(0.90); g != 3.14 {
+	if g := q.Query(0.90, true); g != 3.14 {
 		t.Error("want PI, got", g)
 	}
 }
 
 func TestDefaults(t *testing.T) {
-	if g := NewTargeted(map[float64]float64{0.99: 0.001}).Query(0.99); g != 0 {
+	if g := NewTargeted(map[float64]float64{0.99: 0.001}).Query(0.99, true); g != 0 {
 		t.Errorf("want 0, got %f", g)
 	}
 }

+func TestQueryFlush(t *testing.T) {
+	q := NewTargeted(map[float64]float64{0.99: 0.001})
+	for i := 1; i <= 100; i++ {
+		q.Insert(float64(i))
+	}
+	// A flush after all inserts should make every following Query
+	// return the same result whether shouldFlush is true or false.
+	q.flush()
+	if p := q.Query(0.90, true); p != 91 {
+		t.Error("want 91, got", p)
+	}
+	if p := q.Query(0.90, false); p != 91 {
+		t.Error("want 91, got", p)
+	}
+
+	// Do an insert without forcing a flush. A Query with shouldFlush
+	// false ignores the new value and returns the same result as before.
+	q.Insert(float64(101))
+	if p := q.Query(0.90, false); p != 91 {
+		t.Error("want 91, got", p)
+	}
+	// A Query with shouldFlush true picks up the new value.
+	if p := q.Query(0.90, true); p != 92 {
+		t.Error("want 92, got", p)
+	}
+}

 func TestByteSize(t *testing.T) {
 	// Empty size is nonzero.
 	q := NewTargeted(Targets)
