Skip to content

Commit

Permalink
sql: consistent aggregated timestamp when flushing sql stats
Browse files Browse the repository at this point in the history
Previously, when SQL Stats were flushed to the system table, the
aggregatedTs column for SQL Stats was calculated for each stats
entry individually. This means that if a flush starts near
the end of an hour, it is possible that different stats
rows will be assigned two different aggregated timestamps.
Currently, flusher flushes statement statistics first, and only
when statement statistics are flushed, transaction statistics
will be flushed into system table. This means that it is likely
the transaction statistics will get assigned a different
aggregatedTs than the statement statistics.
Consequently, when the frontend fetches SQL Stats through
the CombinedStmtStats handler, the frontend by default performs
a range scan at a 1-hour interval. This triggers a range scan
on the system table for that 1-hour range. This causes the
statement statistics that got assigned a different aggregatedTs
to be omitted from the result.

This commit changes the flusher to compute aggregatedTs only once,
before the flush actually happens, and to assign that aggregatedTs
to *all* stmt/txn stats rows. Statements executed in the same
aggregation interval can be looked up by the corresponding
statement fingerprint ID stored in transaction stats metadata.

Release note: None
  • Loading branch information
Azhng committed Oct 19, 2021
1 parent 5d8a61b commit f96d3a3
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 78 deletions.
235 changes: 180 additions & 55 deletions pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/datadriven"
)

const (
timeArgs = "time"
dbNameArgs = "db"
implicitTxnArgs = "implicitTxn"
fingerprintArgs = "fingerprint"
timeArgs = "time"
dbNameArgs = "db"
implicitTxnArgs = "implicitTxn"
fingerprintArgs = "fingerprint"
eventArgs = "event"
eventCallbackCmd = "callbackCmd"
eventCallbackCmdArgKey = "callbackCmdArgKey"
eventCallbackCmdArgValue = "callbackCmdArgValue"
)

// TestSQLStatsDataDriven runs the data-driven tests in
Expand All @@ -63,9 +68,13 @@ func TestSQLStatsDataDriven(t *testing.T) {
defer log.Scope(t).Close(t)

stubTime := &stubTime{}
injector := newRuntimeKnobsInjector()

ctx := context.Background()
params, _ := tests.CreateTestServerParams()
params.Knobs.SQLStatsKnobs.(*sqlstats.TestingKnobs).StubTimeNow = stubTime.Now
params.Knobs.SQLStatsKnobs.(*sqlstats.TestingKnobs).OnStmtStatsFlushFinished = injector.invokePostStmtStatsFlushCallback
params.Knobs.SQLStatsKnobs.(*sqlstats.TestingKnobs).OnTxnStatsFlushFinished = injector.invokePostTxnStatsFlushCallback

cluster := serverutils.StartNewTestCluster(t, 3 /* numNodes */, base.TestClusterArgs{
ServerArgs: params,
Expand All @@ -84,64 +93,127 @@ func TestSQLStatsDataDriven(t *testing.T) {

observer := sqlutils.MakeSQLRunner(observerConn)

execDataDrivenTestCmd := func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "exec-sql":
stmts := strings.Split(d.Input, "\n")
for i := range stmts {
_, err := sqlConn.Exec(stmts[i])
if err != nil {
t.Errorf("failed to execute stmt %s due to %s", stmts[i], err.Error())
}
}
case "observe-sql":
actual := observer.QueryStr(t, d.Input)
rows := make([]string, len(actual))

for rowIdx := range actual {
rows[rowIdx] = strings.Join(actual[rowIdx], ",")
}
return strings.Join(rows, "\n")
case "sql-stats-flush":
sqlStats.Flush(ctx)
case "set-time":
mustHaveArgsOrFatal(t, d, timeArgs)

var timeString string
d.ScanArgs(t, timeArgs, &timeString)
tm, err := time.Parse(time.RFC3339, timeString)
if err != nil {
return err.Error()
}
stubTime.setTime(tm)
return stubTime.Now().String()
case "should-sample-logical-plan":
mustHaveArgsOrFatal(t, d, fingerprintArgs, implicitTxnArgs, dbNameArgs)

var dbName string
var implicitTxn bool
var fingerprint string

d.ScanArgs(t, fingerprintArgs, &fingerprint)
d.ScanArgs(t, implicitTxnArgs, &implicitTxn)
d.ScanArgs(t, dbNameArgs, &dbName)

// Since the data driven tests framework does not support space
// in args, we used % as a placeholder for space and manually replace
// them.
fingerprint = strings.Replace(fingerprint, "%", " ", -1)

return fmt.Sprintf("%t",
appStats.ShouldSaveLogicalPlanDesc(
fingerprint,
implicitTxn,
dbName,
),
)
}

return ""
}

datadriven.Walk(t, "testdata/", func(t *testing.T, path string) {
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "exec-sql":
stmts := strings.Split(d.Input, "\n")
for i := range stmts {
_, err := sqlConn.Exec(stmts[i])
if err != nil {
t.Errorf("failed to execute stmt %s due to %s", stmts[i], err.Error())
}
if d.Cmd == "register-callback" {
mustHaveArgsOrFatal(
t,
d,
eventArgs,
eventCallbackCmd,
eventCallbackCmdArgKey,
eventCallbackCmdArgValue,
)

var event string
var callbackCmd string
var callbackCmdArgKey string
var callbackCmdArgValue string

d.ScanArgs(t, eventArgs, &event)
d.ScanArgs(t, eventCallbackCmd, &callbackCmd)
d.ScanArgs(t, eventCallbackCmdArgKey, &callbackCmdArgKey)
d.ScanArgs(t, eventCallbackCmdArgValue, &callbackCmdArgValue)

injectedDataDrivenCmd := &datadriven.TestData{
Cmd: callbackCmd,
CmdArgs: []datadriven.CmdArg{
{
Key: callbackCmdArgKey,
Vals: []string{
callbackCmdArgValue,
},
},
},
}
case "observe-sql":
actual := observer.QueryStr(t, d.Input)
rows := make([]string, len(actual))

for rowIdx := range actual {
rows[rowIdx] = strings.Join(actual[rowIdx], ",")
switch event {
case "stmt-stats-flushed":
injector.registerPostStmtStatsFlushCallback(func() {
execDataDrivenTestCmd(t, injectedDataDrivenCmd)
})
case "txn-stats-flushed":
injector.registerPostTxnStatsFlushCallback(func() {
execDataDrivenTestCmd(t, injectedDataDrivenCmd)
})
default:
return "invalid callback event"
}
return strings.Join(rows, "\n")
case "sql-stats-flush":
sqlStats.Flush(ctx)
case "set-time":
mustHaveArgsOrFatal(t, d, timeArgs)

var timeString string
d.ScanArgs(t, timeArgs, &timeString)
tm, err := time.Parse(time.RFC3339, timeString)
if err != nil {
return err.Error()
} else if d.Cmd == "unregister-callback" {
mustHaveArgsOrFatal(t, d, eventArgs)

var event string
d.ScanArgs(t, eventArgs, &event)

switch event {
case "stmt-stats-flushed":
injector.unregisterPostStmtStatsFlushCallback()
case "txn-stats-flushed":
injector.unregisterPostTxnStatsFlushCallback()
default:
return "invalid event"
}
stubTime.setTime(tm)
return stubTime.Now().String()
case "should-sample-logical-plan":
mustHaveArgsOrFatal(t, d, fingerprintArgs, implicitTxnArgs, dbNameArgs)

var dbName string
var implicitTxn bool
var fingerprint string

d.ScanArgs(t, fingerprintArgs, &fingerprint)
d.ScanArgs(t, implicitTxnArgs, &implicitTxn)
d.ScanArgs(t, dbNameArgs, &dbName)

// Since the data driven tests framework does not support space
// in args, we used % as a placeholder for space and manually replace
// them.
fingerprint = strings.Replace(fingerprint, "%", " ", -1)

return fmt.Sprintf("%t",
appStats.ShouldSaveLogicalPlanDesc(
fingerprint,
implicitTxn,
dbName,
),
)
}

return ""
return execDataDrivenTestCmd(t, d)
})
})
}
Expand All @@ -153,3 +225,56 @@ func mustHaveArgsOrFatal(t *testing.T, d *datadriven.TestData, args ...string) {
}
}
}

// runtimeKnobsInjector holds a set of SQL stats testing knobs whose callbacks
// can be swapped in and out while a test is running. The test wires the
// injector's invoke* methods into the server's testing knobs once, and then
// uses the register*/unregister* methods to change what those hooks do.
type runtimeKnobsInjector struct {
// mu groups the mutex with the state it guards.
mu struct {
syncutil.Mutex
// knobs stores the currently registered callbacks; a nil callback
// field means nothing is registered for that event.
knobs *sqlstats.TestingKnobs
}
}

// newRuntimeKnobsInjector returns an injector backed by an empty set of
// testing knobs, i.e. with no callbacks registered.
func newRuntimeKnobsInjector() *runtimeKnobsInjector {
	injector := new(runtimeKnobsInjector)
	injector.mu.knobs = new(sqlstats.TestingKnobs)
	return injector
}

// registerPostStmtStatsFlushCallback stores cb as the callback to run from
// invokePostStmtStatsFlushCallback, replacing whatever was stored before.
func (r *runtimeKnobsInjector) registerPostStmtStatsFlushCallback(cb func()) {
	r.mu.Lock()
	defer r.mu.Unlock()

	knobs := r.mu.knobs
	knobs.OnStmtStatsFlushFinished = cb
}

// unregisterPostStmtStatsFlushCallback clears the statement-stats flush
// callback so that invokePostStmtStatsFlushCallback becomes a no-op.
func (r *runtimeKnobsInjector) unregisterPostStmtStatsFlushCallback() {
	r.mu.Lock()
	defer r.mu.Unlock()

	knobs := r.mu.knobs
	knobs.OnStmtStatsFlushFinished = nil
}

// invokePostStmtStatsFlushCallback runs the registered statement-stats flush
// callback, if any. The callback executes while r.mu is held.
// NOTE(review): presumably syncutil.Mutex is not reentrant, in which case the
// callback must not call the register/unregister methods — confirm.
func (r *runtimeKnobsInjector) invokePostStmtStatsFlushCallback() {
	r.mu.Lock()
	defer r.mu.Unlock()

	cb := r.mu.knobs.OnStmtStatsFlushFinished
	if cb == nil {
		return
	}
	cb()
}

// registerPostTxnStatsFlushCallback stores cb as the callback to run from
// invokePostTxnStatsFlushCallback, replacing whatever was stored before.
func (r *runtimeKnobsInjector) registerPostTxnStatsFlushCallback(cb func()) {
	r.mu.Lock()
	defer r.mu.Unlock()

	knobs := r.mu.knobs
	knobs.OnTxnStatsFlushFinished = cb
}

// unregisterPostTxnStatsFlushCallback clears the transaction-stats flush
// callback so that invokePostTxnStatsFlushCallback becomes a no-op.
func (r *runtimeKnobsInjector) unregisterPostTxnStatsFlushCallback() {
	r.mu.Lock()
	defer r.mu.Unlock()

	knobs := r.mu.knobs
	knobs.OnTxnStatsFlushFinished = nil
}

// invokePostTxnStatsFlushCallback runs the registered transaction-stats flush
// callback, if any. The callback executes while r.mu is held.
// NOTE(review): presumably syncutil.Mutex is not reentrant, in which case the
// callback must not call the register/unregister methods — confirm.
func (r *runtimeKnobsInjector) invokePostTxnStatsFlushCallback() {
	r.mu.Lock()
	defer r.mu.Unlock()

	cb := r.mu.knobs.OnTxnStatsFlushFinished
	if cb == nil {
		return
	}
	cb()
}
57 changes: 37 additions & 20 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,47 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) {
log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s",
s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted))

// The flush routine directly logs errors if they are encountered. Therefore,
// no error is returned here.
_ = s.SQLStats.IterateStatementStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleStmtStats(ctx, statistics)
}, "failed to flush statement statistics" /* errMsg */)

return nil
})
_ = s.SQLStats.IterateTransactionStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedTransactionStatistics) error {
s.doFlush(ctx, func() error {
return s.doFlushSingleTxnStats(ctx, statistics)
}, "failed to flush transaction statistics" /* errMsg */)
aggregatedTs := s.computeAggregatedTs()
s.lastFlushStarted = s.getTimeNow()

return nil
})
s.flushStmtStats(ctx, aggregatedTs)
s.flushTxnStats(ctx, aggregatedTs)

if err := s.SQLStats.Reset(ctx); err != nil {
log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err)
}
}

s.lastFlushStarted = s.getTimeNow()
// flushStmtStats walks every in-memory statement statistics entry and flushes
// each one via doFlushSingleStmtStats, stamping all of them with the same
// aggregatedTs. Once the iteration is done it fires the
// OnStmtStatsFlushFinished testing knob, if one is configured.
func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) {
	flushOne := func(
		ctx context.Context, statistics *roachpb.CollectedStatementStatistics,
	) error {
		s.doFlush(ctx, func() error {
			return s.doFlushSingleStmtStats(ctx, statistics, aggregatedTs)
		}, "failed to flush statement statistics" /* errMsg */)

		// Always continue with the next entry: a failure on one entry is
		// not propagated to the iterator.
		return nil
	}

	// s.doFlush directly logs errors if they are encountered. Therefore,
	// the iteration result is deliberately discarded here.
	_ = s.SQLStats.IterateStatementStats(ctx, &sqlstats.IteratorOptions{}, flushOne)

	if knobs := s.cfg.Knobs; knobs != nil && knobs.OnStmtStatsFlushFinished != nil {
		knobs.OnStmtStatsFlushFinished()
	}
}

// flushTxnStats walks every in-memory transaction statistics entry and flushes
// each one via doFlushSingleTxnStats, stamping all of them with the same
// aggregatedTs. Once the iteration is done it fires the
// OnTxnStatsFlushFinished testing knob, if one is configured.
func (s *PersistedSQLStats) flushTxnStats(ctx context.Context, aggregatedTs time.Time) {
	flushOne := func(
		ctx context.Context, statistics *roachpb.CollectedTransactionStatistics,
	) error {
		s.doFlush(ctx, func() error {
			return s.doFlushSingleTxnStats(ctx, statistics, aggregatedTs)
		}, "failed to flush transaction statistics" /* errMsg */)

		// Always continue with the next entry: a failure on one entry is
		// not propagated to the iterator.
		return nil
	}

	// The iteration result is deliberately discarded; per-entry failures are
	// handed to s.doFlush together with the error message above.
	_ = s.SQLStats.IterateTransactionStats(ctx, &sqlstats.IteratorOptions{}, flushOne)

	if knobs := s.cfg.Knobs; knobs != nil && knobs.OnTxnStatsFlushFinished != nil {
		knobs.OnTxnStatsFlushFinished()
	}
}

func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, errMsg string) {
Expand All @@ -74,13 +93,12 @@ func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, er
}

func (s *PersistedSQLStats) doFlushSingleTxnStats(
ctx context.Context, stats *roachpb.CollectedTransactionStatistics,
ctx context.Context, stats *roachpb.CollectedTransactionStatistics, aggregatedTs time.Time,
) error {
return s.cfg.KvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Explicitly copy the stats variable so the txn closure is retryable.
scopedStats := *stats

aggregatedTs := s.computeAggregatedTs()
serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(stats.TransactionFingerprintID))

insertFn := func(ctx context.Context, txn *kv.Txn) (alreadyExists bool, err error) {
Expand Down Expand Up @@ -121,13 +139,12 @@ func (s *PersistedSQLStats) doFlushSingleTxnStats(
}

func (s *PersistedSQLStats) doFlushSingleStmtStats(
ctx context.Context, stats *roachpb.CollectedStatementStatistics,
ctx context.Context, stats *roachpb.CollectedStatementStatistics, aggregatedTs time.Time,
) error {
return s.cfg.KvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Explicitly copy the stats so that this closure is retryable.
scopedStats := *stats

aggregatedTs := s.computeAggregatedTs()
serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.ID))
serializedTransactionFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.Key.TransactionFingerprintID))
serializedPlanHash := sqlstatsutil.EncodeUint64ToBytes(uint64(dummyPlanHash))
Expand Down
Loading

0 comments on commit f96d3a3

Please sign in to comment.