Skip to content

Commit

Permalink
Merge #71731 #71897
Browse files Browse the repository at this point in the history
71731: sql: consistent aggregated timestamp when flushing sql stats r=matthewtodd a=Azhng

Previously, when SQL Stats are flushed to system table, the
aggregatedTs column for SQL Stats is calculated for each stats
entry individually. This means that if a flush starts near
the end of each hour, it is possible that different stats
rows will be assigned two different aggregated timestamp.
Currently, flusher flushes statement statistics first, and only
when statement statistics are flushed, transaction statistics
will be flushed into system table. This means that it is likely
the transaction statistics will get assigned a different
aggregatedTs than the statement statistics.
Consequentially, when the frontend fetches SQL Stats through
the CombinedStmtStats handler, the frontend default performs
range scan at 1 hour interval. This triggers a range scan
on the system table for that 1 hour range. This causes the
statement statisitcs that got assigned a different aggregatedTs
to be omitted from the result.

This commit changed the flusher to only compute aggregatedTs once
before the flush actually happen, and assign that aggregatedTs
too *all* stmt/txn stats rows. Statements executed in the same
aggregation interval can be looked up by the corresponding
statement fingerprint ID stored in transaction stats metadata.

Follow up to #71596

Release note: None

71897: ui:app filter is has multi select option r=maryliag a=maryliag

Previously you could only select one App on the filter for
Transaction and Statement page. This commits introduces a
multi select option, making possible for the user select
several apps at the same time and exclude internal results.

This commits also properly sets the filter value for database
with no values to (unset).

Partially addresses #70544

Not included in this commit:
- Select all apps except internal by default
- Add app filter to Session tab

Before
<img width="308" alt="Screen Shot 2021-10-22 at 6 27 25 PM" src="https://user-images.githubusercontent.com/1017486/138529700-012a3530-a7ef-4bbe-bd81-0a0a49c5be61.png">
<img width="278" alt="Screen Shot 2021-10-22 at 6 29 37 PM" src="https://user-images.githubusercontent.com/1017486/138529713-0c9c309d-5255-4fe9-be75-5edb5bdf0580.png">


After
<img width="273" alt="Screen Shot 2021-10-22 at 6 27 37 PM" src="https://user-images.githubusercontent.com/1017486/138529626-1966a7c1-fba5-4167-9c17-351cc0f8da3d.png">
<img width="273" alt="Screen Shot 2021-10-22 at 6 29 51 PM" src="https://user-images.githubusercontent.com/1017486/138529726-39ff6c0b-97ec-4ea4-af9f-aa2d3d15ac80.png">

Release note (ui change): App filter on Transaction and Statement
pages is now multi select.

Co-authored-by: Azhng <[email protected]>
Co-authored-by: Marylia Gutierrez <[email protected]>
  • Loading branch information
3 people committed Oct 25, 2021
3 parents 322dd53 + f96d3a3 + b963f54 commit e9b69a8
Show file tree
Hide file tree
Showing 14 changed files with 477 additions and 165 deletions.
235 changes: 180 additions & 55 deletions pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/datadriven"
)

const (
timeArgs = "time"
dbNameArgs = "db"
implicitTxnArgs = "implicitTxn"
fingerprintArgs = "fingerprint"
timeArgs = "time"
dbNameArgs = "db"
implicitTxnArgs = "implicitTxn"
fingerprintArgs = "fingerprint"
eventArgs = "event"
eventCallbackCmd = "callbackCmd"
eventCallbackCmdArgKey = "callbackCmdArgKey"
eventCallbackCmdArgValue = "callbackCmdArgValue"
)

// TestSQLStatsDataDriven runs the data-driven tests in
Expand All @@ -63,9 +68,13 @@ func TestSQLStatsDataDriven(t *testing.T) {
defer log.Scope(t).Close(t)

stubTime := &stubTime{}
injector := newRuntimeKnobsInjector()

ctx := context.Background()
params, _ := tests.CreateTestServerParams()
params.Knobs.SQLStatsKnobs.(*sqlstats.TestingKnobs).StubTimeNow = stubTime.Now
params.Knobs.SQLStatsKnobs.(*sqlstats.TestingKnobs).OnStmtStatsFlushFinished = injector.invokePostStmtStatsFlushCallback
params.Knobs.SQLStatsKnobs.(*sqlstats.TestingKnobs).OnTxnStatsFlushFinished = injector.invokePostTxnStatsFlushCallback

cluster := serverutils.StartNewTestCluster(t, 3 /* numNodes */, base.TestClusterArgs{
ServerArgs: params,
Expand All @@ -84,64 +93,127 @@ func TestSQLStatsDataDriven(t *testing.T) {

observer := sqlutils.MakeSQLRunner(observerConn)

execDataDrivenTestCmd := func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "exec-sql":
stmts := strings.Split(d.Input, "\n")
for i := range stmts {
_, err := sqlConn.Exec(stmts[i])
if err != nil {
t.Errorf("failed to execute stmt %s due to %s", stmts[i], err.Error())
}
}
case "observe-sql":
actual := observer.QueryStr(t, d.Input)
rows := make([]string, len(actual))

for rowIdx := range actual {
rows[rowIdx] = strings.Join(actual[rowIdx], ",")
}
return strings.Join(rows, "\n")
case "sql-stats-flush":
sqlStats.Flush(ctx)
case "set-time":
mustHaveArgsOrFatal(t, d, timeArgs)

var timeString string
d.ScanArgs(t, timeArgs, &timeString)
tm, err := time.Parse(time.RFC3339, timeString)
if err != nil {
return err.Error()
}
stubTime.setTime(tm)
return stubTime.Now().String()
case "should-sample-logical-plan":
mustHaveArgsOrFatal(t, d, fingerprintArgs, implicitTxnArgs, dbNameArgs)

var dbName string
var implicitTxn bool
var fingerprint string

d.ScanArgs(t, fingerprintArgs, &fingerprint)
d.ScanArgs(t, implicitTxnArgs, &implicitTxn)
d.ScanArgs(t, dbNameArgs, &dbName)

// Since the data driven tests framework does not support space
// in args, we used % as a placeholder for space and manually replace
// them.
fingerprint = strings.Replace(fingerprint, "%", " ", -1)

return fmt.Sprintf("%t",
appStats.ShouldSaveLogicalPlanDesc(
fingerprint,
implicitTxn,
dbName,
),
)
}

return ""
}

datadriven.Walk(t, "testdata/", func(t *testing.T, path string) {
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "exec-sql":
stmts := strings.Split(d.Input, "\n")
for i := range stmts {
_, err := sqlConn.Exec(stmts[i])
if err != nil {
t.Errorf("failed to execute stmt %s due to %s", stmts[i], err.Error())
}
if d.Cmd == "register-callback" {
mustHaveArgsOrFatal(
t,
d,
eventArgs,
eventCallbackCmd,
eventCallbackCmdArgKey,
eventCallbackCmdArgValue,
)

var event string
var callbackCmd string
var callbackCmdArgKey string
var callbackCmdArgValue string

d.ScanArgs(t, eventArgs, &event)
d.ScanArgs(t, eventCallbackCmd, &callbackCmd)
d.ScanArgs(t, eventCallbackCmdArgKey, &callbackCmdArgKey)
d.ScanArgs(t, eventCallbackCmdArgValue, &callbackCmdArgValue)

injectedDataDrivenCmd := &datadriven.TestData{
Cmd: callbackCmd,
CmdArgs: []datadriven.CmdArg{
{
Key: callbackCmdArgKey,
Vals: []string{
callbackCmdArgValue,
},
},
},
}
case "observe-sql":
actual := observer.QueryStr(t, d.Input)
rows := make([]string, len(actual))

for rowIdx := range actual {
rows[rowIdx] = strings.Join(actual[rowIdx], ",")
switch event {
case "stmt-stats-flushed":
injector.registerPostStmtStatsFlushCallback(func() {
execDataDrivenTestCmd(t, injectedDataDrivenCmd)
})
case "txn-stats-flushed":
injector.registerPostTxnStatsFlushCallback(func() {
execDataDrivenTestCmd(t, injectedDataDrivenCmd)
})
default:
return "invalid callback event"
}
return strings.Join(rows, "\n")
case "sql-stats-flush":
sqlStats.Flush(ctx)
case "set-time":
mustHaveArgsOrFatal(t, d, timeArgs)

var timeString string
d.ScanArgs(t, timeArgs, &timeString)
tm, err := time.Parse(time.RFC3339, timeString)
if err != nil {
return err.Error()
} else if d.Cmd == "unregister-callback" {
mustHaveArgsOrFatal(t, d, eventArgs)

var event string
d.ScanArgs(t, eventArgs, &event)

switch event {
case "stmt-stats-flushed":
injector.unregisterPostStmtStatsFlushCallback()
case "txn-stats-flushed":
injector.unregisterPostTxnStatsFlushCallback()
default:
return "invalid event"
}
stubTime.setTime(tm)
return stubTime.Now().String()
case "should-sample-logical-plan":
mustHaveArgsOrFatal(t, d, fingerprintArgs, implicitTxnArgs, dbNameArgs)

var dbName string
var implicitTxn bool
var fingerprint string

d.ScanArgs(t, fingerprintArgs, &fingerprint)
d.ScanArgs(t, implicitTxnArgs, &implicitTxn)
d.ScanArgs(t, dbNameArgs, &dbName)

// Since the data driven tests framework does not support space
// in args, we used % as a placeholder for space and manually replace
// them.
fingerprint = strings.Replace(fingerprint, "%", " ", -1)

return fmt.Sprintf("%t",
appStats.ShouldSaveLogicalPlanDesc(
fingerprint,
implicitTxn,
dbName,
),
)
}

return ""
return execDataDrivenTestCmd(t, d)
})
})
}
Expand All @@ -153,3 +225,56 @@ func mustHaveArgsOrFatal(t *testing.T, d *datadriven.TestData, args ...string) {
}
}
}

type runtimeKnobsInjector struct {
mu struct {
syncutil.Mutex
knobs *sqlstats.TestingKnobs
}
}

func newRuntimeKnobsInjector() *runtimeKnobsInjector {
r := &runtimeKnobsInjector{}
r.mu.knobs = &sqlstats.TestingKnobs{}
return r
}

func (r *runtimeKnobsInjector) registerPostStmtStatsFlushCallback(cb func()) {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.knobs.OnStmtStatsFlushFinished = cb
}

func (r *runtimeKnobsInjector) unregisterPostStmtStatsFlushCallback() {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.knobs.OnStmtStatsFlushFinished = nil
}

func (r *runtimeKnobsInjector) invokePostStmtStatsFlushCallback() {
r.mu.Lock()
defer r.mu.Unlock()
if r.mu.knobs.OnStmtStatsFlushFinished != nil {
r.mu.knobs.OnStmtStatsFlushFinished()
}
}

func (r *runtimeKnobsInjector) registerPostTxnStatsFlushCallback(cb func()) {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.knobs.OnTxnStatsFlushFinished = cb
}

func (r *runtimeKnobsInjector) unregisterPostTxnStatsFlushCallback() {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.knobs.OnTxnStatsFlushFinished = nil
}

func (r *runtimeKnobsInjector) invokePostTxnStatsFlushCallback() {
r.mu.Lock()
defer r.mu.Unlock()
if r.mu.knobs.OnTxnStatsFlushFinished != nil {
r.mu.knobs.OnTxnStatsFlushFinished()
}
}
57 changes: 37 additions & 20 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,47 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {
log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s",
s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted))

// The flush routine directly logs errors if they are encountered. Therefore,
// no error is returned here.
_ = s.SQLStats.IterateStatementStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleStmtStats(ctx, statistics)
}, "failed to flush statement statistics" /* errMsg */)

return nil
})
_ = s.SQLStats.IterateTransactionStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedTransactionStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleTxnStats(ctx, statistics)
}, "failed to flush transaction statistics" /* errMsg */)
aggregatedTs := s.computeAggregatedTs()
s.lastFlushStarted = s.getTimeNow()

return nil
})
s.flushStmtStats(ctx, aggregatedTs)
s.flushTxnStats(ctx, aggregatedTs)

if err := s.SQLStats.Reset(ctx); err != nil {
log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err)
}
}

s.lastFlushStarted = s.getTimeNow()
func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
// s.doFlush directly logs errors if they are encountered. Therefore,
// no error is returned here.
_ = s.SQLStats.IterateStatementStats(ctx, &sqlstats.IteratorOptions{},
func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleStmtStats(ctx, statistics, aggregatedTs)
}, "failed to flush statement statistics" /* errMsg */)

return nil
})

if s.cfg.Knobs != nil && s.cfg.Knobs.OnStmtStatsFlushFinished != nil {
s.cfg.Knobs.OnStmtStatsFlushFinished()
}
}

func (s *PersistedSQLStats) flushTxnStats(ctx context.Context, aggregatedTs time.Time) {
_ = s.SQLStats.IterateTransactionStats(ctx, &sqlstats.IteratorOptions{},
func(ctx context.Context, statistics *roachpb.CollectedTransactionStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleTxnStats(ctx, statistics, aggregatedTs)
}, "failed to flush transaction statistics" /* errMsg */)

return nil
})

if s.cfg.Knobs != nil && s.cfg.Knobs.OnTxnStatsFlushFinished != nil {
s.cfg.Knobs.OnTxnStatsFlushFinished()
}
}

func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, errMsg string) {
Expand All @@ -74,13 +93,12 @@ func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, er
}

func (s *PersistedSQLStats) doFlushSingleTxnStats(
ctx context.Context, stats *roachpb.CollectedTransactionStatistics,
ctx context.Context, stats *roachpb.CollectedTransactionStatistics, aggregatedTs time.Time,
) error {
return s.cfg.KvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Explicitly copy the stats variable so the txn closure is retryable.
scopedStats := *stats

aggregatedTs := s.computeAggregatedTs()
serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(stats.TransactionFingerprintID))

insertFn := func(ctx context.Context, txn *kv.Txn) (alreadyExists bool, err error) {
Expand Down Expand Up @@ -121,13 +139,12 @@ func (s *PersistedSQLStats) doFlushSingleTxnStats(
}

func (s *PersistedSQLStats) doFlushSingleStmtStats(
ctx context.Context, stats *roachpb.CollectedStatementStatistics,
ctx context.Context, stats *roachpb.CollectedStatementStatistics, aggregatedTs time.Time,
) error {
return s.cfg.KvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Explicitly copy the stats so that this closure is retryable.
scopedStats := *stats

aggregatedTs := s.computeAggregatedTs()
serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.ID))
serializedTransactionFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.Key.TransactionFingerprintID))
serializedPlanHash := sqlstatsutil.EncodeUint64ToBytes(uint64(dummyPlanHash))
Expand Down
Loading

0 comments on commit e9b69a8

Please sign in to comment.