diff --git a/pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go b/pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go index 0ce1b6ef9976..fd956117233d 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go +++ b/pkg/sql/sqlstats/persistedsqlstats/datadriven_test.go @@ -29,14 +29,19 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/datadriven" ) const ( - timeArgs = "time" - dbNameArgs = "db" - implicitTxnArgs = "implicitTxn" - fingerprintArgs = "fingerprint" + timeArgs = "time" + dbNameArgs = "db" + implicitTxnArgs = "implicitTxn" + fingerprintArgs = "fingerprint" + eventArgs = "event" + eventCallbackCmd = "callbackCmd" + eventCallbackCmdArgKey = "callbackCmdArgKey" + eventCallbackCmdArgValue = "callbackCmdArgValue" ) // TestSQLStatsDataDriven runs the data-driven tests in @@ -63,9 +68,13 @@ func TestSQLStatsDataDriven(t *testing.T) { defer log.Scope(t).Close(t) stubTime := &stubTime{} + injector := newRuntimeKnobsInjector() + ctx := context.Background() params, _ := tests.CreateTestServerParams() params.Knobs.SQLStatsKnobs.(*sqlstats.TestingKnobs).StubTimeNow = stubTime.Now + params.Knobs.SQLStatsKnobs.(*sqlstats.TestingKnobs).OnStmtStatsFlushFinished = injector.invokePostStmtStatsFlushCallback + params.Knobs.SQLStatsKnobs.(*sqlstats.TestingKnobs).OnTxnStatsFlushFinished = injector.invokePostTxnStatsFlushCallback cluster := serverutils.StartNewTestCluster(t, 3 /* numNodes */, base.TestClusterArgs{ ServerArgs: params, @@ -84,64 +93,127 @@ func TestSQLStatsDataDriven(t *testing.T) { observer := sqlutils.MakeSQLRunner(observerConn) + execDataDrivenTestCmd := func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "exec-sql": + stmts := strings.Split(d.Input, "\n") + for i := range stmts { + _, err := sqlConn.Exec(stmts[i]) + if err != nil { + t.Errorf("failed to execute stmt %s due to %s", stmts[i], err.Error()) + } + } + case "observe-sql": + actual := observer.QueryStr(t, d.Input) + rows := make([]string, len(actual)) + + for rowIdx := range actual { + rows[rowIdx] = strings.Join(actual[rowIdx], ",") + } + return strings.Join(rows, "\n") + case "sql-stats-flush": + sqlStats.Flush(ctx) + case "set-time": + mustHaveArgsOrFatal(t, d, timeArgs) + + var timeString string + d.ScanArgs(t, timeArgs, &timeString) + tm, err := time.Parse(time.RFC3339, timeString) + if err != nil { + return err.Error() + } + stubTime.setTime(tm) + return stubTime.Now().String() + case "should-sample-logical-plan": + mustHaveArgsOrFatal(t, d, fingerprintArgs, implicitTxnArgs, dbNameArgs) + + var dbName string + var implicitTxn bool + var fingerprint string + + d.ScanArgs(t, fingerprintArgs, &fingerprint) + d.ScanArgs(t, implicitTxnArgs, &implicitTxn) + d.ScanArgs(t, dbNameArgs, &dbName) + + // Since the data driven tests framework does not support space + // in args, we used % as a placeholder for space and manually replace + // them. + fingerprint = strings.Replace(fingerprint, "%", " ", -1) + + return fmt.Sprintf("%t", + appStats.ShouldSaveLogicalPlanDesc( + fingerprint, + implicitTxn, + dbName, + ), + ) + } + + return "" + } + datadriven.Walk(t, "testdata/", func(t *testing.T, path string) { datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { - switch d.Cmd { - case "exec-sql": - stmts := strings.Split(d.Input, "\n") - for i := range stmts { - _, err := sqlConn.Exec(stmts[i]) - if err != nil { - t.Errorf("failed to execute stmt %s due to %s", stmts[i], err.Error()) - } + if d.Cmd == "register-callback" { + mustHaveArgsOrFatal( + t, + d, + eventArgs, + eventCallbackCmd, + eventCallbackCmdArgKey, + eventCallbackCmdArgValue, + ) + + var event string + var callbackCmd string + var callbackCmdArgKey string + var callbackCmdArgValue string + + d.ScanArgs(t, eventArgs, &event) + d.ScanArgs(t, eventCallbackCmd, &callbackCmd) + d.ScanArgs(t, eventCallbackCmdArgKey, &callbackCmdArgKey) + d.ScanArgs(t, eventCallbackCmdArgValue, &callbackCmdArgValue) + + injectedDataDrivenCmd := &datadriven.TestData{ + Cmd: callbackCmd, + CmdArgs: []datadriven.CmdArg{ + { + Key: callbackCmdArgKey, + Vals: []string{ + callbackCmdArgValue, + }, + }, + }, } - case "observe-sql": - actual := observer.QueryStr(t, d.Input) - rows := make([]string, len(actual)) - for rowIdx := range actual { - rows[rowIdx] = strings.Join(actual[rowIdx], ",") + switch event { + case "stmt-stats-flushed": + injector.registerPostStmtStatsFlushCallback(func() { + execDataDrivenTestCmd(t, injectedDataDrivenCmd) + }) + case "txn-stats-flushed": + injector.registerPostTxnStatsFlushCallback(func() { + execDataDrivenTestCmd(t, injectedDataDrivenCmd) + }) + default: + return "invalid callback event" } - return strings.Join(rows, "\n") - case "sql-stats-flush": - sqlStats.Flush(ctx) - case "set-time": - mustHaveArgsOrFatal(t, d, timeArgs) - - var timeString string - d.ScanArgs(t, timeArgs, &timeString) - tm, err := time.Parse(time.RFC3339, timeString) - if err != nil { - return err.Error() + } else if d.Cmd == "unregister-callback" { + mustHaveArgsOrFatal(t, d, eventArgs) + + var event string + d.ScanArgs(t, eventArgs, &event) + + switch event { + case "stmt-stats-flushed": + injector.unregisterPostStmtStatsFlushCallback() + case "txn-stats-flushed": + injector.unregisterPostTxnStatsFlushCallback() + default: + return "invalid event" } - stubTime.setTime(tm) - return stubTime.Now().String() - case "should-sample-logical-plan": - mustHaveArgsOrFatal(t, d, fingerprintArgs, implicitTxnArgs, dbNameArgs) - - var dbName string - var implicitTxn bool - var fingerprint string - - d.ScanArgs(t, fingerprintArgs, &fingerprint) - d.ScanArgs(t, implicitTxnArgs, &implicitTxn) - d.ScanArgs(t, dbNameArgs, &dbName) - - // Since the data driven tests framework does not support space - // in args, we used % as a placeholder for space and manually replace - // them. - fingerprint = strings.Replace(fingerprint, "%", " ", -1) - - return fmt.Sprintf("%t", - appStats.ShouldSaveLogicalPlanDesc( - fingerprint, - implicitTxn, - dbName, - ), - ) } - - return "" + return execDataDrivenTestCmd(t, d) }) }) } @@ -153,3 +225,56 @@ func mustHaveArgsOrFatal(t *testing.T, d *datadriven.TestData, args ...string) { } } } + +type runtimeKnobsInjector struct { + mu struct { + syncutil.Mutex + knobs *sqlstats.TestingKnobs + } +} + +func newRuntimeKnobsInjector() *runtimeKnobsInjector { + r := &runtimeKnobsInjector{} + r.mu.knobs = &sqlstats.TestingKnobs{} + return r +} + +func (r *runtimeKnobsInjector) registerPostStmtStatsFlushCallback(cb func()) { + r.mu.Lock() + defer r.mu.Unlock() + r.mu.knobs.OnStmtStatsFlushFinished = cb +} + +func (r *runtimeKnobsInjector) unregisterPostStmtStatsFlushCallback() { + r.mu.Lock() + defer r.mu.Unlock() + r.mu.knobs.OnStmtStatsFlushFinished = nil +} + +func (r *runtimeKnobsInjector) invokePostStmtStatsFlushCallback() { + r.mu.Lock() + defer r.mu.Unlock() + if r.mu.knobs.OnStmtStatsFlushFinished != nil { + r.mu.knobs.OnStmtStatsFlushFinished() + } +} + +func (r *runtimeKnobsInjector) registerPostTxnStatsFlushCallback(cb func()) { + r.mu.Lock() + defer r.mu.Unlock() + r.mu.knobs.OnTxnStatsFlushFinished = cb +} + +func (r *runtimeKnobsInjector) unregisterPostTxnStatsFlushCallback() { + r.mu.Lock() + defer r.mu.Unlock() + r.mu.knobs.OnTxnStatsFlushFinished = nil +} + +func (r *runtimeKnobsInjector) invokePostTxnStatsFlushCallback() { + r.mu.Lock() + defer r.mu.Unlock() + if r.mu.knobs.OnTxnStatsFlushFinished != nil { + r.mu.knobs.OnTxnStatsFlushFinished() + } +} diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go index 6a9ddb69ad1a..341ea9dea843 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/flush.go +++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go @@ -32,28 +32,47 @@ func (s *PersistedSQLStats) Flush(ctx context.Context) { log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s", s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted)) - // The flush routine directly logs errors if they are encountered. Therefore, - // no error is returned here. - _ = s.SQLStats.IterateStatementStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error { - s.doFlush(ctx, func() error { - return s.doFlushSingleStmtStats(ctx, statistics) - }, "failed to flush statement statistics" /* errMsg */) - - return nil - }) - _ = s.SQLStats.IterateTransactionStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedTransactionStatistics) error { - s.doFlush(ctx, func() error { - return s.doFlushSingleTxnStats(ctx, statistics) - }, "failed to flush transaction statistics" /* errMsg */) + aggregatedTs := s.computeAggregatedTs() + s.lastFlushStarted = s.getTimeNow() - return nil - }) + s.flushStmtStats(ctx, aggregatedTs) + s.flushTxnStats(ctx, aggregatedTs) if err := s.SQLStats.Reset(ctx); err != nil { log.Warningf(ctx, "fail to reset in-memory SQL Stats: %s", err) } +} - s.lastFlushStarted = s.getTimeNow() +func (s *PersistedSQLStats) flushStmtStats(ctx context.Context, aggregatedTs time.Time) { + // s.doFlush directly logs errors if they are encountered. Therefore, + // no error is returned here. + _ = s.SQLStats.IterateStatementStats(ctx, &sqlstats.IteratorOptions{}, + func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error { + s.doFlush(ctx, func() error { + return s.doFlushSingleStmtStats(ctx, statistics, aggregatedTs) + }, "failed to flush statement statistics" /* errMsg */) + + return nil + }) + + if s.cfg.Knobs != nil && s.cfg.Knobs.OnStmtStatsFlushFinished != nil { + s.cfg.Knobs.OnStmtStatsFlushFinished() + } +} + +func (s *PersistedSQLStats) flushTxnStats(ctx context.Context, aggregatedTs time.Time) { + _ = s.SQLStats.IterateTransactionStats(ctx, &sqlstats.IteratorOptions{}, + func(ctx context.Context, statistics *roachpb.CollectedTransactionStatistics) error { + s.doFlush(ctx, func() error { + return s.doFlushSingleTxnStats(ctx, statistics, aggregatedTs) + }, "failed to flush transaction statistics" /* errMsg */) + + return nil + }) + + if s.cfg.Knobs != nil && s.cfg.Knobs.OnTxnStatsFlushFinished != nil { + s.cfg.Knobs.OnTxnStatsFlushFinished() + } } func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, errMsg string) { @@ -74,13 +93,12 @@ func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, er } func (s *PersistedSQLStats) doFlushSingleTxnStats( - ctx context.Context, stats *roachpb.CollectedTransactionStatistics, + ctx context.Context, stats *roachpb.CollectedTransactionStatistics, aggregatedTs time.Time, ) error { return s.cfg.KvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Explicitly copy the stats variable so the txn closure is retryable. scopedStats := *stats - aggregatedTs := s.computeAggregatedTs() serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(stats.TransactionFingerprintID)) insertFn := func(ctx context.Context, txn *kv.Txn) (alreadyExists bool, err error) { @@ -121,13 +139,12 @@ func (s *PersistedSQLStats) doFlushSingleTxnStats( } func (s *PersistedSQLStats) doFlushSingleStmtStats( - ctx context.Context, stats *roachpb.CollectedStatementStatistics, + ctx context.Context, stats *roachpb.CollectedStatementStatistics, aggregatedTs time.Time, ) error { return s.cfg.KvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Explicitly copy the stats so that this closure is retryable. scopedStats := *stats - aggregatedTs := s.computeAggregatedTs() serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.ID)) serializedTransactionFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.Key.TransactionFingerprintID)) serializedPlanHash := sqlstatsutil.EncodeUint64ToBytes(uint64(dummyPlanHash)) diff --git a/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp b/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp new file mode 100644 index 000000000000..4f465f06cde8 --- /dev/null +++ b/pkg/sql/sqlstats/persistedsqlstats/testdata/consistent_flush_timestamp @@ -0,0 +1,99 @@ +exec-sql +SET application_name = 'consistent-test' +---- + +register-callback event=stmt-stats-flushed callbackCmd=set-time callbackCmdArgKey=time callbackCmdArgValue=2021-09-20T15:00:01Z +---- + +set-time time=2021-09-20T14:59:59Z +---- +2021-09-20 14:59:59 +0000 UTC + +exec-sql +SELECT 1 +---- + +observe-sql +SELECT + aggregated_ts, + encode(fingerprint_id, 'hex'), + metadata -> 'stmtFingerprintIDs' +FROM + crdb_internal.transaction_statistics +WHERE + app_name = 'consistent-test' +---- +2021-09-20 14:00:00 +0000 UTC,705fcdf3f12803ec,["df3c70bf7729b433"] + + +observe-sql +SELECT + aggregated_ts, + encode(fingerprint_id, 'hex'), + metadata ->> 'query' +FROM + crdb_internal.statement_statistics +WHERE + app_name = 'consistent-test' +---- +2021-09-20 14:00:00 +0000 UTC,df3c70bf7729b433,SELECT _ + +sql-stats-flush +---- + +observe-sql +SELECT + aggregated_ts, + encode(fingerprint_id, 'hex'), + metadata ->> 'query' +FROM + crdb_internal.statement_statistics +WHERE + app_name = 'consistent-test' +---- +2021-09-20 14:00:00 +0000 UTC,df3c70bf7729b433,SELECT _ + +observe-sql +SELECT + aggregated_ts, + encode(fingerprint_id, 'hex'), + metadata -> 'stmtFingerprintIDs' +FROM + crdb_internal.transaction_statistics +WHERE + app_name = 'consistent-test' +---- +2021-09-20 14:00:00 +0000 UTC,705fcdf3f12803ec,["df3c70bf7729b433"] + +exec-sql +SELECT 1 +---- + +observe-sql +SELECT + aggregated_ts, + encode(fingerprint_id, 'hex'), + metadata ->> 'query' +FROM + crdb_internal.statement_statistics +WHERE + app_name = 'consistent-test' +---- +2021-09-20 14:00:00 +0000 UTC,df3c70bf7729b433,SELECT _ +2021-09-20 15:00:00 +0000 UTC,df3c70bf7729b433,SELECT _ + +observe-sql +SELECT + aggregated_ts, + encode(fingerprint_id, 'hex'), + metadata -> 'stmtFingerprintIDs' +FROM + crdb_internal.transaction_statistics +WHERE + app_name = 'consistent-test' +---- +2021-09-20 14:00:00 +0000 UTC,705fcdf3f12803ec,["df3c70bf7729b433"] +2021-09-20 15:00:00 +0000 UTC,705fcdf3f12803ec,["df3c70bf7729b433"] + +unregister-callback event=stmt-stats-flushed +---- diff --git a/pkg/sql/sqlstats/test_utils.go b/pkg/sql/sqlstats/test_utils.go index dea53ffab698..f8afd65bc8d2 100644 --- a/pkg/sql/sqlstats/test_utils.go +++ b/pkg/sql/sqlstats/test_utils.go @@ -14,9 +14,13 @@ import "time" // TestingKnobs provides hooks and knobs for unit tests. type TestingKnobs struct { - // OnStatsFlushFinished is a callback that is triggered when a single - // statistics object is flushed. - OnStatsFlushFinished func(error) + // OnStmtStatsFlushFinished is a callback that is triggered when stmt stats + // finishes flushing. + OnStmtStatsFlushFinished func() + + // OnTxnStatsFlushFinished is a callback that is triggered when txn stats + // finishes flushing. + OnTxnStatsFlushFinished func() // StubTimeNow allows tests to override the timeutil.Now() function used // by the flush operation to calculate aggregated_ts timestamp.