sql: created a scheduled logger to capture index usage stats
This change introduces a scheduled logger that runs background processes
to emit logs at a set time interval.

Release note (sql change): Initial implementation of a scheduled logger,
used to capture index usage statistics to the telemetry logging channel.
Thomas Hardy committed Feb 23, 2022
1 parent 5b3067f commit 7efa752
Showing 10 changed files with 556 additions and 0 deletions.
26 changes: 26 additions & 0 deletions docs/generated/eventlog.md
@@ -2305,6 +2305,32 @@ are automatically converted server-side.
Events in this category are logged to the `TELEMETRY` channel.


### `captured_index_usage_stats`

An event of type `captured_index_usage_stats`


| Field | Description | Sensitive |
|--|--|--|
| `TotalReadCount` | TotalReadCount is the number of times this index has been read from. | no |
| `LastRead` | LastRead is the timestamp at which this index was last read from. | yes |
| `TableID` | TableID is the ID of the table this index is created on. This is the same as descpb.TableID and is unique within the cluster. | no |
| `IndexID` | IndexID is the ID of the index within the scope of the given table. | no |
| `DatabaseName` | | yes |
| `TableName` | | yes |
| `IndexName` | | yes |
| `IndexType` | | yes |
| `IsUnique` | | no |
| `IsInverted` | | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
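
For illustration, the payload documented above corresponds to the `eventpb.CapturedIndexUsageStats` struct that `captured_index_usage_stats.go` (shown later in this commit) populates. A minimal compilable sketch, with every field value invented purely for this example:

```go
package example

import "github.com/cockroachdb/cockroach/pkg/util/log/eventpb"

// The field names and types come from this commit; the values below are
// made up for illustration only.
var sampleEvent = &eventpb.CapturedIndexUsageStats{
	TotalReadCount: 512,
	LastRead:       "2022-02-23 10:15:30 +0000 UTC",
	TableID:        53,
	IndexID:        1,
	DatabaseName:   "defaultdb",
	TableName:      "users",
	IndexName:      "users_pkey",
	IndexType:      "primary",
	IsUnique:       true,
	IsInverted:     false,
}
```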

### `sampled_query`

An event of type `sampled_query` is the SQL query event logged to the telemetry channel. It
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -319,6 +319,7 @@ ALL_TESTS = [
    "//pkg/sql/rowexec:rowexec_test",
    "//pkg/sql/rowflow:rowflow_test",
    "//pkg/sql/scanner:scanner_test",
    "//pkg/sql/scheduledlogging:scheduledlogging_test",
    "//pkg/sql/schemachange:schemachange_test",
    "//pkg/sql/schemachanger/rel:rel_test",
    "//pkg/sql/schemachanger/scbuild:scbuild_test",
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -167,6 +167,7 @@ go_library(
        "//pkg/sql/physicalplan",
        "//pkg/sql/querycache",
        "//pkg/sql/roleoption",
        "//pkg/sql/scheduledlogging",
        "//pkg/sql/schemachanger/scdeps",
        "//pkg/sql/schemachanger/scjob",
        "//pkg/sql/schemachanger/scrun",
3 changes: 3 additions & 0 deletions pkg/server/server_sql.go
@@ -72,6 +72,7 @@ import (
	"github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness"
	"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
	"github.com/cockroachdb/cockroach/pkg/sql/querycache"
	"github.com/cockroachdb/cockroach/pkg/sql/scheduledlogging"
	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scdeps"
	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scrun"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -1245,6 +1246,8 @@ func (s *SQLServer) preStart(
		scheduledjobs.ProdJobSchedulerEnv,
	)

	scheduledlogging.StartScheduledLogging(ctx, stopper, s.execCfg.DB, s.execCfg.Settings, s.internalExecutor)

	return nil
}
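
`scheduled_logger.go` is among the 10 changed files but its diff is not expanded in this view. Judging only from the call above and the four emitter methods implemented in `captured_index_usage_stats.go` below, a minimal sketch of what that file plausibly provides follows; everything beyond the names referenced elsewhere in this commit (`ScheduledLogEmitter`, `RegisterLogEmitter`, `CaptureIndexUsageStatsEmitterType`, `StartScheduledLogging`) is a guess:

```go
// Sketch only: reconstructed from this commit's call sites, not from the
// actual scheduled_logger.go contents.
package scheduledlogging

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// ScheduledLogEmitter matches the four methods that
// CaptureIndexUsageStatsEmitter implements below.
type ScheduledLogEmitter interface {
	Emit(ctx context.Context, ie sqlutil.InternalExecutor, stopper *stop.Stopper) error
	Interval() time.Duration
	IsEnabled(cs *cluster.Settings) bool
	IsRunning() bool
}

// EmitterType keys the emitter registry; CaptureIndexUsageStatsEmitterType
// is the one value this commit references.
type EmitterType int

const CaptureIndexUsageStatsEmitterType EmitterType = iota

var registry = make(map[EmitterType]ScheduledLogEmitter)

// RegisterLogEmitter is called from init() in captured_index_usage_stats.go.
func RegisterLogEmitter(typ EmitterType, emitter ScheduledLogEmitter) {
	registry[typ] = emitter
}

// StartScheduledLogging mirrors the call site in server_sql.go: one
// background task per registered emitter, firing Emit on every Interval
// tick while the emitter is enabled and not already running.
func StartScheduledLogging(
	ctx context.Context,
	stopper *stop.Stopper,
	db *kv.DB,
	cs *cluster.Settings,
	ie sqlutil.InternalExecutor,
) {
	for _, em := range registry {
		em := em
		_ = stopper.RunAsyncTask(ctx, "scheduled-logger", func(ctx context.Context) {
			ticker := time.NewTicker(em.Interval())
			defer ticker.Stop()
			for {
				select {
				case <-stopper.ShouldQuiesce():
					return
				case <-ticker.C:
					if em.IsEnabled(cs) && !em.IsRunning() {
						_ = em.Emit(ctx, ie, stopper)
					}
				}
			}
		})
	}
}
```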

44 changes: 44 additions & 0 deletions pkg/sql/scheduledlogging/BUILD.bazel
@@ -0,0 +1,44 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
    name = "scheduledlogging",
    srcs = [
        "captured_index_usage_stats.go",
        "scheduled_logger.go",
    ],
    importpath = "github.com/cockroachdb/cockroach/pkg/sql/scheduledlogging",
    visibility = ["//visibility:public"],
    deps = [
        "//pkg/kv",
        "//pkg/roachpb",
        "//pkg/security",
        "//pkg/settings",
        "//pkg/settings/cluster",
        "//pkg/sql/sem/tree",
        "//pkg/sql/sessiondata",
        "//pkg/sql/sqlutil",
        "//pkg/util/log",
        "//pkg/util/log/eventpb",
        "//pkg/util/stop",
        "//pkg/util/syncutil",
        "@com_github_cockroachdb_errors//:errors",
    ],
)

go_test(
    name = "scheduledlogging_test",
    srcs = [
        "captured_index_usage_stats_test.go",
        "scheduled_logger_test.go",
    ],
    embed = [":scheduledlogging"],
    deps = [
        "//pkg/base",
        "//pkg/testutils/serverutils",
        "//pkg/testutils/sqlutils",
        "//pkg/util/leaktest",
        "//pkg/util/log",
        "//pkg/util/log/channel",
        "//pkg/util/log/logconfig",
    ],
)
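
With the `go_test` target above registered in `pkg/BUILD.bazel`'s `ALL_TESTS`, the new suite would presumably be run with standard Bazel usage, e.g. `bazel test //pkg/sql/scheduledlogging:scheduledlogging_test` (the command itself is not part of this commit).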
228 changes: 228 additions & 0 deletions pkg/sql/scheduledlogging/captured_index_usage_stats.go
@@ -0,0 +1,228 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package scheduledlogging

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/security"
	"github.com/cockroachdb/cockroach/pkg/settings"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"github.com/cockroachdb/errors"
)

var telemetryCaptureIndexUsageStatsEnabled = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"sql.telemetry.capture_index_usage_stats.enabled",
	"enable/disable capturing index usage statistics to the telemetry logging channel",
	true,
)

const captureIndexUsageStatsScheduleInterval = 8 * time.Second

// CaptureIndexUsageStatsEmitter type implements the ScheduledLogEmitter interface.
type CaptureIndexUsageStatsEmitter struct {
	syncutil.Mutex
	isRunning bool
}

// Emit implements the ScheduledLogEmitter interface.
func (s *CaptureIndexUsageStatsEmitter) Emit(
	ctx context.Context, ie sqlutil.InternalExecutor, stopper *stop.Stopper,
) error {

	allDatabaseNames, err := getAllDatabaseNames(ctx, ie)
	if err != nil {
		return err
	}

	// Capture index usage statistics for each database.
	var ok bool
	expectedNumDatums := 10
	var allCapturedIndexUsageStats []eventpb.EventPayload
	for _, databaseName := range allDatabaseNames {
		// Omit index usage statistics of the 'system' database.
		if databaseName == "system" {
			continue
		}
		stmt := fmt.Sprintf(`
			SELECT
				'%s' as database_name,
				ti.descriptor_name as table_name,
				ti.descriptor_id as table_id,
				ti.index_name,
				ti.index_id,
				ti.index_type,
				ti.is_unique,
				ti.is_inverted,
				total_reads,
				last_read
			FROM %s.crdb_internal.index_usage_statistics AS us
			JOIN %s.crdb_internal.table_indexes ti
				ON us.index_id = ti.index_id
				AND us.table_id = ti.descriptor_id
			ORDER BY total_reads ASC;
		`, databaseName, databaseName, databaseName)

		it, err := ie.QueryIteratorEx(
			ctx,
			"capture-index-usage-stats",
			nil,
			sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
			stmt,
		)
		if err != nil {
			return err
		}

		for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
			var row tree.Datums
			if row = it.Cur(); row == nil {
				return errors.New("unexpected null row while capturing index usage stats")
			}

			if row.Len() != expectedNumDatums {
				return errors.Newf("expected %d columns, received %d while capturing index usage stats", expectedNumDatums, row.Len())
			}

			databaseName := tree.MustBeDString(row[0])
			tableName := tree.MustBeDString(row[1])
			tableID := tree.MustBeDInt(row[2])
			indexName := tree.MustBeDString(row[3])
			indexID := tree.MustBeDInt(row[4])
			indexType := tree.MustBeDString(row[5])
			isUnique := tree.MustBeDBool(row[6])
			isInverted := tree.MustBeDBool(row[7])
			totalReads := uint64(tree.MustBeDInt(row[8]))
			lastRead := time.Time{}
			if row[9] != tree.DNull {
				lastRead = tree.MustBeDTimestampTZ(row[9]).Time
			}

			capturedIndexStats := &eventpb.CapturedIndexUsageStats{
				TableID:        uint32(roachpb.TableID(tableID)),
				IndexID:        uint32(roachpb.IndexID(indexID)),
				TotalReadCount: totalReads,
				LastRead:       lastRead.String(),
				DatabaseName:   string(databaseName),
				TableName:      string(tableName),
				IndexName:      string(indexName),
				IndexType:      string(indexType),
				IsUnique:       bool(isUnique),
				IsInverted:     bool(isInverted),
			}

			allCapturedIndexUsageStats = append(allCapturedIndexUsageStats, capturedIndexStats)
		}
		// Check the error from the final call to it.Next() before closing the
		// iterator; the iterator is closed on both paths.
		if err != nil {
			return errors.CombineErrors(err, it.Close())
		}
		if err = it.Close(); err != nil {
			return err
		}
	}
	s.logIndexUsageStatsWithDelay(ctx, allCapturedIndexUsageStats, stopper)
	return nil
}

// logIndexUsageStatsWithDelay logs a slice of eventpb.EventPayload at
// one-second intervals (one log per second) to avoid exceeding the 10
// log-line per second limit per node on the telemetry logging pipeline.
func (s *CaptureIndexUsageStatsEmitter) logIndexUsageStatsWithDelay(
	ctx context.Context, events []eventpb.EventPayload, stopper *stop.Stopper,
) {
	_ = stopper.RunAsyncTask(ctx, "logging-index-usage-stats-with-delay", func(ctx context.Context) {
		s.Lock()
		s.isRunning = true
		s.Unlock()
		defer func() {
			s.Lock()
			s.isRunning = false
			s.Unlock()
		}()

		ticker := time.NewTicker(1000 * time.Millisecond)
		defer ticker.Stop()

		for len(events) > 0 {
			select {
			case <-stopper.ShouldQuiesce():
				return
			case <-ticker.C:
				event := events[0]
				log.StructuredEvent(ctx, event)
				events = events[1:]
			}
		}
	})
}

func getAllDatabaseNames(
	ctx context.Context, ie sqlutil.InternalExecutor,
) (allDatabaseNames []string, err error) {
	var ok bool
	expectedNumDatums := 1

	it, err := ie.QueryIteratorEx(
		ctx,
		"get-all-db-names",
		nil,
		sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
		`SELECT database_name FROM [SHOW DATABASES]`,
	)
	if err != nil {
		return []string{}, err
	}

	// We have to make sure to close the iterator since we might return from
	// the for loop early (before Next() returns false). The named return
	// values let the deferred Close error propagate to the caller.
	defer func() { err = errors.CombineErrors(err, it.Close()) }()
	for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
		var row tree.Datums
		if row = it.Cur(); row == nil {
			return []string{}, errors.New("unexpected null row while capturing index usage stats")
		}
		if row.Len() != expectedNumDatums {
			return []string{}, errors.Newf("expected %d columns, received %d while capturing index usage stats", expectedNumDatums, row.Len())
		}

		databaseName := string(tree.MustBeDString(row[0]))
		allDatabaseNames = append(allDatabaseNames, databaseName)
	}
	return allDatabaseNames, err
}

// Interval implements the ScheduledLogEmitter interface.
func (s *CaptureIndexUsageStatsEmitter) Interval() time.Duration {
	return captureIndexUsageStatsScheduleInterval
}

// IsEnabled implements the ScheduledLogEmitter interface.
func (s *CaptureIndexUsageStatsEmitter) IsEnabled(cs *cluster.Settings) bool {
	return telemetryCaptureIndexUsageStatsEnabled.Get(&cs.SV)
}

// IsRunning implements the ScheduledLogEmitter interface.
func (s *CaptureIndexUsageStatsEmitter) IsRunning() bool {
	// Take the mutex: isRunning is written by the async logging task.
	s.Lock()
	defer s.Unlock()
	return s.isRunning
}

func init() {
	RegisterLogEmitter(CaptureIndexUsageStatsEmitterType, &CaptureIndexUsageStatsEmitter{})
}
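
The `init()` hook above shows the intended extension pattern: implement the four emitter methods and register under a new type. Building on the interface sketch given after the `server_sql.go` hunk, a hypothetical second emitter could look like the following (every name here is invented for illustration):

```go
// Hypothetical emitter, for illustration only; none of these names exist in
// the commit. Assumes the ScheduledLogEmitter/EmitterType sketch above.
package scheduledlogging

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// HeartbeatEmitterType is an invented registry key, one past the real
// CaptureIndexUsageStatsEmitterType.
const HeartbeatEmitterType = CaptureIndexUsageStatsEmitterType + 1

type heartbeatEmitter struct{}

// Emit writes a single unstructured line; a real emitter would query via ie
// and emit structured events, as CaptureIndexUsageStatsEmitter does.
func (e *heartbeatEmitter) Emit(
	ctx context.Context, ie sqlutil.InternalExecutor, stopper *stop.Stopper,
) error {
	log.Infof(ctx, "scheduled logging heartbeat")
	return nil
}

func (e *heartbeatEmitter) Interval() time.Duration { return time.Minute }

func (e *heartbeatEmitter) IsEnabled(cs *cluster.Settings) bool { return true }

// IsRunning is trivially false here because Emit does all its work inline.
func (e *heartbeatEmitter) IsRunning() bool { return false }

func init() {
	RegisterLogEmitter(HeartbeatEmitterType, &heartbeatEmitter{})
}
```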