diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md index c7c033d9ddb8..0ded8b5faae1 100644 --- a/docs/generated/eventlog.md +++ b/docs/generated/eventlog.md @@ -3081,6 +3081,7 @@ An event of type `sampled_transaction` is the event logged to telemetry at the e | `RowsRead` | RowsRead is the number of rows read from disk. | no | | `RowsWritten` | RowsWritten is the number of rows written to disk. | no | | `SampledExecStats` | SampledExecStats is a nested field containing execution statistics. This field will be omitted if the stats were not sampled. | yes | +| `SkippedTransactions` | SkippedTransactions is the number of transactions that were skipped as part of sampling prior to this one. We only count skipped transactions when telemetry logging is enabled and the sampling mode is set to "transaction". | no | #### Common fields diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index eda34c86ee9e..3223cbf2a2bd 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1562,6 +1562,10 @@ type connExecutor struct { // Note that the number of statement events emitted per transaction is // capped by the cluster setting telemetryStatementsPerTransactionMax. shouldLogToTelemetry bool + + // telemetrySkippedTxns contains the number of transactions skipped by + // telemetry logging prior to this one. + telemetrySkippedTxns uint64 } // sessionDataStack contains the user-configurable connection variables. @@ -2010,9 +2014,11 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay ex.state.mu.Lock() defer ex.state.mu.Unlock() ex.state.mu.stmtCount = 0 - tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled() - ex.extraTxnState.shouldLogToTelemetry = - ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal) + isTracing := ex.planner.ExtendedEvalContext().Tracing.Enabled() + ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns = + ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(isTracing, + ex.executorType == executorTypeInternal, + ex.applicationName.Load().(string)) } // NOTE: on txnRestart we don't need to muck with the savepoints stack. It's either a diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 4085db8a8fdb..532bcd9002a0 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -3182,9 +3182,11 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { TxnFingerprintID: appstatspb.InvalidTransactionFingerprintID, }) - tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled() - ex.extraTxnState.shouldLogToTelemetry = - ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal) + isTracing := ex.planner.ExtendedEvalContext().Tracing.Enabled() + ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns = + ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(isTracing, + ex.executorType == executorTypeInternal, + ex.applicationName.Load().(string)) ex.state.mu.RLock() txnStart := ex.state.mu.txnStart @@ -3305,6 +3307,16 @@ func (ex *connExecutor) recordTransactionFinish( ex.maybeRecordRetrySerializableContention(ev.txnID, transactionFingerprintID, txnErr) + if ex.extraTxnState.shouldLogToTelemetry { + + ex.planner.logTransaction(ctx, + int(ex.extraTxnState.txnCounter.Load()), + transactionFingerprintID, + &recordedTxnStats, + ex.extraTxnState.telemetrySkippedTxns, + ) + } + return ex.statsCollector.RecordTransaction( ctx, transactionFingerprintID, diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 1c39244d0cdb..d57733a739cc 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -179,7 +179,7 @@ func (p *planner) maybeLogStatementInternal( slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV) auditEventsDetected := len(p.curPlan.auditEventBuilders) != 0 logConsoleQuery := telemetryInternalConsoleQueriesEnabled.Get(&p.execCfg.Settings.SV) && - strings.HasPrefix(p.SessionData().ApplicationName, "$ internal-console") + strings.HasPrefix(p.SessionData().ApplicationName, internalConsoleAppName) // We only consider non-internal SQL statements for telemetry logging unless // the telemetryInternalQueriesEnabled is true. @@ -355,7 +355,10 @@ func (p *planner) maybeLogStatementInternal( }) } - sampledQuery := eventpb.SampledQuery{ + sampledQuery := getSampledQuery() + defer releaseSampledQuery(sampledQuery) + + *sampledQuery = eventpb.SampledQuery{ CommonSQLExecDetails: execDetails, SkippedQueries: skippedQueries, CostEstimate: p.curPlan.instrumentation.costEstimate, @@ -431,10 +434,90 @@ func (p *planner) maybeLogStatementInternal( SchemaChangerMode: p.curPlan.instrumentation.schemaChangerMode.String(), } - p.logOperationalEventsOnlyExternally(ctx, &sampledQuery) + p.logOperationalEventsOnlyExternally(ctx, sampledQuery) } } +// logTransaction records the current transaction to the TELEMETRY channel. +func (p *planner) logTransaction( + ctx context.Context, + txnCounter int, + txnFingerprintID appstatspb.TransactionFingerprintID, + txnStats *sqlstats.RecordedTxnStats, + skippedTransactions uint64, +) { + + // Redact error messages. + var execErrStr, retryErr redact.RedactableString + sqlErrState := "" + if txnStats.TxnErr != nil { + execErrStr = redact.Sprint(txnStats.TxnErr) + sqlErrState = pgerror.GetPGCode(txnStats.TxnErr).String() + } + + if txnStats.AutoRetryReason != nil { + retryErr = redact.Sprint(txnStats.AutoRetryReason) + } + + sampledTxn := getSampledTransaction() + defer releaseSampledTransaction(sampledTxn) + + *sampledTxn = eventpb.SampledTransaction{ + SkippedTransactions: int64(skippedTransactions), + User: txnStats.SessionData.SessionUser().Normalized(), + ApplicationName: txnStats.SessionData.ApplicationName, + TxnCounter: uint32(txnCounter), + SessionID: txnStats.SessionID.String(), + TransactionID: txnStats.TransactionID.String(), + TransactionFingerprintID: txnFingerprintID, + Committed: txnStats.Committed, + ImplicitTxn: txnStats.ImplicitTxn, + StartTimeUnixNanos: txnStats.StartTime.UnixNano(), + EndTimeUnixNanos: txnStats.EndTime.UnixNano(), + ServiceLatNanos: txnStats.ServiceLatency.Nanoseconds(), + SQLSTATE: sqlErrState, + ErrorText: execErrStr, + NumRetries: txnStats.RetryCount, + LastAutoRetryReason: retryErr, + StatementFingerprintIDs: txnStats.StatementFingerprintIDs, + NumRows: int64(txnStats.RowsAffected), + RetryLatNanos: txnStats.RetryLatency.Nanoseconds(), + CommitLatNanos: txnStats.CommitLatency.Nanoseconds(), + IdleLatNanos: txnStats.IdleLatency.Nanoseconds(), + BytesRead: txnStats.BytesRead, + RowsRead: txnStats.RowsRead, + RowsWritten: txnStats.RowsWritten, + } + + if txnStats.CollectedExecStats { + sampledTxn.SampledExecStats = &eventpb.SampledExecStats{ + NetworkBytes: txnStats.ExecStats.NetworkBytesSent, + MaxMemUsage: txnStats.ExecStats.MaxMemUsage, + ContentionTime: int64(txnStats.ExecStats.ContentionTime.Seconds()), + NetworkMessages: txnStats.ExecStats.NetworkMessages, + MaxDiskUsage: txnStats.ExecStats.MaxDiskUsage, + CPUSQLNanos: txnStats.ExecStats.CPUTime.Nanoseconds(), + MVCCIteratorStats: eventpb.MVCCIteratorStats{ + StepCount: txnStats.ExecStats.MvccSteps, + StepCountInternal: txnStats.ExecStats.MvccStepsInternal, + SeekCount: txnStats.ExecStats.MvccSeeks, + SeekCountInternal: txnStats.ExecStats.MvccSeeksInternal, + BlockBytes: txnStats.ExecStats.MvccBlockBytes, + BlockBytesInCache: txnStats.ExecStats.MvccBlockBytesInCache, + KeyBytes: txnStats.ExecStats.MvccKeyBytes, + ValueBytes: txnStats.ExecStats.MvccValueBytes, + PointCount: txnStats.ExecStats.MvccPointCount, + PointsCoveredByRangeTombstones: txnStats.ExecStats.MvccPointsCoveredByRangeTombstones, + RangeKeyCount: txnStats.ExecStats.MvccRangeKeyCount, + RangeKeyContainedPoints: txnStats.ExecStats.MvccRangeKeyContainedPoints, + RangeKeySkippedPoints: txnStats.ExecStats.MvccRangeKeySkippedPoints, + }, + } + } + + log.StructuredEvent(ctx, sampledTxn) +} + func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) { // The API contract for logEventsWithOptions() is that it returns // no error when system.eventlog is not written to. diff --git a/pkg/sql/telemetry_datadriven_test.go b/pkg/sql/telemetry_datadriven_test.go index ebeaa04e51a2..3e243e102af4 100644 --- a/pkg/sql/telemetry_datadriven_test.go +++ b/pkg/sql/telemetry_datadriven_test.go @@ -12,6 +12,7 @@ package sql import ( "context" + "fmt" "strconv" "strings" "testing" @@ -48,8 +49,9 @@ import ( * - unixSecs: sets the current time used for telemetry log sampling to the given unix time in seconds. * If omitted, the current time is automatically changed by 0.1 seconds. * - restartUnixSecs: sets the stub time on txn restarts. - * - * tracing: sets the tracing status to the given value + * - tracing: sets the tracing status to the given value. If omitted, the tracing status is set to false. + * - useRealTracing: if set, the real tracing status is used instead of the stubbed one. + * - user: sets the user for the connection. If omitted, the root user is used. * * reset-last-sampled: resets the last sampled time. */ @@ -65,18 +67,30 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { ctx := context.Background() stmtSpy := logtestutils.NewSampledQueryLogScrubVolatileFields(t) stmtSpy.AddFilter(func(ev logpb.Entry) bool { - return strings.Contains(ev.Message, appName) + return strings.Contains(ev.Message, appName) || strings.Contains(ev.Message, internalConsoleAppName) }) cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() + txnsSpy := logtestutils.NewSampledTransactionLogScrubVolatileFields(t) + txnsSpy.AddFilter(func(ev logpb.Entry) bool { + return strings.Contains(ev.Message, appName) || strings.Contains(ev.Message, internalConsoleAppName) + }) + cleanupTxnSpy := log.InterceptWith(ctx, txnsSpy) + defer cleanupTxnSpy() + datadriven.Walk(t, datapathutils.TestDataPath(t, "telemetry_logging/logging"), func(t *testing.T, path string) { stmtSpy.Reset() + txnsSpy.Reset() st := logtestutils.StubTime{} st.SetTime(timeutil.FromUnixMicros(0)) sts := logtestutils.StubTracingStatus{} stubTimeOnRestart := int64(0) + telemetryKnobs := &TelemetryLoggingTestingKnobs{ + getTimeNow: st.TimeNow, + getTracingStatus: sts.TracingStatus, + } tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -85,10 +99,7 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { st.SetTime(timeutil.FromUnixMicros(stubTimeOnRestart * 1e6)) }, }, - TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ - getTimeNow: st.TimeNow, - getTracingStatus: sts.TracingStatus, - }, + TelemetryLoggingKnobs: telemetryKnobs, }, }, }) @@ -97,12 +108,21 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { telemetryLogging := s.SQLServer().(*Server).TelemetryLoggingMetrics setupConn := s.SQLConn(t) + _, err := setupConn.Exec("CREATE USER testuser") + require.NoError(t, err) + + spiedConnRootUser := s.SQLConn(t) + spiedConnTestUser := s.SQLConn(t, serverutils.User("testuser")) + spiedConn := spiedConnRootUser - spiedConn := s.SQLConn(t) - _, err := spiedConn.Exec("SET application_name = $1", appName) + // Set spied connections to the app name observed by the log spy. + _, err = spiedConn.Exec("SET application_name = $1", appName) + require.NoError(t, err) + _, err = spiedConnTestUser.Exec("SET application_name = $1", appName) require.NoError(t, err) datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + spiedConn = spiedConnRootUser switch d.Cmd { case "exec-sql": sts.SetTracingStatus(false) @@ -112,13 +132,22 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { } return "" case "spy-sql": - logCount := stmtSpy.Count() + stmtLogCount := stmtSpy.Count() + txnLogCount := txnsSpy.Count() var stubTimeUnixSecs float64 - var tracing bool + var tracing, useRealTracing bool d.MaybeScanArgs(t, "tracing", &tracing) sts.SetTracingStatus(tracing) + d.MaybeScanArgs(t, "useRealTracing", &useRealTracing) + if useRealTracing { + telemetryKnobs.getTracingStatus = nil + defer func() { + telemetryKnobs.getTracingStatus = sts.TracingStatus + }() + } + // Set stubbed stubbed time if this txn is restarted. scanned := d.MaybeScanArgs(t, "restartUnixSecs", &stubTimeOnRestart) if !scanned { @@ -136,18 +165,38 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { } st.SetTime(timeutil.FromUnixMicros(stubTimeMicros)) + // Setup the sql user. + user := "root" + d.MaybeScanArgs(t, "user", &user) + switch user { + case "root": + case "testuser": + spiedConn = spiedConnTestUser + } + // Execute query input. _, err := spiedConn.Exec(d.Input) + var sb strings.Builder + if err != nil { - return err.Error() + sb.WriteString(err.Error()) + sb.WriteString("\n") + } + + newStmtLogCount := stmtSpy.Count() + sb.WriteString(stmtSpy.GetLastNLogs(newStmtLogCount - stmtLogCount)) + if newStmtLogCount > stmtLogCount { + sb.WriteString("\n") } - // Display any new statement logs have been generated since executing the query. - newLogCount := stmtSpy.Count() - return stmtSpy.GetLastNLogs(newLogCount - logCount) + newTxnLogCount := txnsSpy.Count() + sb.WriteString(txnsSpy.GetLastNLogs(newTxnLogCount - txnLogCount)) + return sb.String() case "reset-last-sampled": telemetryLogging.resetLastSampledTime() return "" + case "show-skipped-transactions": + return strconv.FormatUint(telemetryLogging.getSkippedTransactionCount(), 10) default: t.Fatal("unknown command") return "" @@ -185,8 +234,9 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { * shouldEmitTransactionLog: calls shouldEmitTransactionLog with the provided arguments * Args: * - unixSecs: stubbed sampling time in unix seconds - * - force: value for the 'force' param + * - isTracing: value for the 'isTracing' param * - isInternal: value for the 'isInternal' param + * - appName: value for the 'appName' param */ func TestTelemetryLoggingDecision(t *testing.T) { defer leaktest.AfterTest(t)() @@ -201,11 +251,12 @@ func TestTelemetryLoggingDecision(t *testing.T) { datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { t.Cleanup(func() { telemetryLogging.resetLastSampledTime() + telemetryLogging.resetCounters() }) switch d.Cmd { case "set-cluster-settings": - var telemLoggingEnabled, internalStmtsEnabled bool + var telemLoggingEnabled, internalStmtsEnabled, consoleQueriesEnabled bool var samplingMode string var stmtSampleFreq, txnSampleFreq int stmtsPerTxnMax := 10 @@ -215,6 +266,7 @@ func TestTelemetryLoggingDecision(t *testing.T) { d.MaybeScanArgs(t, "txnSampleFreq", &txnSampleFreq) d.MaybeScanArgs(t, "stmtsPerTxnMax", &stmtsPerTxnMax) d.MaybeScanArgs(t, "internalStmtsOn", &internalStmtsEnabled) + d.MaybeScanArgs(t, "telemetryInternalConsoleQueriesEnabled", &consoleQueriesEnabled) mode := telemetryModeStatement if samplingMode == "transaction" { @@ -226,6 +278,7 @@ func TestTelemetryLoggingDecision(t *testing.T) { telemetryTransactionSamplingFrequency.Override(ctx, &cs.SV, int64(txnSampleFreq)) telemetryLoggingEnabled.Override(ctx, &cs.SV, telemLoggingEnabled) telemetryInternalQueriesEnabled.Override(ctx, &cs.SV, internalStmtsEnabled) + telemetryInternalConsoleQueriesEnabled.Override(ctx, &cs.SV, consoleQueriesEnabled) return "" case "shouldEmitStatementLog": @@ -238,18 +291,20 @@ func TestTelemetryLoggingDecision(t *testing.T) { d.ScanArgs(t, "force", &force) st.SetTime(timeutil.FromUnixMicros(int64(unixSecs * 1e6))) - shouldEmit, _ := telemetryLogging.shouldEmitStatementLog(isTrackedTxn, stmtNum, force) - return strconv.FormatBool(shouldEmit) + shouldEmit, skipped := telemetryLogging.shouldEmitStatementLog(isTrackedTxn, stmtNum, force) + return fmt.Sprintf(`emit: %t, skippedQueries: %d`, shouldEmit, skipped) case "shouldEmitTransactionLog": var unixSecs float64 - var force, isInternal bool + var isTracing, isInternal bool + var appName string d.ScanArgs(t, "unixSecs", &unixSecs) - d.ScanArgs(t, "force", &force) + d.ScanArgs(t, "isTracing", &isTracing) d.MaybeScanArgs(t, "isInternal", &isInternal) st.SetTime(timeutil.FromUnixMicros(int64(unixSecs * 1e6))) + d.ScanArgs(t, "appName", &appName) - shouldEmit := telemetryLogging.shouldEmitTransactionLog(force, isInternal) - return strconv.FormatBool(shouldEmit) + shouldEmit, skipped := telemetryLogging.shouldEmitTransactionLog(isTracing, isInternal, appName) + return fmt.Sprintf(`emit: %t, skippedTxns: %d`, shouldEmit, skipped) default: t.Fatal("unknown command") return "" diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index 4ce7f932c0dc..6554d940f0ef 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -11,12 +11,15 @@ package sql import ( + "strings" + "sync" "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -24,7 +27,10 @@ import ( // Default value used to designate the maximum frequency at which events // are logged to the telemetry channel. -const defaultMaxEventFrequency = 8 +const ( + internalConsoleAppName = "$ internal-console" + defaultMaxEventFrequency = 8 +) var TelemetryMaxStatementEventFrequency = settings.RegisterIntSetting( settings.ApplicationLevel, @@ -96,6 +102,40 @@ var telemetrySamplingMode = settings.RegisterEnumSetting( settings.WithPublic, ) +// SampledQuery objects are short-lived but can be +// allocated frequently if logging frequency is high. +var sampledQueryPool = sync.Pool{ + New: func() interface{} { + return new(eventpb.SampledQuery) + }, +} + +func getSampledQuery() *eventpb.SampledQuery { + return sampledQueryPool.Get().(*eventpb.SampledQuery) +} + +func releaseSampledQuery(sq *eventpb.SampledQuery) { + *sq = eventpb.SampledQuery{} + sampledQueryPool.Put(sq) +} + +// SampledTransaction objects are short-lived but can be +// allocated frequently if logging frequency is high. +var sampledTransactionPool = sync.Pool{ + New: func() interface{} { + return new(eventpb.SampledTransaction) + }, +} + +func getSampledTransaction() *eventpb.SampledTransaction { + return sampledTransactionPool.Get().(*eventpb.SampledTransaction) +} + +func releaseSampledTransaction(st *eventpb.SampledTransaction) { + *st = eventpb.SampledTransaction{} + sampledTransactionPool.Put(st) +} + // TelemetryLoggingMetrics keeps track of the last time at which an event // was sampled to the telemetry channel, and the number of skipped events // since the last sampled event. @@ -125,6 +165,9 @@ type telemetryLoggingMetrics struct { // skippedQueryCount is used to produce the count of non-sampled queries. skippedQueryCount atomic.Uint64 + + // skippedTransactionCount is used to produce the count of non-sampled transactions. + skippedTransactionCount atomic.Uint64 } func newTelemetryLoggingMetrics( @@ -163,32 +206,39 @@ func NewTelemetryLoggingTestingKnobs( // and at least one of the following conditions is true: // - the transaction is not internal OR internal queries are enabled // - the required amount of time has elapsed since the last transaction began sampling -// - the force parameter is true and the transaction is not internal or sampling for internal statements -// is enabled +// - the transaction is from the console and telemetryInternalConsoleQueriesEnabled is true +// - the transaction has tracing enabled // // If the conditions are met, the last sampled time is updated. -func (t *telemetryLoggingMetrics) shouldEmitTransactionLog(forceSampling, isInternal bool) bool { +func (t *telemetryLoggingMetrics) shouldEmitTransactionLog( + isTracing, isInternal bool, applicationName string, +) (emit bool, skippedTxns uint64) { + // We should not increase the skipped transaction count if telemetry logging is disabled. if !telemetryLoggingEnabled.Get(&t.st.SV) { - return false + return false, t.skippedTransactionCount.Load() } if telemetrySamplingMode.Get(&t.st.SV) != telemetryModeTransaction { - return false + return false, t.skippedTransactionCount.Load() } - if isInternal && !telemetryInternalQueriesEnabled.Get(&t.st.SV) { - return false + logConsoleQuery := telemetryInternalConsoleQueriesEnabled.Get(&t.st.SV) && + strings.HasPrefix(applicationName, internalConsoleAppName) + if !logConsoleQuery && isInternal && !telemetryInternalQueriesEnabled.Get(&t.st.SV) { + return false, t.skippedTransactionCount.Load() } maxEventFrequency := telemetryTransactionSamplingFrequency.Get(&t.st.SV) if maxEventFrequency == 0 { - return false + return false, t.skippedTransactionCount.Load() } txnSampleTime := t.timeNow() + tracingEnabled := t.isTracing(nil, isTracing) - if forceSampling { + if logConsoleQuery || tracingEnabled { + // Force log. t.mu.Lock() defer t.mu.Unlock() t.mu.lastSampledTime = txnSampleTime - return true + return true, t.skippedTransactionCount.Swap(0) } requiredTimeElapsed := time.Second / time.Duration(maxEventFrequency) @@ -201,19 +251,19 @@ func (t *telemetryLoggingMetrics) shouldEmitTransactionLog(forceSampling, isInte }() if !enoughTimeElapsed { - return false + return false, t.skippedTransactionCount.Add(1) } t.mu.Lock() defer t.mu.Unlock() if txnSampleTime.Sub(t.mu.lastSampledTime) < requiredTimeElapsed { - return false + return false, t.skippedTransactionCount.Add(1) } t.mu.lastSampledTime = txnSampleTime - return true + return true, t.skippedTransactionCount.Swap(0) } // ModuleTestingKnobs implements base.ModuleTestingKnobs interface. @@ -333,3 +383,13 @@ func (t *telemetryLoggingMetrics) resetLastSampledTime() { defer t.mu.Unlock() t.mu.lastSampledTime = time.Time{} } + +// resetCounters resets the skipped query and transaction counters +func (t *telemetryLoggingMetrics) resetCounters() { + t.skippedQueryCount.Swap(0) + t.skippedTransactionCount.Swap(0) +} + +func (t *telemetryLoggingMetrics) getSkippedTransactionCount() uint64 { + return t.skippedTransactionCount.Load() +} diff --git a/pkg/sql/testdata/telemetry_logging/logging/force_logging b/pkg/sql/testdata/telemetry_logging/logging/force_logging new file mode 100644 index 000000000000..1e46c966c952 --- /dev/null +++ b/pkg/sql/testdata/telemetry_logging/logging/force_logging @@ -0,0 +1,254 @@ +subtest txn_mode_tracing_on +# When tracing is on we should log all execution events. + +exec-sql +SET CLUSTER SETTING sql.telemetry.transaction_sampling.max_event_frequency = 10; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.transaction_sampling.statement_events_per_transaction.max = 100; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.mode = "transaction"; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true; +---- + +exec-sql +CREATE TABLE t() +---- + +spy-sql unixSecs=0.1 +SET TRACING = ON; +---- + +spy-sql unixSecs=0.1 useRealTracing=true +SELECT * FROM t LIMIT 1; +---- +{ + "ApplicationName": "telemetry-logging-datadriven", + "Database": "defaultdb", + "Distribution": "full", + "EventType": "sampled_query", + "PlanGist": "AgHQAQIAAAAAAg==", + "ScanCount": 1, + "Statement": "SELECT * FROM \"\".\"\".t LIMIT ‹1›", + "StatementFingerprintID": 17888549871399373000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 17888549871399373000 + ], + "TransactionFingerprintID": 6278959165882708000, + "User": "root" +} + +spy-sql unixSecs=0.1 useRealTracing=true +SELECT 1, 2, 3; +---- +{ + "ApplicationName": "telemetry-logging-datadriven", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "NumRows": 1, + "OutputRowsEstimate": 1, + "PlanGist": "AgICBgYG", + "Statement": "SELECT ‹1›, ‹2›, ‹3›", + "StatementFingerprintID": 16698501115058330000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16698501115058330000 + ], + "TransactionFingerprintID": 5250989384299556000, + "User": "root" +} + +spy-sql unixSecs=0.1 useRealTracing=true +SELECT 'hello'; +---- +{ + "ApplicationName": "telemetry-logging-datadriven", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "NumRows": 1, + "OutputRowsEstimate": 1, + "PlanGist": "AgICAgYC", + "Statement": "SELECT ‹'hello'›", + "StatementFingerprintID": 15578946620736494000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 15578946620736494000 + ], + "TransactionFingerprintID": 8597429024882746000, + "User": "root" +} + +spy-sql unixSecs=0.1 +SET TRACING = off; +---- + +subtest end + +subtest txn_mode_console_query +# When tracing is on we should log all execution events. + +exec-sql +SET CLUSTER SETTING sql.telemetry.transaction_sampling.max_event_frequency = 10; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.transaction_sampling.statement_events_per_transaction.max = 100; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.mode = "transaction"; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true; +---- + +spy-sql unixSecs=0.1 +SET application_name = '$ internal-console-app'; +---- +{ + "ApplicationName": "$ internal-console-app", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "PlanGist": "Ais=", + "Statement": "SET application_name = ‹'$ internal-console-app'›", + "StatementFingerprintID": 13549091137440920000, + "StmtPosInTxn": 1, + "Tag": "SET", + "User": "root" +} + +spy-sql unixSecs=0.1 +SELECT * FROM t LIMIT 1; +---- +{ + "ApplicationName": "$ internal-console-app", + "Database": "defaultdb", + "Distribution": "full", + "EventType": "sampled_query", + "PlanGist": "AgHQAQIAAAAAAg==", + "ScanCount": 1, + "Statement": "SELECT * FROM \"\".\"\".t LIMIT ‹1›", + "StatementFingerprintID": 17888549871399373000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "$ internal-console-app", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 5, + "StatementFingerprintIDs": [ + 17888549871399373000 + ], + "TransactionFingerprintID": 6278959165882708000, + "User": "root" +} + +spy-sql unixSecs=0.1 +SELECT 1, 2, 3; +---- +{ + "ApplicationName": "$ internal-console-app", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "NumRows": 1, + "OutputRowsEstimate": 1, + "PlanGist": "AgICBgYG", + "Statement": "SELECT ‹1›, ‹2›, ‹3›", + "StatementFingerprintID": 16698501115058330000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "$ internal-console-app", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16698501115058330000 + ], + "TransactionFingerprintID": 5250989384299556000, + "User": "root" +} + +spy-sql unixSecs=0.1 +SELECT 'hello'; +---- +{ + "ApplicationName": "$ internal-console-app", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "NumRows": 1, + "OutputRowsEstimate": 1, + "PlanGist": "AgICAgYC", + "Statement": "SELECT ‹'hello'›", + "StatementFingerprintID": 15578946620736494000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "$ internal-console-app", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 15578946620736494000 + ], + "TransactionFingerprintID": 8597429024882746000, + "User": "root" +} + +subtest end diff --git a/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit b/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit index b659c487db0a..5cf2d9392b84 100644 --- a/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit +++ b/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit @@ -32,6 +32,20 @@ BEGIN; SELECT 1; SELECT 2; COMMIT; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 2, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 14263644654231036000, + "User": "root" +} spy-sql SELECT 5; @@ -51,6 +65,19 @@ SELECT 5; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16085855936700856000 + ], + "TransactionFingerprintID": 8097417102642120000, + "User": "root" +} exec-sql SET CLUSTER SETTING sql.telemetry.transaction_sampling.statement_events_per_transaction.max = 3; @@ -102,6 +129,23 @@ BEGIN; SELECT 1; SELECT 2; SELECT 3; SELECT 4; COMMIT; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 4, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000, + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 63991751604173224, + "User": "root" +} exec-sql SET CLUSTER SETTING sql.telemetry.transaction_sampling.statement_events_per_transaction.max = 4; @@ -167,3 +211,20 @@ BEGIN; SELECT 1; SELECT 2; SELECT 3; SELECT 4; COMMIT; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 4, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000, + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 63991751604173224, + "User": "root" +} diff --git a/pkg/sql/testdata/telemetry_logging/logging/transaction_mode b/pkg/sql/testdata/telemetry_logging/logging/transaction_mode index cf6b9783c397..48f49ebe8c18 100644 --- a/pkg/sql/testdata/telemetry_logging/logging/transaction_mode +++ b/pkg/sql/testdata/telemetry_logging/logging/transaction_mode @@ -1,5 +1,7 @@ # Test that transaction mode for telemetry logging logs all stmts for a tracked txn. +subtest transaction_mode_events + exec-sql SET CLUSTER SETTING sql.telemetry.transaction_sampling.max_event_frequency = 10; ---- @@ -47,6 +49,19 @@ BEGIN; TRUNCATE t; COMMIT "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 802230861479752300 + ], + "TransactionFingerprintID": 11835923794509859000, + "User": "root" +} # Note that because we skip BEGIN statements in transaction mode, the first # statement logged from this transaction has SkippedQueries=1 even though @@ -108,6 +123,21 @@ BEGIN; SELECT 1; SELECT 2; SELECT 3; COMMIT "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 3, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 17099066530943440000, + "User": "root" +} # Skipped due to not enough time elapsed. spy-sql unixSecs=1 @@ -133,6 +163,20 @@ SELECT 1, 2 "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 1569305422589581000 + ], + "TransactionFingerprintID": 13449146770429469000, + "User": "root" +} # Skipped due to not enough time elapsed. spy-sql unixSecs=1.05 @@ -162,6 +206,20 @@ SELECT * FROM t LIMIT 2 "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 2, + "StatementFingerprintIDs": [ + 17888549871399373000 + ], + "TransactionFingerprintID": 6278959165882708000, + "User": "root" +} # Skipped, not enough time elapsed. spy-sql unixSecs=1.15 @@ -210,6 +268,23 @@ BEGIN; SELECT * FROM t LIMIT 4; SELECT * FROM t LIMIT 5; COMMIT "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 17888549871399373000, + 17888549871399373000 + ], + "TransactionFingerprintID": 5223910247858315000, + "User": "root" +} + +subtest end subtest txn_retry_is_sampled @@ -241,6 +316,20 @@ SELECT 1, 2, 3 "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 16698501115058330000 + ], + "TransactionFingerprintID": 5250989384299556000, + "User": "root" +} # The stub time will only be advanced in the retry. # Note that the crdb_internal.force_retry will always appear in the logs due to @@ -294,5 +383,66 @@ COMMIT; "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "LastAutoRetryReason": "crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry()", + "NumRetries": 1, + "NumRows": 2, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 15578946620736494000, + 9891387630896048000 + ], + "TransactionFingerprintID": 823601646009140400, + "User": "root" +} + +subtest end + +subtest redacts_sensitive_info + +reset-last-sampled +---- + + +# testuser name should be redacted +spy-sql user=testuser unixSecs=1 +SELECT 1/0 +---- +pq: division by zero +{ + "ApplicationName": "telemetry-logging-datadriven", + "Database": "defaultdb", + "Distribution": "local", + "ErrorText": "division by zero", + "EventType": "sampled_query", + "OutputRowsEstimate": 1, + "PlanGist": "AgICAgYC", + "SQLSTATE": "22012", + "Statement": "SELECT ‹1› / ‹0›", + "StatementFingerprintID": 16154612194363576000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "‹testuser›" +} +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": false, + "ErrorText": "division by zero", + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SQLSTATE": "22012", + "StatementFingerprintIDs": [ + 16154612194363576000 + ], + "TransactionFingerprintID": 5715924995226314000, + "User": "‹testuser›" +} subtest end diff --git a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog index a85c06980f2b..aceadf52d0b6 100644 --- a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog +++ b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog @@ -6,19 +6,19 @@ set-cluster-settings telemetryLoggingEnabled=false samplingMode=statement stmtSa shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=true ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=2 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 subtest end @@ -30,19 +30,19 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=statement stmtSam shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=true ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=2 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 subtest end @@ -54,31 +54,31 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=statement stmtSam shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=false ---- -true +emit: true, skippedQueries: 0 shouldEmitStatementLog unixSecs=1.05 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 1 # Force should override the sampling frequency. shouldEmitStatementLog unixSecs=1.05 isTrackedTxn=false stmtNum=1 force=true ---- -true +emit: true, skippedQueries: 1 # Tracked txn is true but we are not in txn mode. shouldEmitStatementLog unixSecs=1.1 isTrackedTxn=true stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 1 # Enough time has elapsed. stmtNum is greater than stmtsPerTxnMax but we are not in txn mode. shouldEmitStatementLog unixSecs=1.15 isTrackedTxn=false stmtNum=1000 force=false ---- -true +emit: true, skippedQueries: 1 # Enough time has elapsed. shouldEmitStatementLog unixSecs=2 isTrackedTxn=false stmtNum=1 force=false ---- -true +emit: true, skippedQueries: 0 subtest end @@ -91,35 +91,35 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction stmtS shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false ---- -true +emit: true, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=2 force=false ---- -true +emit: true, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=3 force=false ---- -true +emit: true, skippedQueries: 0 # Enough time has elapsed but stmtNum is greater than stmtsPerTxnMax. shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=4 force=false ---- -false +emit: false, skippedQueries: 1 # stmtNum is greater than stmtsPerTxnMax but force is true. shouldEmitStatementLog unixSecs=4 isTrackedTxn=true stmtNum=4 force=true ---- -true +emit: true, skippedQueries: 1 # Enough time has elapsed but txn is not tracked. shouldEmitStatementLog unixSecs=5 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 1 # Txn is not tracked but force is true. shouldEmitStatementLog unixSecs=6 isTrackedTxn=false stmtNum=1 force=true ---- -true +emit: true, skippedQueries: 1 subtest end @@ -131,27 +131,26 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction stmtS shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=2 isTrackedTxn=true stmtNum=2 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=3 isTrackedTxn=true stmtNum=3 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=4 isTrackedTxn=true stmtNum=4 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=5 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=6 isTrackedTxn=false stmtNum=1 force=true ---- -false +emit: false, skippedQueries: 0 subtest end - diff --git a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog index 9f4ee54cb81e..859e52f97b4f 100644 --- a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog +++ b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog @@ -5,21 +5,21 @@ subtest telemetry_disabled set-cluster-settings telemetryLoggingEnabled=false samplingMode=transaction stmtSampleFreq=10 txnSampleFreq=10 ---- -shouldEmitTransactionLog unixSecs=1 force=false +shouldEmitTransactionLog unixSecs=1 isTracing=false appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=2 force=true +shouldEmitTransactionLog unixSecs=2 isTracing=true appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=3 force=false +shouldEmitTransactionLog unixSecs=3 isTracing=false appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=4 force=true +shouldEmitTransactionLog unixSecs=4 isTracing=true appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 subtest end @@ -28,21 +28,21 @@ subtest telemetry_enabled_txn_freq_0 set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction txnSampleFreq=0 ---- -shouldEmitTransactionLog unixSecs=1 force=false +shouldEmitTransactionLog unixSecs=1 isTracing=false appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=2 force=true +shouldEmitTransactionLog unixSecs=2 isTracing=true appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=3 force=false +shouldEmitTransactionLog unixSecs=3 isTracing=false appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=4 force=true +shouldEmitTransactionLog unixSecs=4 isTracing=true appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 subtest end @@ -51,21 +51,21 @@ subtest telemetry_enabled_txn_mode_off set-cluster-settings telemetryLoggingEnabled=true samplingMode=statement txnSampleFreq=10 ---- -shouldEmitTransactionLog unixSecs=1 force=false +shouldEmitTransactionLog unixSecs=1 isTracing=false appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=2 force=true +shouldEmitTransactionLog unixSecs=2 isTracing=true appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=3 force=false +shouldEmitTransactionLog unixSecs=3 isTracing=false appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=4 force=true +shouldEmitTransactionLog unixSecs=4 isTracing=true appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 subtest end @@ -74,34 +74,59 @@ subtest telemetry_enabled_txn_mode_on set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction txnSampleFreq=1 ---- -shouldEmitTransactionLog unixSecs=1 force=false +shouldEmitTransactionLog unixSecs=1 isTracing=false appName="telemetry-logging" ---- -true +emit: true, skippedTxns: 0 # Not enough time elapsed. -shouldEmitTransactionLog unixSecs=1.5 force=false +shouldEmitTransactionLog unixSecs=1.5 isTracing=false appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 1 # Enough time elapsed. -shouldEmitTransactionLog unixSecs=2 force=false +shouldEmitTransactionLog unixSecs=2 isTracing=false appName="telemetry-logging" ---- -true +emit: true, skippedTxns: 1 -# Force should override the sampling frequency. -shouldEmitTransactionLog unixSecs=2.5 force=true +# isTracing should override the sampling frequency. +shouldEmitTransactionLog unixSecs=2.5 isTracing=true appName="telemetry-logging" ---- -true +emit: true, skippedTxns: 0 # Sampling internal queries is off. -shouldEmitTransactionLog unixSecs=4 force=false isInternal=true +shouldEmitTransactionLog unixSecs=4 isTracing=false isInternal=true appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=4 force=true isInternal=true +shouldEmitTransactionLog unixSecs=4 isTracing=true isInternal=true appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 0 + +subtest end + +subtest log_internal_console_queries +# Transactions with app name prefix '$ internal-console' should be force logged when +# sql.telemetry.query_sampling.internal_console.enabled is on. + +set-cluster-settings telemetryLoggingEnabled=true telemetryInternalConsoleQueriesEnabled=false samplingMode=transaction txnSampleFreq=1 +---- + +# Should return false because telemetryInternalConsoleQueriesEnabled is false and internal queries logging is off. +shouldEmitTransactionLog unixSecs=1 isTracing=false isInternal=true appName="$ internal-console" +---- +emit: false, skippedTxns: 0 + +set-cluster-settings telemetryLoggingEnabled=true telemetryInternalConsoleQueriesEnabled=true samplingMode=transaction txnSampleFreq=1 +---- + +shouldEmitTransactionLog unixSecs=1 isTracing=false isInternal=true appName=($ internal-console) +---- +emit: true, skippedTxns: 0 + +shouldEmitTransactionLog unixSecs=1 isTracing=false isInternal=true appName=($ internal-console) +---- +emit: true, skippedTxns: 0 subtest end @@ -111,32 +136,32 @@ subtest telemetry_enabled_txn_mode_on_internal_queries_on set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction txnSampleFreq=1 internalStmtsOn=true ---- -shouldEmitTransactionLog unixSecs=1 force=false isInternal=true +shouldEmitTransactionLog unixSecs=1 isTracing=false isInternal=true appName="telemetry-logging" ---- -true +emit: true, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=1.5 force=false isInternal=true +shouldEmitTransactionLog unixSecs=1.5 isTracing=false isInternal=true appName="telemetry-logging" ---- -false +emit: false, skippedTxns: 1 -shouldEmitTransactionLog unixSecs=2 force=false isInternal=true +shouldEmitTransactionLog unixSecs=2 isTracing=false isInternal=true appName="telemetry-logging" ---- -true +emit: true, skippedTxns: 1 -shouldEmitTransactionLog unixSecs=2.5 force=true isInternal=true +shouldEmitTransactionLog unixSecs=2.5 isTracing=true isInternal=true appName="telemetry-logging" ---- -true +emit: true, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=2.5 force=true isInternal=true +shouldEmitTransactionLog unixSecs=2.5 isTracing=true isInternal=true appName="telemetry-logging" ---- -true +emit: true, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=4 force=false isInternal=false +shouldEmitTransactionLog unixSecs=4 isTracing=false isInternal=false appName="telemetry-logging" ---- -true +emit: true, skippedTxns: 0 -shouldEmitTransactionLog unixSecs=5 force=true isInternal=false +shouldEmitTransactionLog unixSecs=5 isTracing=true isInternal=false appName="telemetry-logging" ---- -true +emit: true, skippedTxns: 0 subtest end diff --git a/pkg/util/log/eventpb/eventpbgen/gen.go b/pkg/util/log/eventpb/eventpbgen/gen.go index 912ec436535e..c1b70874165b 100644 --- a/pkg/util/log/eventpb/eventpbgen/gen.go +++ b/pkg/util/log/eventpb/eventpbgen/gen.go @@ -84,6 +84,7 @@ type fieldInfo struct { Inherited bool IsEnum bool AllowZeroValue bool + Nullable bool } var ( @@ -380,6 +381,8 @@ func readInput( return errors.Newf("unknown field definition syntax: %q", line) } + notNullable := notNullableRe.MatchString(line) + // Allow zero values if the field is annotated with 'includeempty'. allowZeroValue := strings.Contains(line, "includeempty") @@ -461,6 +464,7 @@ func readInput( MixedRedactable: mixed, IsEnum: isEnum, AllowZeroValue: allowZeroValue, + Nullable: !notNullable, } curMsg.Fields = append(curMsg.Fields, fi) curMsg.AllFields = append(curMsg.AllFields, fi) @@ -492,6 +496,8 @@ var fieldDefRe = regexp.MustCompile(`\s*(?P[a-z._A-Z0-9]+)` + var reservedDefRe = regexp.MustCompile(`\s*(reserved ([1-9][0-9]*);)`) +var notNullableRe = regexp.MustCompile(`\s*\(\s*gogoproto\.nullable\s*\)\s*=\s*false`) + func camelToSnake(typeName string) string { var res strings.Builder res.WriteByte(typeName[0] + 'a' - 'A') @@ -712,13 +718,17 @@ func (m *{{.GoType}}) AppendJSONFields(printComma bool, b redact.RedactableBytes } } {{- else if eq .FieldType "nestedMessage"}} -if m.{{.FieldName}} != nil { + {{ if .Nullable -}} + if m.{{.FieldName}} != nil { + {{- end }} if printComma { b = append(b, ',')}; printComma = true b = append(b, "\"{{.FieldName}}\":"...) b = append(b, '{') printComma, b = m.{{.FieldName}}.AppendJSONFields(false, b) b = append(b, '}') + {{ if .Nullable -}} } + {{- end }} {{- else}} {{ error .FieldType }} {{- end}} diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index de5291200178..ea3e473b0b47 100644 --- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -4301,16 +4301,14 @@ func (m *SampledExecStats) AppendJSONFields(printComma bool, b redact.Redactable b = append(b, "\"CPUSQLNanos\":"...) b = strconv.AppendInt(b, int64(m.CPUSQLNanos), 10) - if m.MVCCIteratorStats != nil { - if printComma { - b = append(b, ',') - } - printComma = true - b = append(b, "\"MVCCIteratorStats\":"...) - b = append(b, '{') - printComma, b = m.MVCCIteratorStats.AppendJSONFields(false, b) - b = append(b, '}') + if printComma { + b = append(b, ',') } + printComma = true + b = append(b, "\"MVCCIteratorStats\":"...) + b = append(b, '{') + printComma, b = m.MVCCIteratorStats.AppendJSONFields(false, b) + b = append(b, '}') return printComma, b } @@ -5227,6 +5225,15 @@ func (m *SampledTransaction) AppendJSONFields(printComma bool, b redact.Redactab b = append(b, '}') } + if m.SkippedTransactions != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"SkippedTransactions\":"...) + b = strconv.AppendInt(b, int64(m.SkippedTransactions), 10) + } + return printComma, b } diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index 4dcb31224b9e..8850b2667597 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -327,7 +327,7 @@ message SampledExecStats { int64 cpu_sql_nanos = 6 [(gogoproto.customname) = "CPUSQLNanos", (gogoproto.jsontag) = "CPUSQLNanos,includeempty"]; // Internal storage iteration statistics. - MVCCIteratorStats mvcc_iterator_stats = 7 [(gogoproto.customname) = "MVCCIteratorStats", (gogoproto.jsontag) = ",omitempty"]; + MVCCIteratorStats mvcc_iterator_stats = 7 [(gogoproto.nullable) = false, (gogoproto.customname) = "MVCCIteratorStats"]; } // Internal storage iteration statistics for a single execution. @@ -457,27 +457,32 @@ message SampledTransaction { int64 num_rows = 18 [(gogoproto.jsontag) = ",includeempty"]; // RetryLatNanos is the amount of time spent retrying the transaction. - int64 retry_lat_nanos = 19 [(gogoproto.jsontag) = ",omitempty"]; + int64 retry_lat_nanos = 19 [(gogoproto.jsontag) = ",omitempty"]; - // CommitLatNanos is the amount of time spent committing the transaction after all statement operations. - int64 commit_lat_nanos = 20 [(gogoproto.jsontag) = ",includeempty"]; + // CommitLatNanos is the amount of time spent committing the transaction after all statement operations. + int64 commit_lat_nanos = 20 [(gogoproto.jsontag) = ",includeempty"]; - // IdleLatNanos is the amount of time spent waiting for the client to send statements + // IdleLatNanos is the amount of time spent waiting for the client to send statements // while the transaction is open. - int64 idle_lat_nanos = 21 [(gogoproto.jsontag) = ",includeempty"]; + int64 idle_lat_nanos = 21 [(gogoproto.jsontag) = ",includeempty"]; - // BytesRead is the number of bytes read from disk. - int64 bytes_read = 22 [(gogoproto.jsontag) = ",includeempty"]; + // BytesRead is the number of bytes read from disk. + int64 bytes_read = 22 [(gogoproto.jsontag) = ",includeempty"]; - // RowsRead is the number of rows read from disk. - int64 rows_read = 23 [(gogoproto.jsontag) = ",includeempty"]; + // RowsRead is the number of rows read from disk. + int64 rows_read = 23 [(gogoproto.jsontag) = ",includeempty"]; - // RowsWritten is the number of rows written to disk. - int64 rows_written = 24 [(gogoproto.jsontag) = ",includeempty"]; + // RowsWritten is the number of rows written to disk. + int64 rows_written = 24 [(gogoproto.jsontag) = ",includeempty"]; - // SampledExecStats is a nested field containing execution statistics. - // This field will be omitted if the stats were not sampled. - SampledExecStats sampled_exec_stats = 25 [(gogoproto.jsontag) = ",omitempty"]; + // SampledExecStats is a nested field containing execution statistics. + // This field will be omitted if the stats were not sampled. + SampledExecStats sampled_exec_stats = 25 [(gogoproto.jsontag) = ",omitempty"]; + + // SkippedTransactions is the number of transactions that were skipped as part of sampling prior to + // this one. We only count skipped transactions when telemetry logging is enabled and the sampling + // mode is set to "transaction". + int64 skipped_transactions = 26 [(gogoproto.jsontag) = ",omitempty"]; } // CapturedIndexUsageStats diff --git a/pkg/util/log/logtestutils/telemetry_logging_test_utils.go b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go index 7a92e4ea10c9..14b2b8696ad8 100644 --- a/pkg/util/log/logtestutils/telemetry_logging_test_utils.go +++ b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go @@ -126,6 +126,14 @@ var includeByDefault = map[string]struct{}{ "KVRowsRead": {}, "IndexRecommendations": {}, "ScanCount": {}, + + // Transaction fields. + "Committed": {}, + "Implicit": {}, + "LastAutoRetryReason": {}, + "SkippedTransactions": {}, + "TransactionFingerprintID": {}, + "StatementFingerprintIDs": {}, } // printJSONMap prints a map as a JSON string. In the future we can @@ -150,10 +158,19 @@ type TelemetryLogSpy struct { syncutil.RWMutex logs []string filters []func(entry logpb.Entry) bool - format func(entry logpb.Entry) string + format func(entry logpb.Entry) (string, error) } } +func defaultFormatEntry(entry logpb.Entry) (string, error) { + var jsonMap map[string]interface{} + if err := json.Unmarshal([]byte(entry.Message[entry.StructuredStart:entry.StructuredEnd]), &jsonMap); err != nil { + return "", err + } + + return printJSONMap(jsonMap) +} + func NewSampledQueryLogScrubVolatileFields(testState *testing.T) *TelemetryLogSpy { s := &TelemetryLogSpy{ testState: testState, @@ -161,17 +178,19 @@ func NewSampledQueryLogScrubVolatileFields(testState *testing.T) *TelemetryLogSp s.mu.filters = append(s.mu.filters, func(entry logpb.Entry) bool { return strings.Contains(entry.Message, "sampled_query") }) - s.mu.format = func(entry logpb.Entry) string { - var jsonMap map[string]interface{} - if err := json.Unmarshal([]byte(entry.Message[entry.StructuredStart:entry.StructuredEnd]), &jsonMap); err != nil { - s.testState.Fatal(err) - } - out, err := printJSONMap(jsonMap) - if err != nil { - s.testState.Error(err) - } - return out + s.mu.format = defaultFormatEntry + + return s +} + +func NewSampledTransactionLogScrubVolatileFields(testState *testing.T) *TelemetryLogSpy { + s := &TelemetryLogSpy{ + testState: testState, } + s.mu.filters = append(s.mu.filters, func(entry logpb.Entry) bool { + return strings.Contains(entry.Message, "sampled_transaction") + }) + s.mu.format = defaultFormatEntry return s } @@ -216,7 +235,10 @@ func (s *TelemetryLogSpy) Intercept(entry []byte) { } } - formattedLogStr := s.mu.format(logEntry) + formattedLogStr, err := s.mu.format(logEntry) + if err != nil { + s.testState.Fatal(err) + } s.mu.Lock() defer s.mu.Unlock()