diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md
index 15969ca0fff9..cc042a51e9d0 100644
--- a/docs/generated/eventlog.md
+++ b/docs/generated/eventlog.md
@@ -2305,6 +2305,32 @@ are automatically converted server-side.
 
 Events in this category are logged to the `TELEMETRY` channel.
 
+### `captured_index_usage_stats`
+
+An event of type `captured_index_usage_stats`
+
+
+| Field | Description | Sensitive |
+|--|--|--|
+| `TotalReadCount` | TotalReadCount is the number of times this index has been read from. | no |
+| `LastRead` | LastRead is the timestamp at which this index was last read from. | yes |
+| `TableID` | TableID is the ID of the table this index is created on. This is the same as descpb.TableID and is unique within the cluster. | no |
+| `IndexID` | IndexID is the ID of the index within the scope of the given table. | no |
+| `DatabaseName` | | yes |
+| `TableName` | | yes |
+| `IndexName` | | yes |
+| `IndexType` | | yes |
+| `IsUnique` | | no |
+| `IsInverted` | | no |
+
+
+#### Common fields
+
+| Field | Description | Sensitive |
+|--|--|--|
+| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
+| `EventType` | The type of the event. | no |
+
 ### `sampled_query`
 
 An event of type `sampled_query` is the SQL query event logged to the telemetry channel. It
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index a1deec050361..b5e1abfb043a 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -319,6 +319,7 @@ ALL_TESTS = [
     "//pkg/sql/rowexec:rowexec_test",
     "//pkg/sql/rowflow:rowflow_test",
     "//pkg/sql/scanner:scanner_test",
+    "//pkg/sql/scheduledlogging:scheduledlogging_test",
     "//pkg/sql/schemachange:schemachange_test",
    "//pkg/sql/schemachanger/rel:rel_test",
     "//pkg/sql/schemachanger/scbuild:scbuild_test",
diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go
index 6e72ca49c374..78e8b2676458 100644
--- a/pkg/base/testing_knobs.go
+++ b/pkg/base/testing_knobs.go
@@ -19,32 +19,33 @@ type ModuleTestingKnobs interface {
 
 // TestingKnobs contains facilities for controlling various parts of the
 // system for testing.
 type TestingKnobs struct {
-	Store                       ModuleTestingKnobs
-	KVClient                    ModuleTestingKnobs
-	RangeFeed                   ModuleTestingKnobs
-	SQLExecutor                 ModuleTestingKnobs
-	SQLLeaseManager             ModuleTestingKnobs
-	SQLSchemaChanger            ModuleTestingKnobs
-	SQLDeclarativeSchemaChanger ModuleTestingKnobs
-	SQLTypeSchemaChanger        ModuleTestingKnobs
-	GCJob                       ModuleTestingKnobs
-	PGWireTestingKnobs          ModuleTestingKnobs
-	StartupMigrationManager     ModuleTestingKnobs
-	DistSQL                     ModuleTestingKnobs
-	SQLEvalContext              ModuleTestingKnobs
-	NodeLiveness                ModuleTestingKnobs
-	Server                      ModuleTestingKnobs
-	TenantTestingKnobs          ModuleTestingKnobs
-	JobsTestingKnobs            ModuleTestingKnobs
-	BackupRestore               ModuleTestingKnobs
-	TTL                         ModuleTestingKnobs
-	Streaming                   ModuleTestingKnobs
-	MigrationManager            ModuleTestingKnobs
-	IndexUsageStatsKnobs        ModuleTestingKnobs
-	SQLStatsKnobs               ModuleTestingKnobs
-	SpanConfig                  ModuleTestingKnobs
-	SQLLivenessKnobs            ModuleTestingKnobs
-	TelemetryLoggingKnobs       ModuleTestingKnobs
-	DialerKnobs                 ModuleTestingKnobs
-	ProtectedTS                 ModuleTestingKnobs
+	Store                        ModuleTestingKnobs
+	KVClient                     ModuleTestingKnobs
+	RangeFeed                    ModuleTestingKnobs
+	SQLExecutor                  ModuleTestingKnobs
+	SQLLeaseManager              ModuleTestingKnobs
+	SQLSchemaChanger             ModuleTestingKnobs
+	SQLDeclarativeSchemaChanger  ModuleTestingKnobs
+	SQLTypeSchemaChanger         ModuleTestingKnobs
+	GCJob                        ModuleTestingKnobs
+	PGWireTestingKnobs           ModuleTestingKnobs
+	StartupMigrationManager      ModuleTestingKnobs
+	DistSQL                      ModuleTestingKnobs
+	SQLEvalContext               ModuleTestingKnobs
+	NodeLiveness                 ModuleTestingKnobs
+	Server                       ModuleTestingKnobs
+	TenantTestingKnobs           ModuleTestingKnobs
+	JobsTestingKnobs             ModuleTestingKnobs
+	BackupRestore                ModuleTestingKnobs
+	TTL                          ModuleTestingKnobs
+	Streaming                    ModuleTestingKnobs
+	MigrationManager             ModuleTestingKnobs
+	IndexUsageStatsKnobs         ModuleTestingKnobs
+	SQLStatsKnobs                ModuleTestingKnobs
+	SpanConfig                   ModuleTestingKnobs
+	SQLLivenessKnobs             ModuleTestingKnobs
+	TelemetryLoggingKnobs        ModuleTestingKnobs
+	DialerKnobs                  ModuleTestingKnobs
+	ProtectedTS                  ModuleTestingKnobs
+	CapturedIndexUsageStatsKnobs ModuleTestingKnobs
 }
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index b97f0ac17ee3..8167e3394073 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -167,6 +167,7 @@ go_library(
         "//pkg/sql/physicalplan",
         "//pkg/sql/querycache",
         "//pkg/sql/roleoption",
+        "//pkg/sql/scheduledlogging",
         "//pkg/sql/schemachanger/scdeps",
         "//pkg/sql/schemachanger/scjob",
         "//pkg/sql/schemachanger/scrun",
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 5fc05fe84a1c..7f965d1dac35 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -72,6 +72,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
 	"github.com/cockroachdb/cockroach/pkg/sql/querycache"
+	"github.com/cockroachdb/cockroach/pkg/sql/scheduledlogging"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scdeps"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scrun"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -787,6 +788,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	if spanConfigKnobs := cfg.TestingKnobs.SpanConfig; spanConfigKnobs != nil {
 		execCfg.SpanConfigTestingKnobs = spanConfigKnobs.(*spanconfig.TestingKnobs)
 	}
+	if capturedIndexUsageStatsKnobs := cfg.TestingKnobs.CapturedIndexUsageStatsKnobs; capturedIndexUsageStatsKnobs != nil {
+		execCfg.CaptureIndexUsageStatsKnobs = capturedIndexUsageStatsKnobs.(*scheduledlogging.CaptureIndexUsageStatsTestingKnobs)
+	}
 
 	statsRefresher := stats.MakeRefresher(
 		cfg.AmbientCtx,
@@ -1245,6 +1249,8 @@ func (s *SQLServer) preStart(
 		scheduledjobs.ProdJobSchedulerEnv,
 	)
 
+	scheduledlogging.Start(ctx, stopper, s.execCfg.DB, s.execCfg.Settings, s.internalExecutor, s.execCfg.CaptureIndexUsageStatsKnobs)
+
 	return nil
 }
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index 1a806a2fc594..4d96a0b7aa13 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -357,6 +357,7 @@ go_library(
         "//pkg/sql/rowenc",
         "//pkg/sql/rowexec",
         "//pkg/sql/rowinfra",
+        "//pkg/sql/scheduledlogging",
         "//pkg/sql/schemachange",
         "//pkg/sql/schemachanger/scbuild",
         "//pkg/sql/schemachanger/scdeps",
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 9b35dc464eca..5618e0b26222 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -75,6 +75,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/querycache"
 	"github.com/cockroachdb/cockroach/pkg/sql/row"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
+	"github.com/cockroachdb/cockroach/pkg/sql/scheduledlogging"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scrun"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -1190,6 +1191,7 @@ type ExecutorConfig struct {
 	SQLStatsTestingKnobs         *sqlstats.TestingKnobs
 	TelemetryLoggingTestingKnobs *TelemetryLoggingTestingKnobs
 	SpanConfigTestingKnobs       *spanconfig.TestingKnobs
+	CaptureIndexUsageStatsKnobs  *scheduledlogging.CaptureIndexUsageStatsTestingKnobs
 
 	// HistogramWindowInterval is (server.Config).HistogramWindowInterval.
 	HistogramWindowInterval time.Duration
diff --git a/pkg/sql/scheduledlogging/BUILD.bazel b/pkg/sql/scheduledlogging/BUILD.bazel
new file mode 100644
index 000000000000..646bbdc953fa
--- /dev/null
+++ b/pkg/sql/scheduledlogging/BUILD.bazel
@@ -0,0 +1,50 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "scheduledlogging",
+    srcs = ["captured_index_usage_stats.go"],
+    importpath = "github.com/cockroachdb/cockroach/pkg/sql/scheduledlogging",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/kv",
+        "//pkg/roachpb",
+        "//pkg/security",
+        "//pkg/settings",
+        "//pkg/settings/cluster",
+        "//pkg/sql/sem/tree",
+        "//pkg/sql/sessiondata",
+        "//pkg/sql/sqlutil",
+        "//pkg/util/log",
+        "//pkg/util/log/eventpb",
+        "//pkg/util/stop",
+        "//pkg/util/timeutil",
+        "@com_github_cockroachdb_errors//:errors",
+    ],
+)
+
+go_test(
+    name = "scheduledlogging_test",
+    srcs = [
+        "captured_index_usage_stats_test.go",
+        "main_test.go",
+    ],
+    embed = [":scheduledlogging"],
+    deps = [
+        "//pkg/base",
+        "//pkg/security",
+        "//pkg/security/securitytest",
+        "//pkg/server",
+        "//pkg/settings/cluster",
+        "//pkg/testutils",
+        "//pkg/testutils/serverutils",
+        "//pkg/testutils/sqlutils",
+        "//pkg/testutils/testcluster",
+        "//pkg/util/leaktest",
+        "//pkg/util/log",
+        "//pkg/util/log/channel",
+        "//pkg/util/log/logconfig",
+        "//pkg/util/randutil",
+        "//pkg/util/syncutil",
+        "@com_github_cockroachdb_errors//:errors",
+        "@com_github_stretchr_testify//require",
+    ],
+)
diff --git a/pkg/sql/scheduledlogging/captured_index_usage_stats.go b/pkg/sql/scheduledlogging/captured_index_usage_stats.go
new file mode 100644
index 000000000000..9b537cfa8209
--- /dev/null
+++ b/pkg/sql/scheduledlogging/captured_index_usage_stats.go
@@ -0,0 +1,320 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package scheduledlogging
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/security"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
+)
+
+var telemetryCaptureIndexUsageStatsEnabled = settings.RegisterBoolSetting(
+	settings.TenantWritable,
+	"sql.telemetry.capture_index_usage_stats.enabled",
+	"enable/disable capturing index usage statistics to the telemetry logging channel",
+	true,
+)
+
+var telemetryCaptureIndexUsageStatsEnabledInterval = settings.RegisterDurationSetting(
+	settings.SystemOnly,
+	"sql.telemetry.capture_index_usage_stats.enabled_interval",
+	"the scheduled interval between captures of index usage statistics, when capturing is enabled",
+	8*time.Second,
+	settings.NonNegativeDuration,
+)
+
+var telemetryCaptureIndexUsageStatsStatusDisabledInterval = settings.RegisterDurationSetting(
+	settings.SystemOnly,
+	"sql.telemetry.capture_index_usage_stats.disabled_interval",
+	"the scheduled interval between checks to see if capturing index usage statistics has been enabled",
+	8*time.Second,
+	settings.NonNegativeDuration,
+)
+
+var telemetryCaptureIndexUsageStatsLoggingDelay = settings.RegisterDurationSetting(
+	settings.SystemOnly,
+	"sql.telemetry.capture_index_usage_stats.logging_delay",
+	"the time delay between emitting individual index usage stats logs; this is done to "+
+		"mitigate the log-line limit of 10 logs per second on the telemetry pipeline",
+	500*time.Millisecond,
+	settings.NonNegativeDuration,
+)
+
+// CaptureIndexUsageStatsTestingKnobs provides hooks and knobs for unit tests.
+type CaptureIndexUsageStatsTestingKnobs struct {
+	// getLoggingDuration allows tests to override the duration of the index
+	// usage stats logging operation.
+	getLoggingDuration func() time.Duration
+	// getOverlapDuration allows tests to override the duration until the next
+	// scheduled interval in the case that the logging duration exceeds the
+	// default scheduled interval duration.
+	getOverlapDuration func() time.Duration
+}
+
+// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
+func (*CaptureIndexUsageStatsTestingKnobs) ModuleTestingKnobs() {}
+
+// CaptureIndexUsageStatsLoggingScheduler is responsible for logging index usage stats
+// on a scheduled interval.
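+// While the capture setting is disabled, the scheduler instead polls at the
+// disabled interval so that it notices when capturing is re-enabled.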
+type CaptureIndexUsageStatsLoggingScheduler struct {
+	DB                      *kv.DB
+	cs                      *cluster.Settings
+	ie                      sqlutil.InternalExecutor
+	knobs                   *CaptureIndexUsageStatsTestingKnobs
+	currentCaptureStartTime time.Time
+}
+
+func (s *CaptureIndexUsageStatsLoggingScheduler) getLoggingDuration() time.Duration {
+	if s.knobs != nil && s.knobs.getLoggingDuration != nil {
+		return s.knobs.getLoggingDuration()
+	}
+	return timeutil.Since(s.currentCaptureStartTime)
+}
+
+func (s *CaptureIndexUsageStatsLoggingScheduler) durationOnOverlap() time.Duration {
+	if s.knobs != nil && s.knobs.getOverlapDuration != nil {
+		return s.knobs.getOverlapDuration()
+	}
+	// If the logging duration overlaps into the next scheduled interval, start
+	// the next scheduled interval immediately instead of waiting.
+	return 0 * time.Second
+}
+
+func (s *CaptureIndexUsageStatsLoggingScheduler) durationUntilNextInterval() time.Duration {
+	// If telemetry is disabled, return the disabled interval duration.
+	if !telemetryCaptureIndexUsageStatsEnabled.Get(&s.cs.SV) {
+		return telemetryCaptureIndexUsageStatsStatusDisabledInterval.Get(&s.cs.SV)
+	}
+	// If the previous logging operation took longer than or equal to the set
+	// schedule interval, schedule the next interval immediately.
+	if s.getLoggingDuration() >= telemetryCaptureIndexUsageStatsEnabledInterval.Get(&s.cs.SV) {
+		return s.durationOnOverlap()
+	}
+	// Otherwise, schedule the next interval normally.
+	return telemetryCaptureIndexUsageStatsEnabledInterval.Get(&s.cs.SV)
+}
+
+// Start starts the capture index usage statistics logging scheduler.
+func Start(
+	ctx context.Context,
+	stopper *stop.Stopper,
+	db *kv.DB,
+	cs *cluster.Settings,
+	ie sqlutil.InternalExecutor,
+	knobs *CaptureIndexUsageStatsTestingKnobs,
+) {
+	scheduler := CaptureIndexUsageStatsLoggingScheduler{
+		DB:    db,
+		cs:    cs,
+		ie:    ie,
+		knobs: knobs,
+	}
+	scheduler.start(ctx, stopper)
+}
+
+func (s *CaptureIndexUsageStatsLoggingScheduler) start(ctx context.Context, stopper *stop.Stopper) {
+	_ = stopper.RunAsyncTask(ctx, "capture-index-usage-stats", func(ctx context.Context) {
+		// Start the scheduler immediately.
+		for timer := time.NewTimer(0 * time.Second); ; timer.Reset(s.durationUntilNextInterval()) {
+			select {
+			case <-stopper.ShouldQuiesce():
+				timer.Stop()
+				return
+			case <-timer.C:
+				s.currentCaptureStartTime = timeutil.Now()
+				if !telemetryCaptureIndexUsageStatsEnabled.Get(&s.cs.SV) {
+					continue
+				}
+
+				err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+					return captureIndexUsageStats(ctx, s.ie, stopper, telemetryCaptureIndexUsageStatsLoggingDelay.Get(&s.cs.SV))
+				})
+				if err != nil {
+					log.Errorf(ctx, "error capturing index usage stats: %+v", err)
+				}
+			}
+		}
+	})
+}
+
+// captureIndexUsageStats collects index usage statistics from every
+// non-system database and emits them to the TELEMETRY channel, spacing the
+// log lines out by the given logging delay.
+func captureIndexUsageStats(
+	ctx context.Context,
+	ie sqlutil.InternalExecutor,
+	stopper *stop.Stopper,
+	loggingDelay time.Duration,
+) error {
+	allDatabaseNames, err := getAllDatabaseNames(ctx, ie)
+	if err != nil {
+		return err
+	}
+
+	// Capture index usage statistics for each database.
+	var ok bool
+	expectedNumDatums := 10
+	var allCapturedIndexUsageStats []eventpb.EventPayload
+	for _, databaseName := range allDatabaseNames {
+		// Omit index usage statistics of the 'system' database.
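+		// (Index usage on system tables is internal and deliberately kept out
+		// of telemetry.)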
+		if databaseName == "system" {
+			continue
+		}
+		stmt := fmt.Sprintf(`
+		SELECT
+			'%s' AS database_name,
+			ti.descriptor_name AS table_name,
+			ti.descriptor_id AS table_id,
+			ti.index_name,
+			ti.index_id,
+			ti.index_type,
+			ti.is_unique,
+			ti.is_inverted,
+			total_reads,
+			last_read
+		FROM %s.crdb_internal.index_usage_statistics AS us
+		JOIN %s.crdb_internal.table_indexes ti
+			ON us.index_id = ti.index_id
+			AND us.table_id = ti.descriptor_id
+		ORDER BY total_reads ASC;
+		`, databaseName, databaseName, databaseName)
+
+		it, err := ie.QueryIteratorEx(
+			ctx,
+			"capture-index-usage-stats",
+			nil,
+			sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
+			stmt,
+		)
+		if err != nil {
+			return err
+		}
+
+		for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
+			var row tree.Datums
+			if row = it.Cur(); row == nil {
+				return errors.New("unexpected null row while capturing index usage stats")
+			}
+
+			if row.Len() != expectedNumDatums {
+				return errors.Newf("expected %d columns, received %d while capturing index usage stats", expectedNumDatums, row.Len())
+			}
+
+			databaseName := tree.MustBeDString(row[0])
+			tableName := tree.MustBeDString(row[1])
+			tableID := tree.MustBeDInt(row[2])
+			indexName := tree.MustBeDString(row[3])
+			indexID := tree.MustBeDInt(row[4])
+			indexType := tree.MustBeDString(row[5])
+			isUnique := tree.MustBeDBool(row[6])
+			isInverted := tree.MustBeDBool(row[7])
+			totalReads := uint64(tree.MustBeDInt(row[8]))
+			lastRead := time.Time{}
+			if row[9] != tree.DNull {
+				lastRead = tree.MustBeDTimestampTZ(row[9]).Time
+			}
+
+			capturedIndexStats := &eventpb.CapturedIndexUsageStats{
+				TableID:        uint32(roachpb.TableID(tableID)),
+				IndexID:        uint32(roachpb.IndexID(indexID)),
+				TotalReadCount: totalReads,
+				LastRead:       lastRead.String(),
+				DatabaseName:   string(databaseName),
+				TableName:      string(tableName),
+				IndexName:      string(indexName),
+				IndexType:      string(indexType),
+				IsUnique:       bool(isUnique),
+				IsInverted:     bool(isInverted),
+			}
+
+			allCapturedIndexUsageStats = append(allCapturedIndexUsageStats, capturedIndexStats)
+		}
+		err = it.Close()
+		if err != nil {
+			return err
+		}
+	}
+	logIndexUsageStatsWithDelay(ctx, allCapturedIndexUsageStats, stopper, loggingDelay)
+	return nil
+}
+
+// logIndexUsageStatsWithDelay logs each eventpb.EventPayload, waiting
+// telemetryCaptureIndexUsageStatsLoggingDelay between entries, to avoid
+// exceeding the 10 log-line per second limit per node on the telemetry
+// logging pipeline. Currently, this log-line limit is only shared with one
+// other telemetry event, SampledQuery, which now has a logging frequency of
+// 8 logs per second.
+func logIndexUsageStatsWithDelay(
+	ctx context.Context, events []eventpb.EventPayload, stopper *stop.Stopper, delay time.Duration,
+) {
+	// Log the first event immediately.
+	timer := time.NewTimer(0 * time.Second)
+	for len(events) > 0 {
+		select {
+		case <-stopper.ShouldQuiesce():
+			timer.Stop()
+			return
+		case <-timer.C:
+			event := events[0]
+			log.StructuredEvent(ctx, event)
+			events = events[1:]
+			// Apply a delay to subsequent events.
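+			// Spacing the emissions out keeps the total rate under the
+			// telemetry pipeline's per-node log-line limit described above.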
+			timer.Reset(delay)
+		}
+	}
+	timer.Stop()
+}
+
+func getAllDatabaseNames(ctx context.Context, ie sqlutil.InternalExecutor) ([]string, error) {
+	var allDatabaseNames []string
+	var ok bool
+	var expectedNumDatums = 1
+
+	it, err := ie.QueryIteratorEx(
+		ctx,
+		"get-all-db-names",
+		nil,
+		sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
+		`SELECT database_name FROM [SHOW DATABASES]`,
+	)
+	if err != nil {
+		return []string{}, err
+	}
+
+	// We have to make sure to close the iterator since we might return from
+	// the for loop early (before Next() returns false).
+	defer func() { err = errors.CombineErrors(err, it.Close()) }()
+	for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
+		var row tree.Datums
+		if row = it.Cur(); row == nil {
+			return []string{}, errors.New("unexpected null row while capturing index usage stats")
+		}
+		if row.Len() != expectedNumDatums {
+			return []string{}, errors.Newf("expected %d columns, received %d while capturing index usage stats", expectedNumDatums, row.Len())
+		}
+
+		databaseName := string(tree.MustBeDString(row[0]))
+		allDatabaseNames = append(allDatabaseNames, databaseName)
+	}
+	return allDatabaseNames, nil
+}
diff --git a/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go b/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go
new file mode 100644
index 000000000000..89a5ef6fad83
--- /dev/null
+++ b/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go
@@ -0,0 +1,311 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package scheduledlogging
+
+import (
+	"context"
+	"math"
+	"regexp"
+	"sort"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/log/channel"
+	"github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+)
+
+type stubDurations struct {
+	syncutil.RWMutex
+	loggingDuration time.Duration
+	overlapDuration time.Duration
+}
+
+func (s *stubDurations) setLoggingDuration(d time.Duration) {
+	s.RWMutex.Lock()
+	defer s.RWMutex.Unlock()
+	s.loggingDuration = d
+}
+
+func (s *stubDurations) getLoggingDuration() time.Duration {
+	s.RWMutex.RLock()
+	defer s.RWMutex.RUnlock()
+	return s.loggingDuration
+}
+
+func (s *stubDurations) setOverlapDuration(d time.Duration) {
+	s.RWMutex.Lock()
+	defer s.RWMutex.Unlock()
+	s.overlapDuration = d
+}
+
+func (s *stubDurations) getOverlapDuration() time.Duration {
+	s.RWMutex.RLock()
+	defer s.RWMutex.RUnlock()
+	return s.overlapDuration
+}
+
+func installTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() {
+	// Enable logging channels.
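+	// TestingResetActive clears the logging system's "active" state so that a
+	// fresh configuration can be applied below.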
+	log.TestingResetActive()
+	cfg := logconfig.DefaultConfig()
+	// Make a sink for just the telemetry log.
+	cfg.Sinks.FileGroups = map[string]*logconfig.FileSinkConfig{
+		"telemetry": {
+			Channels: logconfig.SelectChannels(channel.TELEMETRY),
+		}}
+	dir := sc.GetDirectory()
+	if err := cfg.Validate(&dir); err != nil {
+		t.Fatal(err)
+	}
+	cleanup, err := log.ApplyConfig(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	return cleanup
+}
+
+func TestCaptureIndexUsageStats(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	sc := log.ScopeWithoutShowLogs(t)
+	defer sc.Close(t)
+
+	cleanup := installTelemetryLogFileSink(sc, t)
+	defer cleanup()
+
+	sd := stubDurations{}
+	sd.setLoggingDuration(1 * time.Second)
+	sd.setOverlapDuration(10 * time.Second)
+	stubScheduleEnabledInterval := 20 * time.Second
+	stubScheduleDisabledInterval := 1 * time.Second
+	stubLoggingDelay := 0 * time.Second
+
+	// timeBuffer is a short time buffer to account for delays in the schedule
+	// timings when running tests. The time buffer is kept smaller than the
+	// difference between the schedule intervals to ensure that the intervals
+	// do not overlap.
+	timeBuffer := 5 * time.Second
+
+	settings := cluster.MakeTestingClusterSettings()
+	// Configure capture of index usage statistics to be disabled. This is to
+	// test whether the disabled interval works correctly. We start in a
+	// disabled state; once the disabled interval expires, we check whether we
+	// have transitioned to an enabled state, and if we have, we check that the
+	// expected logs have been emitted.
+	telemetryCaptureIndexUsageStatsEnabled.Override(context.Background(), &settings.SV, false)
+	// Configure the schedule interval at which we capture index usage
+	// statistics.
+	telemetryCaptureIndexUsageStatsEnabledInterval.Override(context.Background(), &settings.SV, stubScheduleEnabledInterval)
+	// Configure the schedule interval at which we check whether capturing
+	// index usage statistics has been enabled.
+	telemetryCaptureIndexUsageStatsStatusDisabledInterval.Override(context.Background(), &settings.SV, stubScheduleDisabledInterval)
+	// Configure the delay between each emission of index usage stats logs.
+	telemetryCaptureIndexUsageStatsLoggingDelay.Override(context.Background(), &settings.SV, stubLoggingDelay)
+
+	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Settings: settings,
+		Knobs: base.TestingKnobs{
+			CapturedIndexUsageStatsKnobs: &CaptureIndexUsageStatsTestingKnobs{
+				getLoggingDuration: sd.getLoggingDuration,
+				getOverlapDuration: sd.getOverlapDuration,
+			},
+		},
+	})
+
+	defer s.Stopper().Stop(context.Background())
+
+	db := sqlutils.MakeSQLRunner(sqlDB)
+
+	// Create test databases.
+	db.Exec(t, "CREATE DATABASE test")
+	db.Exec(t, "CREATE DATABASE test2")
+
+	// Create a table for each database.
+	db.Exec(t, "CREATE TABLE test.test_table (num INT PRIMARY KEY, letter char)")
+	db.Exec(t, "CREATE TABLE test2.test2_table (num INT PRIMARY KEY, letter char)")
+
+	// Create an index on each created table (each table now has two indexes:
+	// the primary index and this one).
+	db.Exec(t, "CREATE INDEX ON test.test_table (letter)")
+	db.Exec(t, "CREATE INDEX ON test2.test2_table (letter)")
+
+	// Check that the telemetry log file contains all the entries we're
+	// expecting, at the scheduled intervals.
+
+	// Enable capture of index usage stats.
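+	// The scheduler started in the disabled state, so it is polling at
+	// stubScheduleDisabledInterval and should pick this change up shortly.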
+	telemetryCaptureIndexUsageStatsEnabled.Override(context.Background(), &s.ClusterSettings().SV, true)
+
+	expectedTotalNumEntriesInSingleInterval := 4
+	expectedNumberOfIndividualIndexEntriesInSingleInterval := 1
+
+	// Expect index usage statistics logs once the schedule disabled interval
+	// has passed. Assert that we have the expected number of total logs and
+	// the expected number of logs for each index.
+	testutils.SucceedsWithin(t, func() error {
+		return checkNumTotalEntriesAndIndexEntries(expectedTotalNumEntriesInSingleInterval, expectedNumberOfIndividualIndexEntriesInSingleInterval)
+	}, stubScheduleDisabledInterval+timeBuffer)
+
+	// Verify that a second schedule has run after the enabled interval has
+	// passed. Expect the number of total entries to hold 2 times the number
+	// of entries in a single interval.
+	expectedTotalNumEntriesAfterTwoIntervals := expectedTotalNumEntriesInSingleInterval * 2
+	// Expect the number of individual index entries to hold 2 times the
+	// number of entries in a single interval.
+	expectedNumberOfIndividualIndexEntriesAfterTwoIntervals := expectedNumberOfIndividualIndexEntriesInSingleInterval * 2
+	// Set the logging duration for the next run to be longer than the
+	// schedule interval duration.
+	stubLoggingDuration := stubScheduleEnabledInterval * 2
+	sd.setLoggingDuration(stubLoggingDuration)
+
+	// Expect index usage statistics logs once the schedule enabled interval
+	// has passed. Assert that we have the expected number of total logs and
+	// the expected number of logs for each index.
+	testutils.SucceedsWithin(t, func() error {
+		return checkNumTotalEntriesAndIndexEntries(expectedTotalNumEntriesAfterTwoIntervals, expectedNumberOfIndividualIndexEntriesAfterTwoIntervals)
+	}, stubScheduleEnabledInterval+timeBuffer)
+
+	// Verify that a third schedule has run after the overlap duration has
+	// passed. Expect the number of total entries to hold 3 times the number
+	// of entries in a single interval.
+	expectedTotalNumEntriesAfterThreeIntervals := expectedTotalNumEntriesInSingleInterval * 3
+	// Expect the number of individual index entries to hold 3 times the
+	// number of entries in a single interval.
+	expectedNumberOfIndividualIndexEntriesAfterThreeIntervals := expectedNumberOfIndividualIndexEntriesInSingleInterval * 3
+
+	// Assert that we have the expected number of total logs and the expected
+	// number of logs for each index.
+	testutils.SucceedsWithin(t, func() error {
+		return checkNumTotalEntriesAndIndexEntries(expectedTotalNumEntriesAfterThreeIntervals, expectedNumberOfIndividualIndexEntriesAfterThreeIntervals)
+	}, sd.getOverlapDuration()+timeBuffer)
+
+	// Stop capturing index usage statistics.
+	telemetryCaptureIndexUsageStatsEnabled.Override(context.Background(), &settings.SV, false)
+
+	// Iterate through the entries, ensuring that the timestamp difference
+	// between each schedule is as expected.
+	entries, err := log.FetchEntriesFromFiles(
+		0,
+		math.MaxInt64,
+		10000,
+		regexp.MustCompile(`"EventType":"captured_index_usage_stats"`),
+		log.WithMarkedSensitiveData,
+	)
+	require.NoError(t, err, "expected no error fetching entries from files")
+
+	// Sort the slice by timestamp, in ascending order.
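+	// (Fetched entries are not guaranteed to be returned in chronological
+	// order.)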
+	sort.Slice(entries, func(a int, b int) bool {
+		return entries[a].Time < entries[b].Time
+	})
+
+	testData := []time.Duration{
+		0 * time.Second,
+		stubScheduleEnabledInterval, /* difference in seconds between the first and second schedules */
+		sd.getOverlapDuration(),     /* difference in seconds between the second and third schedules */
+	}
+
+	var (
+		previousTimestamp = int64(0)
+		currentTimestamp  = int64(0)
+	)
+
+	// Check the timestamp differences between schedules.
+	for idx, expectedDuration := range testData {
+		entriesLowerBound := idx * expectedTotalNumEntriesInSingleInterval
+		entriesUpperBound := (idx + 1) * expectedTotalNumEntriesInSingleInterval
+		scheduleEntryBlock := entries[entriesLowerBound:entriesUpperBound]
+		// Take the first log entry from the schedule.
+		currentTimestamp = scheduleEntryBlock[0].Time
+		// If this is the first iteration, initialize the previous timestamp.
+		if idx == 0 {
+			previousTimestamp = currentTimestamp
+		}
+
+		nanoSecondDiff := currentTimestamp - previousTimestamp
+		// Use integer division to remove any incidental sub-second delay
+		// introduced by the logging itself.
+		secondDiff := nanoSecondDiff / 1e9
+		actualDuration := time.Duration(secondDiff) * time.Second
+		require.Equal(t, expectedDuration, actualDuration)
+		previousTimestamp = currentTimestamp
+	}
+}
+
+// checkNumTotalEntriesAndIndexEntries is a helper function that verifies that
+// we are getting the correct number of total log entries and the correct
+// number of log entries for each index.
+func checkNumTotalEntriesAndIndexEntries(
+	expectedTotalEntries int, expectedIndividualIndexEntries int,
+) error {
+	// Fetch log entries.
+	entries, err := log.FetchEntriesFromFiles(
+		0,
+		math.MaxInt64,
+		10000,
+		regexp.MustCompile(`"EventType":"captured_index_usage_stats"`),
+		log.WithMarkedSensitiveData,
+	)
+	if err != nil {
+		return errors.Newf("expected no error while fetching entries from files, got %v", err)
+	}
+
+	// Assert that we have the correct number of entries.
+	if expectedTotalEntries != len(entries) {
+		return errors.Newf("expected %d total entries, got %d", expectedTotalEntries, len(entries))
+	}
+
+	var (
+		numEntriesForTestTablePrimaryKeyIndex  int
+		numEntriesForTestTableLetterIndex      int
+		numEntriesForTest2TablePrimaryKeyIndex int
+		numEntriesForTest2TableLetterIndex     int
+	)
+
+	for _, e := range entries {
+		if strings.Contains(e.Message, `"IndexName":"‹test_table_pkey›"`) {
+			numEntriesForTestTablePrimaryKeyIndex++
+		}
+		if strings.Contains(e.Message, `"IndexName":"‹test_table_letter_idx›"`) {
+			numEntriesForTestTableLetterIndex++
+		}
+		if strings.Contains(e.Message, `"IndexName":"‹test2_table_pkey›"`) {
+			numEntriesForTest2TablePrimaryKeyIndex++
+		}
+		if strings.Contains(e.Message, `"IndexName":"‹test2_table_letter_idx›"`) {
+			numEntriesForTest2TableLetterIndex++
+		}
+	}
+
+	// Assert that we have the correct number of index usage statistics
+	// entries for each index we created across the tables in each database.
+	if expectedIndividualIndexEntries != numEntriesForTestTablePrimaryKeyIndex {
+		return errors.Newf("expected %d test_table primary key index entries, got %d", expectedIndividualIndexEntries, numEntriesForTestTablePrimaryKeyIndex)
+	}
+	if expectedIndividualIndexEntries != numEntriesForTestTableLetterIndex {
+		return errors.Newf("expected %d test_table letter index entries, got %d", expectedIndividualIndexEntries, numEntriesForTestTableLetterIndex)
+	}
+	if expectedIndividualIndexEntries != numEntriesForTest2TablePrimaryKeyIndex {
+		return errors.Newf("expected %d test2_table primary key index entries, got %d", expectedIndividualIndexEntries, numEntriesForTest2TablePrimaryKeyIndex)
+	}
+	if expectedIndividualIndexEntries != numEntriesForTest2TableLetterIndex {
+		return errors.Newf("expected %d test2_table letter index entries, got %d", expectedIndividualIndexEntries, numEntriesForTest2TableLetterIndex)
+	}
+	return nil
+}
diff --git a/pkg/sql/scheduledlogging/main_test.go b/pkg/sql/scheduledlogging/main_test.go
new file mode 100644
index 000000000000..fd6f7276e041
--- /dev/null
+++ b/pkg/sql/scheduledlogging/main_test.go
@@ -0,0 +1,33 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package scheduledlogging_test
+
+import (
+	"os"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/security"
+	"github.com/cockroachdb/cockroach/pkg/security/securitytest"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+)
+
+func TestMain(m *testing.M) {
+	security.SetAssetLoader(securitytest.EmbeddedAssets)
+	randutil.SeedForTests()
+	serverutils.InitTestServerFactory(server.TestServerFactory)
+	serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
+	os.Exit(m.Run())
+}
+
+//go:generate ../util/leaktest/add-leaktest.sh *_test.go
diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go
index a7bdf2178cf2..c783a8b12778 100644
--- a/pkg/sql/telemetry_logging.go
+++ b/pkg/sql/telemetry_logging.go
@@ -21,7 +21,7 @@ import (
 
 // Default value used to designate the maximum frequency at which events
 // are logged to the telemetry channel.
-const defaultMaxEventFrequency = 10
+const defaultMaxEventFrequency = 8
 
 var telemetryMaxEventFrequency = settings.RegisterIntSetting(
 	settings.TenantWritable,
diff --git a/pkg/util/log/eventpb/eventlog_channels_generated.go b/pkg/util/log/eventpb/eventlog_channels_generated.go
index 982b6f0767bb..034059339244 100644
--- a/pkg/util/log/eventpb/eventlog_channels_generated.go
+++ b/pkg/util/log/eventpb/eventlog_channels_generated.go
@@ -266,6 +266,9 @@ func (m *DropRole) LoggingChannel() logpb.Channel { return logpb.Channel_USER_AD
 // LoggingChannel implements the EventPayload interface.
 func (m *PasswordHashConverted) LoggingChannel() logpb.Channel { return logpb.Channel_USER_ADMIN }
 
+// LoggingChannel implements the EventPayload interface.
+func (m *CapturedIndexUsageStats) LoggingChannel() logpb.Channel { return logpb.Channel_TELEMETRY }
+
 // LoggingChannel implements the EventPayload interface.
 func (m *SampledQuery) LoggingChannel() logpb.Channel { return logpb.Channel_TELEMETRY }
diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go
index 6db459363366..5bf28df80a7b 100644
--- a/pkg/util/log/eventpb/json_encode_generated.go
+++ b/pkg/util/log/eventpb/json_encode_generated.go
@@ -564,6 +564,117 @@ func (m *AlterTypeOwner) AppendJSONFields(printComma bool, b redact.RedactableBy
 	return printComma, b
 }
 
+// AppendJSONFields implements the EventPayload interface.
+func (m *CapturedIndexUsageStats) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) {
+
+	printComma, b = m.CommonEventDetails.AppendJSONFields(printComma, b)
+
+	if m.TotalReadCount != 0 {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"TotalReadCount\":"...)
+		b = strconv.AppendUint(b, uint64(m.TotalReadCount), 10)
+	}
+
+	if m.LastRead != "" {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"LastRead\":\""...)
+		b = append(b, redact.StartMarker()...)
+		b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.LastRead)))))
+		b = append(b, redact.EndMarker()...)
+		b = append(b, '"')
+	}
+
+	if m.TableID != 0 {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"TableID\":"...)
+		b = strconv.AppendUint(b, uint64(m.TableID), 10)
+	}
+
+	if m.IndexID != 0 {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"IndexID\":"...)
+		b = strconv.AppendUint(b, uint64(m.IndexID), 10)
+	}
+
+	if m.DatabaseName != "" {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"DatabaseName\":\""...)
+		b = append(b, redact.StartMarker()...)
+		b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.DatabaseName)))))
+		b = append(b, redact.EndMarker()...)
+		b = append(b, '"')
+	}
+
+	if m.TableName != "" {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"TableName\":\""...)
+		b = append(b, redact.StartMarker()...)
+		b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.TableName)))))
+		b = append(b, redact.EndMarker()...)
+		b = append(b, '"')
+	}
+
+	if m.IndexName != "" {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"IndexName\":\""...)
+		b = append(b, redact.StartMarker()...)
+		b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.IndexName)))))
+		b = append(b, redact.EndMarker()...)
+		b = append(b, '"')
+	}
+
+	if m.IndexType != "" {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"IndexType\":\""...)
+		b = append(b, redact.StartMarker()...)
+		b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.IndexType)))))
+		b = append(b, redact.EndMarker()...)
+		b = append(b, '"')
+	}
+
+	if m.IsUnique {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"IsUnique\":true"...)
+	}
+
+	if m.IsInverted {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"IsInverted\":true"...)
+	}
+
+	return printComma, b
+}
+
 // AppendJSONFields implements the EventPayload interface.
 func (m *CertsReload) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) {
diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto
index 785b3781c76c..912bf26fd782 100644
--- a/pkg/util/log/eventpb/telemetry.proto
+++ b/pkg/util/log/eventpb/telemetry.proto
@@ -16,6 +16,7 @@ import "gogoproto/gogo.proto";
 import "util/log/eventpb/events.proto";
 import "util/log/eventpb/sql_audit_events.proto";
+
 // Category: Telemetry events
 // Channel: TELEMETRY
 
@@ -45,3 +46,29 @@ message SampledQuery {
   string distribution = 6 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];
 }
 
+// CapturedIndexUsageStats
+message CapturedIndexUsageStats {
+  CommonEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];
+
+  // Couldn't use roachpb.CollectedIndexUsageStatistics due to a circular dependency.
+
+  // TotalReadCount is the number of times this index has been read from.
+  uint64 total_read_count = 2;
+
+  // LastRead is the timestamp at which this index was last read from.
+  string last_read = 3 [(gogoproto.jsontag) = ",omitempty"];
+
+  // TableID is the ID of the table this index is created on. This is the same
+  // as descpb.TableID and is unique within the cluster.
+  uint32 table_id = 4 [(gogoproto.customname) = "TableID"];
+
+  // IndexID is the ID of the index within the scope of the given table.
+  uint32 index_id = 5 [(gogoproto.customname) = "IndexID"];
+
+  string database_name = 6 [(gogoproto.jsontag) = ",omitempty"];
+  string table_name = 7 [(gogoproto.jsontag) = ",omitempty"];
+  string index_name = 8 [(gogoproto.jsontag) = ",omitempty"];
+  string index_type = 9 [(gogoproto.jsontag) = ",omitempty"];
+  bool is_unique = 10 [(gogoproto.jsontag) = ",omitempty"];
+  bool is_inverted = 11 [(gogoproto.jsontag) = ",omitempty"];
+}
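
Note (illustration only): given the generated AppendJSONFields method above, a
single emitted telemetry log line for this event should look roughly like the
following; the IDs, counts, and timestamps are made-up values, and the
sensitive string fields are wrapped in redaction markers:

    {"Timestamp":1643576400000000000,"EventType":"captured_index_usage_stats","TotalReadCount":2,"LastRead":"‹2022-01-30 21:00:00 +0000 UTC›","TableID":53,"IndexID":1,"DatabaseName":"‹test›","TableName":"‹test_table›","IndexName":"‹test_table_pkey›","IndexType":"‹primary›","IsUnique":true}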