diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index f65762aa550c..6b0d5a527978 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" @@ -395,12 +396,12 @@ func (p *planner) maybeLogStatementInternal( requiredTimeElapsed = 0 } if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) { - var contentionNanos int64 + var stats execstats.QueryLevelStats if queryLevelStats, ok := p.instrumentation.GetQueryLevelStats(); ok { - contentionNanos = queryLevelStats.ContentionTime.Nanoseconds() + stats = *queryLevelStats } - contentionNanos = telemetryMetrics.getContentionTime(contentionNanos) + stats = telemetryMetrics.getQueryLevelStats(stats) skippedQueries := telemetryMetrics.resetSkippedQueryCount() sampledQuery := eventpb.SampledQuery{ @@ -437,8 +438,14 @@ func (p *planner) maybeLogStatementInternal( InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]), ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]), ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]), - ContentionNanos: contentionNanos, + ContentionNanos: stats.ContentionTime.Nanoseconds(), Regions: p.curPlan.instrumentation.regions, + NetworkBytesSent: stats.NetworkBytesSent, + MaxMemUsage: stats.MaxMemUsage, + MaxDiskUsage: stats.MaxDiskUsage, + KVBytesRead: stats.KVBytesRead, + KVRowsRead: stats.KVRowsRead, + NetworkMessages: stats.NetworkMessages, } p.logOperationalEventsOnlyExternally(ctx, &sampledQuery) } else { diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index 26360e9ef49b..6d933811d1e8 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -54,10 +55,8 @@ type TelemetryLoggingTestingKnobs struct { // getTimeNow allows tests to override the timeutil.Now() function used // when updating rolling query counts. getTimeNow func() time.Time - // getContentionNanos allows tests to override the recorded contention time - // for the query. Used to stub non-zero values to populate the log's contention - // time field. - getContentionNanos func() int64 + // getQueryLevelMetrics allows tests to override the recorded query level stats. + getQueryLevelStats func() execstats.QueryLevelStats // getTracingStatus allows tests to override whether the current query has tracing // enabled or not. Queries with tracing enabled are always sampled to telemetry. getTracingStatus func() bool @@ -91,11 +90,11 @@ func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime( return false } -func (t *TelemetryLoggingMetrics) getContentionTime(contentionTimeInNanoseconds int64) int64 { - if t.Knobs != nil && t.Knobs.getContentionNanos != nil { - return t.Knobs.getContentionNanos() +func (t *TelemetryLoggingMetrics) getQueryLevelStats(queryLevelStats execstats.QueryLevelStats) execstats.QueryLevelStats { + if t.Knobs != nil && t.Knobs.getQueryLevelStats != nil { + return t.Knobs.getQueryLevelStats() } - return contentionTimeInNanoseconds + return queryLevelStats } func (t *TelemetryLoggingMetrics) isTracing(_ *tracing.Span, tracingEnabled bool) bool { diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index a8367eaa4669..9e58752fa0f6 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -22,68 +22,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) -type stubTime struct { - syncutil.RWMutex - t time.Time -} - -func (s *stubTime) setTime(t time.Time) { - s.RWMutex.Lock() - defer s.RWMutex.Unlock() - s.t = t -} - -func (s *stubTime) TimeNow() time.Time { - s.RWMutex.RLock() - defer s.RWMutex.RUnlock() - return s.t -} - -type stubQueryMetrics struct { - syncutil.RWMutex - contentionNanos int64 -} - -func (s *stubQueryMetrics) setContentionNanos(t int64) { - s.RWMutex.Lock() - defer s.RWMutex.Unlock() - s.contentionNanos = t -} - -func (s *stubQueryMetrics) ContentionNanos() int64 { - s.RWMutex.RLock() - defer s.RWMutex.RUnlock() - return s.contentionNanos -} - -type stubTracingStatus struct { - syncutil.RWMutex - isTracing bool -} - -func (s *stubTracingStatus) setTracingStatus(t bool) { - s.RWMutex.Lock() - defer s.RWMutex.Unlock() - s.isTracing = t -} - -func (s *stubTracingStatus) TracingStatus() bool { - s.RWMutex.RLock() - defer s.RWMutex.RUnlock() - return s.isTracing -} - // TestTelemetryLogging verifies that telemetry events are logged to the telemetry log // and are sampled according to the configured sample rate. func TestTelemetryLogging(t *testing.T) { @@ -94,9 +43,9 @@ func TestTelemetryLogging(t *testing.T) { cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t) defer cleanup() - st := stubTime{} - sqm := stubQueryMetrics{} - sts := stubTracingStatus{} + st := logtestutils.StubTime{} + sqm := logtestutils.StubQueryStats{} + sts := logtestutils.StubTracingStatus{} s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -107,7 +56,7 @@ func TestTelemetryLogging(t *testing.T) { }, TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ getTimeNow: st.TimeNow, - getContentionNanos: sqm.ContentionNanos, + getQueryLevelStats: sqm.QueryLevelStats, getTracingStatus: sts.TracingStatus, }, }, @@ -159,7 +108,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead bool expectedWrite bool expectedErr string // Empty string means no error is expected. - contentionNanos int64 + queryLevelStats execstats.QueryLevelStats enableTracing bool }{ { @@ -180,8 +129,16 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: false, expectedRead: false, expectedWrite: false, - contentionNanos: 0, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 0 * time.Nanosecond, + NetworkBytesSent: 1, + MaxMemUsage: 2, + MaxDiskUsage: 3, + KVBytesRead: 4, + KVRowsRead: 5, + NetworkMessages: 6, + }, + enableTracing: false, }, { // Test case with statement that is of type DML. @@ -199,8 +156,10 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: false, expectedRead: false, expectedWrite: false, - contentionNanos: 1, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 1 * time.Nanosecond, + }, + enableTracing: false, }, { // Test case with statement that is of type DML. @@ -219,8 +178,13 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, - contentionNanos: 2, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 2 * time.Nanosecond, + NetworkBytesSent: 1, + MaxMemUsage: 2, + NetworkMessages: 6, + }, + enableTracing: false, }, { // Test case with statement that is of type DML. @@ -238,8 +202,16 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, - contentionNanos: 3, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 3 * time.Nanosecond, + NetworkBytesSent: 1124, + MaxMemUsage: 132, + MaxDiskUsage: 3, + KVBytesRead: 4, + KVRowsRead: 2345, + NetworkMessages: 36, + }, + enableTracing: false, }, { // Test case with a full scan. @@ -257,8 +229,15 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, - contentionNanos: 0, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 0 * time.Nanosecond, + NetworkBytesSent: 124235, + MaxMemUsage: 12412, + MaxDiskUsage: 3, + KVRowsRead: 5, + NetworkMessages: 6235, + }, + enableTracing: false, }, { // Test case with a write. @@ -276,8 +255,14 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: true, - contentionNanos: 0, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 0 * time.Nanosecond, + NetworkBytesSent: 1, + KVBytesRead: 4, + KVRowsRead: 5, + NetworkMessages: 6, + }, + enableTracing: false, }, // Not of type DML so not sampled { @@ -314,8 +299,16 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, - contentionNanos: 2, - enableTracing: true, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 2 * time.Nanosecond, + NetworkBytesSent: 10, + MaxMemUsage: 20, + MaxDiskUsage: 33, + KVBytesRead: 24, + KVRowsRead: 55, + NetworkMessages: 66, + }, + enableTracing: true, }, } @@ -323,9 +316,9 @@ func TestTelemetryLogging(t *testing.T) { telemetryMaxEventFrequency.Override(context.Background(), &s.ClusterSettings().SV, tc.stubMaxEventFrequency) for _, execTimestamp := range tc.execTimestampsSeconds { stubTime := timeutil.FromUnixMicros(int64(execTimestamp * 1e6)) - st.setTime(stubTime) - sqm.setContentionNanos(tc.contentionNanos) - sts.setTracingStatus(tc.enableTracing) + st.SetTime(stubTime) + sqm.SetQueryLevelStats(tc.queryLevelStats) + sts.SetTracingStatus(tc.enableTracing) _, err := db.DB.ExecContext(context.Background(), tc.query) if err != nil && tc.expectedErr == "" { t.Errorf("unexpected error executing query `%s`: %v", tc.query, err) @@ -490,13 +483,61 @@ func TestTelemetryLogging(t *testing.T) { } } contentionNanos := regexp.MustCompile("\"ContentionNanos\":[0-9]*") - if tc.contentionNanos > 0 && !contentionNanos.MatchString(e.Message) { + if tc.queryLevelStats.ContentionTime.Nanoseconds() > 0 && !contentionNanos.MatchString(e.Message) { // If we have contention, we expect the ContentionNanos field to be populated. t.Errorf("expected to find ContentionNanos but none was found") - } else if tc.contentionNanos == 0 && contentionNanos.MatchString(e.Message) { + } else if tc.queryLevelStats.ContentionTime.Nanoseconds() == 0 && contentionNanos.MatchString(e.Message) { // If we do not have contention, expect no ContentionNanos field. t.Errorf("expected no ContentionNanos field, but was found") } + networkBytesSent := regexp.MustCompile("\"NetworkBytesSent\":[0-9]*") + if tc.queryLevelStats.NetworkBytesSent > 0 && !networkBytesSent.MatchString(e.Message) { + // If we have sent network bytes, we expect the NetworkBytesSent field to be populated. + t.Errorf("expected to find NetworkBytesSent but none was found") + } else if tc.queryLevelStats.NetworkBytesSent == 0 && networkBytesSent.MatchString(e.Message) { + // If we have not sent network bytes, expect no NetworkBytesSent field. + t.Errorf("expected no NetworkBytesSent field, but was found") + } + maxMemUsage := regexp.MustCompile("\"MaxMemUsage\":[0-9]*") + if tc.queryLevelStats.MaxMemUsage > 0 && !maxMemUsage.MatchString(e.Message) { + // If we have a max memory usage, we expect the MaxMemUsage field to be populated. + t.Errorf("expected to find MaxMemUsage but none was found") + } else if tc.queryLevelStats.MaxMemUsage == 0 && maxMemUsage.MatchString(e.Message) { + // If we do not have a max memory usage, expect no MaxMemUsage field. + t.Errorf("expected no MaxMemUsage field, but was found") + } + maxDiskUsage := regexp.MustCompile("\"MaxDiskUsage\":[0-9]*") + if tc.queryLevelStats.MaxDiskUsage > 0 && !maxDiskUsage.MatchString(e.Message) { + // If we have a max disk usage, we expect the MaxDiskUsage field to be populated. + t.Errorf("expected to find MaxDiskUsage but none was found") + } else if tc.queryLevelStats.MaxDiskUsage == 0 && maxDiskUsage.MatchString(e.Message) { + // If we do not a max disk usage, expect no MaxDiskUsage field. + t.Errorf("expected no MaxDiskUsage field, but was found") + } + kvBytesRead := regexp.MustCompile("\"KVBytesRead\":[0-9]*") + if tc.queryLevelStats.KVBytesRead > 0 && !kvBytesRead.MatchString(e.Message) { + // If we have read bytes from KV, we expect the KVBytesRead field to be populated. + t.Errorf("expected to find KVBytesRead but none was found") + } else if tc.queryLevelStats.KVBytesRead == 0 && kvBytesRead.MatchString(e.Message) { + // If we have not read bytes from KV, expect no KVBytesRead field. + t.Errorf("expected no KVBytesRead field, but was found") + } + kvRowsRead := regexp.MustCompile("\"KVRowsRead\":[0-9]*") + if tc.queryLevelStats.KVRowsRead > 0 && !kvRowsRead.MatchString(e.Message) { + // If we have read rows from KV, we expect the KVRowsRead field to be populated. + t.Errorf("expected to find KVRowsRead but none was found") + } else if tc.queryLevelStats.KVRowsRead == 0 && kvRowsRead.MatchString(e.Message) { + // If we have not read rows from KV, expect no KVRowsRead field. + t.Errorf("expected no KVRowsRead field, but was found") + } + networkMessages := regexp.MustCompile("\"NetworkMessages\":[0-9]*") + if tc.queryLevelStats.NetworkMessages > 0 && !networkMessages.MatchString(e.Message) { + // If we have network messages, we expect the NetworkMessages field to be populated. + t.Errorf("expected to find NetworkMessages but none was found") + } else if tc.queryLevelStats.NetworkMessages == 0 && networkMessages.MatchString(e.Message) { + // If we do not have network messages, expect no NetworkMessages field. + t.Errorf("expected no NetworkMessages field, but was found") + } if tc.expectedErr != "" { if !strings.Contains(e.Message, tc.expectedErr) { t.Errorf("%s: missing error %s in message %s", tc.name, tc.expectedErr, e.Message) @@ -520,7 +561,7 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) { cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t) defer cleanup() - st := stubTime{} + st := logtestutils.StubTime{} s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -572,7 +613,7 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) { // Set the time for when we issue a query to enable/disable // troubleshooting mode. setTroubleshootModeTime := timeutil.FromUnixMicros(int64(idx * 1e6)) - st.setTime(setTroubleshootModeTime) + st.SetTime(setTroubleshootModeTime) if tc.enableTroubleshootingMode { db.Exec(t, `SET troubleshooting_mode = true;`) } else { @@ -581,7 +622,7 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) { // Advance time 1 second from previous query. Ensure enough time has passed // from when we set troubleshooting mode for this query to be sampled. setQueryTime := timeutil.FromUnixMicros(int64((idx + 1) * 1e6)) - st.setTime(setQueryTime) + st.SetTime(setQueryTime) db.Exec(t, tc.query) } diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index be1c0b94095b..6a2979323aec 100644 --- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -3884,6 +3884,60 @@ func (m *SampledQuery) AppendJSONFields(printComma bool, b redact.RedactableByte b = append(b, ']') } + if m.NetworkBytesSent != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"NetworkBytesSent\":"...) + b = strconv.AppendInt(b, int64(m.NetworkBytesSent), 10) + } + + if m.MaxMemUsage != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"MaxMemUsage\":"...) + b = strconv.AppendInt(b, int64(m.MaxMemUsage), 10) + } + + if m.MaxDiskUsage != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"MaxDiskUsage\":"...) + b = strconv.AppendInt(b, int64(m.MaxDiskUsage), 10) + } + + if m.KVBytesRead != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"KVBytesRead\":"...) + b = strconv.AppendInt(b, int64(m.KVBytesRead), 10) + } + + if m.KVRowsRead != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"KVRowsRead\":"...) + b = strconv.AppendInt(b, int64(m.KVRowsRead), 10) + } + + if m.NetworkMessages != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"NetworkMessages\":"...) + b = strconv.AppendInt(b, int64(m.NetworkMessages), 10) + } + return printComma, b } diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index 9574ca924b77..b7cc93496bea 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -143,6 +143,24 @@ message SampledQuery { // The regions of the nodes where SQL processors ran. repeated string regions = 38 [(gogoproto.jsontag) = ',omitempty', (gogoproto.moretags) = "redact:\"nonsensitive\""]; + // The number of network bytes sent by nodes for this query. + int64 network_bytes_sent = 39 [(gogoproto.jsontag) = ',omitempty']; + + // The maximum amount of memory usage by nodes for this query. + int64 max_mem_usage = 40 [(gogoproto.jsontag) = ',omitempty']; + + // The maximum amount of disk usage by nodes for this query. + int64 max_disk_usage = 41 [(gogoproto.jsontag) = ',omitempty']; + + // The number of bytes read at the KV layer for this query. + int64 kv_bytes_read = 42 [(gogoproto.customname) = "KVBytesRead", (gogoproto.jsontag) = ',omitempty']; + + // The number of rows read at the KV layer for this query. + int64 kv_rows_read = 43 [(gogoproto.customname) = "KVRowsRead", (gogoproto.jsontag) = ',omitempty']; + + // The number of network messages sent by nodes for this query. + int64 network_messages = 44 [(gogoproto.jsontag) = ',omitempty']; + reserved 12; } diff --git a/pkg/util/log/logtestutils/telemetry_logging_test_utils.go b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go new file mode 100644 index 000000000000..b764f36be123 --- /dev/null +++ b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go @@ -0,0 +1,59 @@ +package logtestutils + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +type StubTime struct { + syncutil.RWMutex + t time.Time +} + +func (s *StubTime) SetTime(t time.Time) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.t = t +} + +func (s *StubTime) TimeNow() time.Time { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.t +} + +type StubQueryStats struct { + syncutil.RWMutex + stats execstats.QueryLevelStats +} + +func (s *StubQueryStats) SetQueryLevelStats(stats execstats.QueryLevelStats) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.stats = stats +} + +func (s *StubQueryStats) QueryLevelStats() execstats.QueryLevelStats { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.stats +} + +type StubTracingStatus struct { + syncutil.RWMutex + isTracing bool +} + +func (s *StubTracingStatus) SetTracingStatus(t bool) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.isTracing = t +} + +func (s *StubTracingStatus) TracingStatus() bool { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.isTracing +}