Skip to content

Commit

Permalink
sql: implement IterateStatementStats() for PersistedSQLStats
Browse files Browse the repository at this point in the history
Previously, IterateStatementStats() for PersistedSQLStats was
left unimplemented and it defaults to the implementation of
SQLStats.IterateStatementStats(). This means calls to
IterateStatementStats() on PersistedSQLStats cannot read
the statement statistitcs stored in system table.

This commit implements the IterateStatementStats() through
the new CombinedStmtStatsIterator which enables this method
to read both in-memory and persited statement statistics.

Release note: None
  • Loading branch information
Azhng committed Aug 12, 2021
1 parent ab15e5f commit 08a8eec
Show file tree
Hide file tree
Showing 12 changed files with 826 additions and 145 deletions.
301 changes: 208 additions & 93 deletions pkg/roachpb/app_stats.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/roachpb/app_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ message StatementStatisticsKey {
optional bool vec = 7 [(gogoproto.nullable) = false];
optional bool full_scan = 8 [(gogoproto.nullable) = false];
optional string database = 9 [(gogoproto.nullable) = false];
optional uint64 plan_hash = 10 [(gogoproto.nullable) = false];
}

// CollectedStatementStatistics wraps collected timings and metadata for some
Expand All @@ -190,6 +191,7 @@ message CollectedStatementStatistics {
optional uint64 id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID", (gogoproto.casttype) = "StmtFingerprintID"];
optional StatementStatisticsKey key = 1 [(gogoproto.nullable) = false];
optional StatementStatistics stats = 2 [(gogoproto.nullable) = false];
optional google.protobuf.Timestamp aggregated_ts = 4 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true];
}


Expand All @@ -202,6 +204,7 @@ message CollectedTransactionStatistics {
// App is the name of the app which executed the transaction.
optional string app = 2 [(gogoproto.nullable) = false];
optional TransactionStatistics stats = 3 [(gogoproto.nullable) = false];
optional google.protobuf.Timestamp aggregated_ts = 4 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true];
}


Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ go_library(
name = "persistedsqlstats",
srcs = [
"cluster_settings.go",
"combined_iterator.go",
"flush.go",
"mem_iterator.go",
"provider.go",
"stmt_reader.go",
"test_utils.go",
"writer.go",
],
Expand All @@ -26,6 +29,7 @@ go_library(
"//pkg/sql/sqlstats/sslocal",
"//pkg/sql/sqlstats/ssmemstorage",
"//pkg/sql/sqlutil",
"//pkg/util/encoding",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/stop",
Expand All @@ -39,6 +43,7 @@ go_test(
srcs = [
"flush_test.go",
"main_test.go",
"reader_test.go",
],
deps = [
":persistedsqlstats",
Expand Down
198 changes: 198 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/combined_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package persistedsqlstats

import (
"context"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/errors"
)

// CombinedStmtStatsIterator is an iterator that iterates through both
// in-memory and persisted stmt stats provided by the in-memory iterator and
// the on-disk iterator.
type CombinedStmtStatsIterator struct {
nextToRead *roachpb.CollectedStatementStatistics
expectedColCnt int

mem struct {
canBeAdvanced bool
paused bool
it *memStmtStatsIterator
}

disk struct {
canBeAdvanced bool
paused bool
it sqlutil.InternalRows
}
}

// NewCombinedStmtStatsIterator returns a new instance of
// CombinedStmtStatsIterator.
func NewCombinedStmtStatsIterator(
memIter *memStmtStatsIterator, diskIter sqlutil.InternalRows, expectedColCnt int,
) *CombinedStmtStatsIterator {
c := &CombinedStmtStatsIterator{
expectedColCnt: expectedColCnt,
}

c.mem.it = memIter
c.mem.canBeAdvanced = true

c.disk.it = diskIter
c.disk.canBeAdvanced = true

return c
}

// Next increments the internal counter of the CombinedStmtStatsIterator. It
// returns true if the following Cur() call will be valid, false otherwise.
func (c *CombinedStmtStatsIterator) Next(ctx context.Context) (bool, error) {
var err error

if c.mem.canBeAdvanced && !c.mem.paused {
c.mem.canBeAdvanced = c.mem.it.Next()
}

if c.disk.canBeAdvanced && !c.disk.paused {
c.disk.canBeAdvanced, err = c.disk.it.Next(ctx)
if err != nil {
return false, err
}
}

// Both iterators are exhausted, no new value can be produced.
if !c.mem.canBeAdvanced && !c.disk.canBeAdvanced {
// Sanity check.
if c.mem.paused || c.disk.paused {
return false, errors.AssertionFailedf("bug: leaked iterator")
}
return false, nil
}

// If memIter is exhausted, but disk iterator can still move forward.
// We promote the disk.Cur() and resume the disk iterator if it was paused.
if !c.mem.canBeAdvanced {
row := c.disk.it.Cur()
if row == nil {
return false, errors.New("unexpected nil row")
}

if len(row) != c.expectedColCnt {
return false, errors.AssertionFailedf("unexpectedly received %d columns", len(row))
}

c.nextToRead, err = rowToStmtStats(c.disk.it.Cur())
if err != nil {
return false, err
}

if c.disk.canBeAdvanced {
c.disk.paused = false
}
return true, nil
}

// If diskIter is exhausted, but mem iterator can still move forward.
// We promote the mem.Cur() and resume the mem iterator if it was paused.
if !c.disk.canBeAdvanced {
c.nextToRead = c.mem.it.Cur()

if c.mem.canBeAdvanced {
c.mem.paused = false
}
return true, nil
}

// Both iterators can be moved forward. Now we check the value of Cur()
// for both iterators. We will have a few scenarios:
// 1. mem.Cur() < disk.Cur():
// we promote mem.Cur() to c.nextToRead. We then pause
// the disk iterator and resume the mem iterator for next iteration.
// 2. mem.Cur() == disk.Cur():
// we promote both mem.Cur() and disk.Cur() by merging both
// stats. We resume both iterators for next iteration.
// 3. mem.Cur() > disk.Cur():
// we promote disk.Cur() to c.nextToRead. We then pause
// mem iterator and resume disk iterator for next iteration.
memCurVal := c.mem.it.Cur()
diskCurVal, err := rowToStmtStats(c.disk.it.Cur())
if err != nil {
return false, err
}

switch compareStmtStats(memCurVal, diskCurVal) {
case -1:
// First Case.
c.nextToRead = memCurVal
c.mem.paused = false
c.disk.paused = true
case 0:
// Second Case.
c.nextToRead = memCurVal
c.nextToRead.Stats.Add(&diskCurVal.Stats)
c.mem.paused = false
c.disk.paused = false
case 1:
// Third Case.
c.nextToRead = diskCurVal
c.mem.paused = true
c.disk.paused = false
default:
return false, errors.AssertionFailedf("bug: impossible state")
}

return true, nil
}

// Cur returns the roachpb.CollectedStatementStatistics at the current internal
// counter.
func (c *CombinedStmtStatsIterator) Cur() *roachpb.CollectedStatementStatistics {
return c.nextToRead
}

func compareStmtStats(lhs, rhs *roachpb.CollectedStatementStatistics) int {
// 1. we compare their aggregated_ts
if lhs.AggregatedTs.Before(rhs.AggregatedTs) {
return -1
}
if lhs.AggregatedTs.After(rhs.AggregatedTs) {
return 1
}

// 2. we compare their app name.
cmp := strings.Compare(lhs.Key.App, rhs.Key.App)
if cmp != 0 {
return cmp
}

// 3. we compare their fingerprint ID.
if lhs.ID < rhs.ID {
return -1
}
if lhs.ID > rhs.ID {
return 1
}

// 4. we compare their plan hash.
if lhs.Key.PlanHash < rhs.Key.PlanHash {
return -1
}
if lhs.Key.PlanHash > rhs.Key.PlanHash {
return 1
}

return 0
}
4 changes: 2 additions & 2 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {

// The flush routine directly logs errors if they are encountered. Therefore,
// no error is returned here.
_ = s.IterateStatementStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
_ = s.SQLStats.IterateStatementStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleStmtStats(ctx, statistics)
}, "failed to flush statement statistics" /* errMsg */)

return nil
})
_ = s.IterateTransactionStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, key roachpb.TransactionFingerprintID, statistics *roachpb.CollectedTransactionStatistics) error {
_ = s.SQLStats.IterateTransactionStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, key roachpb.TransactionFingerprintID, statistics *roachpb.CollectedTransactionStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleTxnStats(ctx, key, statistics)
}, "failed to flush transaction statistics" /* errMsg */)
Expand Down
Loading

0 comments on commit 08a8eec

Please sign in to comment.