diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index fa678a315d5e..92a6c5499e84 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1572,6 +1572,10 @@ type connExecutor struct { // Note that the number of statement events emitted per transaction is // capped by the cluster setting telemetryStatementsPerTransactionMax. shouldLogToTelemetry bool + + // telemetrySkippedTxns contains the number of transactions skipped by + // telemetry logging prior to this one. + telemetrySkippedTxns uint64 } // sessionDataStack contains the user-configurable connection variables. @@ -2020,9 +2024,11 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay ex.state.mu.Lock() defer ex.state.mu.Unlock() ex.state.mu.stmtCount = 0 - tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled() - ex.extraTxnState.shouldLogToTelemetry = - ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal) + forceSampling := ex.server.TelemetryLoggingMetrics.shouldForceTxnSampling( + ex.applicationName.Load().(string), + ex.planner.ExtendedEvalContext().Tracing.Enabled()) + ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns = + ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(forceSampling, ex.executorType == executorTypeInternal) } // NOTE: on txnRestart we don't need to muck with the savepoints stack. 
It's either a diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index c252e86868c4..6ce5bde08dbe 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -3175,9 +3175,11 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { TxnFingerprintID: appstatspb.InvalidTransactionFingerprintID, }) - tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled() - ex.extraTxnState.shouldLogToTelemetry = - ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal) + forceSampling := ex.server.TelemetryLoggingMetrics.shouldForceTxnSampling( + ex.applicationName.Load().(string), + ex.planner.ExtendedEvalContext().Tracing.Enabled()) + ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns = + ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(forceSampling, ex.executorType == executorTypeInternal) ex.state.mu.RLock() txnStart := ex.state.mu.txnStart @@ -3298,6 +3300,16 @@ func (ex *connExecutor) recordTransactionFinish( ex.maybeRecordRetrySerializableContention(ev.txnID, transactionFingerprintID, txnErr) + ex.planner.maybeLogTransaction(ctx, + ex.executorType, + int(ex.extraTxnState.txnCounter.Load()), + transactionFingerprintID, + &recordedTxnStats, + ex.server.TelemetryLoggingMetrics, + ex.extraTxnState.shouldLogToTelemetry, + ex.extraTxnState.telemetrySkippedTxns, + ) + return ex.statsCollector.RecordTransaction( ctx, transactionFingerprintID, diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 4ccf32dd38a5..953ad7d98022 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -438,6 +438,103 @@ func (p *planner) maybeLogStatementInternal( } } +// maybeLogTransaction conditionally records the current transaction +// to the TELEMETRY channel. 
+func (p *planner) maybeLogTransaction( + ctx context.Context, + execType executorType, + txnCounter int, + txnFingerprintID appstatspb.TransactionFingerprintID, + txnStats *sqlstats.RecordedTxnStats, + telemetryLoggingMetrics *telemetryLoggingMetrics, + trackedForTelemetry bool, + skippedTransactions uint64, +) { + isTxnTelemetryMode := telemetrySamplingMode.Get(&p.execCfg.Settings.SV) == telemetryModeTransaction + + // Exit if transaction telemetry logging is not enabled for this executor type. + // Note that the skipped transaction counter is not modified here; it is maintained + // by shouldEmitTransactionLog when the sampling decision is made. + if !telemetryLoggingEnabled.Get(&p.execCfg.Settings.SV) || !isTxnTelemetryMode || + (execType == executorTypeInternal && !telemetryInternalQueriesEnabled.Get(&p.execCfg.Settings.SV)) { + return + } + + if !trackedForTelemetry { + return + } + + // Redact error messages. + var execErrStr, retryErr redact.RedactableString + sqlErrState := "" + if txnStats.TxnErr != nil { + execErrStr = redact.Sprint(txnStats.TxnErr) + sqlErrState = pgerror.GetPGCode(txnStats.TxnErr).String() + } + + if txnStats.AutoRetryReason != nil { + retryErr = redact.Sprint(txnStats.AutoRetryReason) + } + + sampledTxn := getSampledTransaction() + defer releaseSampledTransaction(sampledTxn) + + *sampledTxn = eventpb.SampledTransaction{ + SkippedTransactions: int64(skippedTransactions), + User: txnStats.SessionData.SessionUser().Normalized(), + ApplicationName: txnStats.SessionData.ApplicationName, + TxnCounter: uint32(txnCounter), + SessionID: txnStats.SessionID.String(), + TransactionID: txnStats.TransactionID.String(), + TransactionFingerprintID: txnFingerprintID, + Committed: txnStats.Committed, + ImplicitTxn: txnStats.ImplicitTxn, + StartTimeUnixNanos: txnStats.StartTime.UnixNano(), + EndTimeUnixNanos: txnStats.EndTime.UnixNano(), + ServiceLatNanos: txnStats.ServiceLatency.Nanoseconds(), + SQLSTATE: sqlErrState, + ErrorText: execErrStr, + NumRetries: 
txnStats.RetryCount, + LastAutoRetryReason: retryErr, + StatementFingerprintIDs: txnStats.StatementFingerprintIDs, + NumRows: int64(txnStats.RowsAffected), + RetryLatNanos: txnStats.RetryLatency.Nanoseconds(), + CommitLatNanos: txnStats.CommitLatency.Nanoseconds(), + IdleLatNanos: txnStats.IdleLatency.Nanoseconds(), + BytesRead: txnStats.BytesRead, + RowsRead: txnStats.RowsRead, + RowsWritten: txnStats.RowsWritten, + } + + if txnStats.CollectedExecStats { + sampledTxn.SampledExecStats = &eventpb.SampledExecStats{ + NetworkBytes: txnStats.ExecStats.NetworkBytesSent, + MaxMemUsage: txnStats.ExecStats.MaxMemUsage, + ContentionTime: int64(txnStats.ExecStats.ContentionTime.Seconds()), + NetworkMessages: txnStats.ExecStats.NetworkMessages, + MaxDiskUsage: txnStats.ExecStats.MaxDiskUsage, + CPUSQLNanos: txnStats.ExecStats.CPUTime.Nanoseconds(), + MVCCIteratorStats: eventpb.MVCCIteratorStats{ + StepCount: txnStats.ExecStats.MvccSteps, + StepCountInternal: txnStats.ExecStats.MvccStepsInternal, + SeekCount: txnStats.ExecStats.MvccSeeks, + SeekCountInternal: txnStats.ExecStats.MvccSeeksInternal, + BlockBytes: txnStats.ExecStats.MvccBlockBytes, + BlockBytesInCache: txnStats.ExecStats.MvccBlockBytesInCache, + KeyBytes: txnStats.ExecStats.MvccKeyBytes, + ValueBytes: txnStats.ExecStats.MvccValueBytes, + PointCount: txnStats.ExecStats.MvccPointCount, + PointsCoveredByRangeTombstones: txnStats.ExecStats.MvccPointsCoveredByRangeTombstones, + RangeKeyCount: txnStats.ExecStats.MvccRangeKeyCount, + RangeKeyContainedPoints: txnStats.ExecStats.MvccRangeKeyContainedPoints, + RangeKeySkippedPoints: txnStats.ExecStats.MvccRangeKeySkippedPoints, + }, + } + } + + log.StructuredEvent(ctx, sampledTxn) +} + func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) { // The API contract for logEventsWithOptions() is that it returns // no error when system.eventlog is not written to. 
diff --git a/pkg/sql/telemetry_datadriven_test.go b/pkg/sql/telemetry_datadriven_test.go index ebeaa04e51a2..c243010e2ca4 100644 --- a/pkg/sql/telemetry_datadriven_test.go +++ b/pkg/sql/telemetry_datadriven_test.go @@ -12,6 +12,7 @@ package sql import ( "context" + "fmt" "strconv" "strings" "testing" @@ -70,6 +71,13 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() + txnsSpy := logtestutils.NewSampledTransactionLogScrubVolatileFields(t) + txnsSpy.AddFilter(func(ev logpb.Entry) bool { + return strings.Contains(ev.Message, appName) + }) + cleanupTxnSpy := log.InterceptWith(ctx, txnsSpy) + defer cleanupTxnSpy() + datadriven.Walk(t, datapathutils.TestDataPath(t, "telemetry_logging/logging"), func(t *testing.T, path string) { stmtSpy.Reset() @@ -97,12 +105,21 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { telemetryLogging := s.SQLServer().(*Server).TelemetryLoggingMetrics setupConn := s.SQLConn(t) + _, err := setupConn.Exec("CREATE USER testuser") + require.NoError(t, err) + + spiedConnRootUser := s.SQLConn(t) + spiedConnTestUser := s.SQLConn(t, serverutils.User("testuser")) + spiedConn := spiedConnRootUser - spiedConn := s.SQLConn(t) - _, err := spiedConn.Exec("SET application_name = $1", appName) + // Set spied connections to the app name observed by the log spy. 
+ _, err = spiedConn.Exec("SET application_name = $1", appName) + require.NoError(t, err) + _, err = spiedConnTestUser.Exec("SET application_name = $1", appName) require.NoError(t, err) datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + spiedConn = spiedConnRootUser switch d.Cmd { case "exec-sql": sts.SetTracingStatus(false) @@ -112,7 +129,8 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { } return "" case "spy-sql": - logCount := stmtSpy.Count() + stmtLogCount := stmtSpy.Count() + txnLogCount := txnsSpy.Count() var stubTimeUnixSecs float64 var tracing bool @@ -136,18 +154,38 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { } st.SetTime(timeutil.FromUnixMicros(stubTimeMicros)) + // Setup the sql user. + user := "root" + d.MaybeScanArgs(t, "user", &user) + switch user { + case "root": + case "testuser": + spiedConn = spiedConnTestUser + } + // Execute query input. _, err := spiedConn.Exec(d.Input) + var sb strings.Builder + if err != nil { - return err.Error() + sb.WriteString(err.Error()) + sb.WriteString("\n") + } + + newStmtLogCount := stmtSpy.Count() + sb.WriteString(stmtSpy.GetLastNLogs(newStmtLogCount - stmtLogCount)) + if newStmtLogCount > stmtLogCount { + sb.WriteString("\n") } - // Display any new statement logs have been generated since executing the query. 
- newLogCount := stmtSpy.Count() - return stmtSpy.GetLastNLogs(newLogCount - logCount) + newTxnLogCount := txnsSpy.Count() + sb.WriteString(txnsSpy.GetLastNLogs(newTxnLogCount - txnLogCount)) + return sb.String() case "reset-last-sampled": telemetryLogging.resetLastSampledTime() return "" + case "show-skipped-transactions": + return strconv.FormatUint(telemetryLogging.getSkippedTransactionCount(), 10) default: t.Fatal("unknown command") return "" @@ -201,6 +239,7 @@ func TestTelemetryLoggingDecision(t *testing.T) { datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { t.Cleanup(func() { telemetryLogging.resetLastSampledTime() + telemetryLogging.resetCounters() }) switch d.Cmd { @@ -238,8 +277,8 @@ func TestTelemetryLoggingDecision(t *testing.T) { d.ScanArgs(t, "force", &force) st.SetTime(timeutil.FromUnixMicros(int64(unixSecs * 1e6))) - shouldEmit, _ := telemetryLogging.shouldEmitStatementLog(isTrackedTxn, stmtNum, force) - return strconv.FormatBool(shouldEmit) + shouldEmit, skipped := telemetryLogging.shouldEmitStatementLog(isTrackedTxn, stmtNum, force) + return fmt.Sprintf(`emit: %t, skippedQueries: %d`, shouldEmit, skipped) case "shouldEmitTransactionLog": var unixSecs float64 var force, isInternal bool @@ -248,8 +287,8 @@ func TestTelemetryLoggingDecision(t *testing.T) { d.MaybeScanArgs(t, "isInternal", &isInternal) st.SetTime(timeutil.FromUnixMicros(int64(unixSecs * 1e6))) - shouldEmit := telemetryLogging.shouldEmitTransactionLog(force, isInternal) - return strconv.FormatBool(shouldEmit) + shouldEmit, skipped := telemetryLogging.shouldEmitTransactionLog(force, isInternal) + return fmt.Sprintf(`emit: %t, skippedTxns: %d`, shouldEmit, skipped) default: t.Fatal("unknown command") return "" diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index 60463ddee4e3..3fb03f3af136 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -11,6 +11,7 @@ package sql import ( + "strings" "sync" 
"sync/atomic" "time" @@ -115,6 +116,23 @@ func releaseSampledQuery(sq *eventpb.SampledQuery) { sampledQueryPool.Put(sq) } +// SampledTransaction objects are short-lived but can be +// allocated frequently if logging frequency is high. +var sampledTransactionPool = sync.Pool{ + New: func() interface{} { + return new(eventpb.SampledTransaction) + }, +} + +func getSampledTransaction() *eventpb.SampledTransaction { + return sampledTransactionPool.Get().(*eventpb.SampledTransaction) +} + +func releaseSampledTransaction(st *eventpb.SampledTransaction) { + *st = eventpb.SampledTransaction{} + sampledTransactionPool.Put(st) +} + // TelemetryLoggingMetrics keeps track of the last time at which an event // was sampled to the telemetry channel, and the number of skipped events // since the last sampled event. @@ -189,19 +207,22 @@ func NewTelemetryLoggingTestingKnobs( // is enabled // // If the conditions are met, the last sampled time is updated. -func (t *telemetryLoggingMetrics) shouldEmitTransactionLog(forceSampling, isInternal bool) bool { +func (t *telemetryLoggingMetrics) shouldEmitTransactionLog( + forceSampling, isInternal bool, +) (emit bool, skippedTxns uint64) { + // We should not increase the skipped transaction count if telemetry logging is disabled. 
if !telemetryLoggingEnabled.Get(&t.st.SV) { - return false + return false, t.skippedTransactionCount.Load() } if telemetrySamplingMode.Get(&t.st.SV) != telemetryModeTransaction { - return false + return false, t.skippedTransactionCount.Load() } if isInternal && !telemetryInternalQueriesEnabled.Get(&t.st.SV) { - return false + return false, t.skippedTransactionCount.Load() } maxEventFrequency := telemetryTransactionSamplingFrequency.Get(&t.st.SV) if maxEventFrequency == 0 { - return false + return false, t.skippedTransactionCount.Load() } txnSampleTime := t.timeNow() @@ -210,7 +231,7 @@ func (t *telemetryLoggingMetrics) shouldEmitTransactionLog(forceSampling, isInte t.mu.Lock() defer t.mu.Unlock() t.mu.lastSampledTime = txnSampleTime - return true + return true, t.skippedTransactionCount.Swap(0) } requiredTimeElapsed := time.Second / time.Duration(maxEventFrequency) @@ -223,19 +244,19 @@ func (t *telemetryLoggingMetrics) shouldEmitTransactionLog(forceSampling, isInte }() if !enoughTimeElapsed { - return false + return false, t.skippedTransactionCount.Add(1) } t.mu.Lock() defer t.mu.Unlock() if txnSampleTime.Sub(t.mu.lastSampledTime) < requiredTimeElapsed { - return false + return false, t.skippedTransactionCount.Add(1) } t.mu.lastSampledTime = txnSampleTime - return true + return true, t.skippedTransactionCount.Swap(0) } // ModuleTestingKnobs implements base.ModuleTestingKnobs interface. 
@@ -350,16 +371,34 @@ func (t *telemetryLoggingMetrics) isTracing(_ *tracing.Span, tracingEnabled bool return tracingEnabled } +func (t *telemetryLoggingMetrics) shouldForceTxnSampling( + applicationName string, tracingOn bool, +) bool { + if !telemetryLoggingEnabled.Get(&t.st.SV) { + return false + } + if telemetrySamplingMode.Get(&t.st.SV) != telemetryModeTransaction { + return false + } + + tracingEnabled := t.isTracing(nil, tracingOn) + logConsoleQuery := telemetryInternalConsoleQueriesEnabled.Get(&t.st.SV) && + strings.HasPrefix(applicationName, "$ internal-console") + return tracingEnabled || logConsoleQuery +} + func (t *telemetryLoggingMetrics) resetLastSampledTime() { t.mu.Lock() defer t.mu.Unlock() t.mu.lastSampledTime = time.Time{} } -func (t *TelemetryLoggingMetrics) resetSkippedTransactionCount() (res uint64) { - return t.skippedTransactionCount.Swap(0) +// resetCounters resets the skipped query and skipped transaction counters to zero. +func (t *telemetryLoggingMetrics) resetCounters() { + t.skippedQueryCount.Swap(0) + t.skippedTransactionCount.Swap(0) } -func (t *TelemetryLoggingMetrics) incSkippedTransactionCount() { - t.skippedTransactionCount.Add(1) +func (t *telemetryLoggingMetrics) getSkippedTransactionCount() uint64 { + return t.skippedTransactionCount.Load() } diff --git a/pkg/sql/testdata/telemetry_logging/logging/skipped_transactions b/pkg/sql/testdata/telemetry_logging/logging/skipped_transactions new file mode 100644 index 000000000000..778660831159 --- /dev/null +++ b/pkg/sql/testdata/telemetry_logging/logging/skipped_transactions @@ -0,0 +1,83 @@ +# SkippedTransactions tracked by telemetry should only be incremented when +# telemetry logging is enabled for transactions. 
+ +subtest telemetry_logging_off + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = false +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.mode = "transaction" +---- + +exec-sql +CREATE TABLE t (a INT) +---- + + +# Logged if telemetry transaction sampling was enabled. +spy-sql unixSedcs=1 +BEGIN; TRUNCATE t; COMMIT +---- + +# Skipped if telemetry transaction sampling was enabled. +spy-sql unixSedcs=1 +SELECT 1, 2 +---- + +# Skipped if telemetry transaction sampling was enabled. +spy-sql unixSedcs=1.01 +SELECT * FROM t LIMIT 1 +---- + +show-skipped-transactions +---- +0 + +subtest end + +subtest telemetry_logging_on_stmt_mode + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.mode = "statement" +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true +---- + + +# Note that only a statement log appears. +spy-sql unixSedcs=1 +BEGIN; TRUNCATE t; COMMIT +---- +{ + "ApplicationName": "telemetry-logging-datadriven", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "PlanGist": "Ais=", + "SkippedQueries": 1, + "Statement": "TRUNCATE TABLE defaultdb.public.t", + "StatementFingerprintID": 802230861479752300, + "StmtPosInTxn": 1, + "Tag": "TRUNCATE", + "User": "root" +} + +# Skipped if telemetry transaction sampling was enabled. +spy-sql unixSedcs=1 +SELECT 1, 2 +---- + +# Skipped if telemetry transaction sampling was enabled. 
+spy-sql unixSedcs=1.01 +SELECT * FROM t LIMIT 1 +---- + +show-skipped-transactions +---- +0 + +subtest end diff --git a/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit b/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit index b659c487db0a..5cf2d9392b84 100644 --- a/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit +++ b/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit @@ -32,6 +32,20 @@ BEGIN; SELECT 1; SELECT 2; COMMIT; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 2, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 14263644654231036000, + "User": "root" +} spy-sql SELECT 5; @@ -51,6 +65,19 @@ SELECT 5; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16085855936700856000 + ], + "TransactionFingerprintID": 8097417102642120000, + "User": "root" +} exec-sql SET CLUSTER SETTING sql.telemetry.transaction_sampling.statement_events_per_transaction.max = 3; @@ -102,6 +129,23 @@ BEGIN; SELECT 1; SELECT 2; SELECT 3; SELECT 4; COMMIT; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 4, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000, + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 63991751604173224, + "User": "root" +} exec-sql SET CLUSTER SETTING sql.telemetry.transaction_sampling.statement_events_per_transaction.max = 4; @@ -167,3 +211,20 @@ BEGIN; SELECT 1; SELECT 2; SELECT 3; SELECT 
4; COMMIT; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 4, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000, + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 63991751604173224, + "User": "root" +} diff --git a/pkg/sql/testdata/telemetry_logging/logging/transaction_mode b/pkg/sql/testdata/telemetry_logging/logging/transaction_mode index cf6b9783c397..48f49ebe8c18 100644 --- a/pkg/sql/testdata/telemetry_logging/logging/transaction_mode +++ b/pkg/sql/testdata/telemetry_logging/logging/transaction_mode @@ -1,5 +1,7 @@ # Test that transaction mode for telemetry logging logs all stmts for a tracked txn. +subtest transaction_mode_events + exec-sql SET CLUSTER SETTING sql.telemetry.transaction_sampling.max_event_frequency = 10; ---- @@ -47,6 +49,19 @@ BEGIN; TRUNCATE t; COMMIT "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 802230861479752300 + ], + "TransactionFingerprintID": 11835923794509859000, + "User": "root" +} # Note that because we skip BEGIN statements in transaction mode, the first # statement logged from this transaction has SkippedQueries=1 even though @@ -108,6 +123,21 @@ BEGIN; SELECT 1; SELECT 2; SELECT 3; COMMIT "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 3, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 17099066530943440000, + "User": "root" +} # Skipped due to not enough time elapsed. 
spy-sql unixSecs=1 @@ -133,6 +163,20 @@ SELECT 1, 2 "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 1569305422589581000 + ], + "TransactionFingerprintID": 13449146770429469000, + "User": "root" +} # Skipped due to not enough time elapsed. spy-sql unixSecs=1.05 @@ -162,6 +206,20 @@ SELECT * FROM t LIMIT 2 "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 2, + "StatementFingerprintIDs": [ + 17888549871399373000 + ], + "TransactionFingerprintID": 6278959165882708000, + "User": "root" +} # Skipped, not enough time elapsed. spy-sql unixSecs=1.15 @@ -210,6 +268,23 @@ BEGIN; SELECT * FROM t LIMIT 4; SELECT * FROM t LIMIT 5; COMMIT "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 17888549871399373000, + 17888549871399373000 + ], + "TransactionFingerprintID": 5223910247858315000, + "User": "root" +} + +subtest end subtest txn_retry_is_sampled @@ -241,6 +316,20 @@ SELECT 1, 2, 3 "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 16698501115058330000 + ], + "TransactionFingerprintID": 5250989384299556000, + "User": "root" +} # The stub time will only be advanced in the retry. 
# Note that the crdb_internal.force_retry will always appear in the logs due to @@ -294,5 +383,66 @@ COMMIT; "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "LastAutoRetryReason": "crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry()", + "NumRetries": 1, + "NumRows": 2, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 15578946620736494000, + 9891387630896048000 + ], + "TransactionFingerprintID": 823601646009140400, + "User": "root" +} + +subtest end + +subtest redacts_sensitive_info + +reset-last-sampled +---- + + +# testuser name should be redacted +spy-sql user=testuser unixSecs=1 +SELECT 1/0 +---- +pq: division by zero +{ + "ApplicationName": "telemetry-logging-datadriven", + "Database": "defaultdb", + "Distribution": "local", + "ErrorText": "division by zero", + "EventType": "sampled_query", + "OutputRowsEstimate": 1, + "PlanGist": "AgICAgYC", + "SQLSTATE": "22012", + "Statement": "SELECT ‹1› / ‹0›", + "StatementFingerprintID": 16154612194363576000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "‹testuser›" +} +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": false, + "ErrorText": "division by zero", + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SQLSTATE": "22012", + "StatementFingerprintIDs": [ + 16154612194363576000 + ], + "TransactionFingerprintID": 5715924995226314000, + "User": "‹testuser›" +} subtest end diff --git a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog index a85c06980f2b..aceadf52d0b6 100644 --- a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog +++ b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog @@ -6,19 +6,19 
@@ set-cluster-settings telemetryLoggingEnabled=false samplingMode=statement stmtSa shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=true ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=2 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 subtest end @@ -30,19 +30,19 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=statement stmtSam shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=true ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=2 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 subtest end @@ -54,31 +54,31 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=statement stmtSam shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=false ---- -true +emit: true, skippedQueries: 0 shouldEmitStatementLog unixSecs=1.05 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 1 # Force should override the sampling frequency. shouldEmitStatementLog unixSecs=1.05 isTrackedTxn=false stmtNum=1 force=true ---- -true +emit: true, skippedQueries: 1 # Tracked txn is true but we are not in txn mode. shouldEmitStatementLog unixSecs=1.1 isTrackedTxn=true stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 1 # Enough time has elapsed. stmtNum is greater than stmtsPerTxnMax but we are not in txn mode. 
shouldEmitStatementLog unixSecs=1.15 isTrackedTxn=false stmtNum=1000 force=false ---- -true +emit: true, skippedQueries: 1 # Enough time has elapsed. shouldEmitStatementLog unixSecs=2 isTrackedTxn=false stmtNum=1 force=false ---- -true +emit: true, skippedQueries: 0 subtest end @@ -91,35 +91,35 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction stmtS shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false ---- -true +emit: true, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=2 force=false ---- -true +emit: true, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=3 force=false ---- -true +emit: true, skippedQueries: 0 # Enough time has elapsed but stmtNum is greater than stmtsPerTxnMax. shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=4 force=false ---- -false +emit: false, skippedQueries: 1 # stmtNum is greater than stmtsPerTxnMax but force is true. shouldEmitStatementLog unixSecs=4 isTrackedTxn=true stmtNum=4 force=true ---- -true +emit: true, skippedQueries: 1 # Enough time has elapsed but txn is not tracked. shouldEmitStatementLog unixSecs=5 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 1 # Txn is not tracked but force is true. 
shouldEmitStatementLog unixSecs=6 isTrackedTxn=false stmtNum=1 force=true ---- -true +emit: true, skippedQueries: 1 subtest end @@ -131,27 +131,26 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction stmtS shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=2 isTrackedTxn=true stmtNum=2 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=3 isTrackedTxn=true stmtNum=3 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=4 isTrackedTxn=true stmtNum=4 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=5 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=6 isTrackedTxn=false stmtNum=1 force=true ---- -false +emit: false, skippedQueries: 0 subtest end - diff --git a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog index 9f4ee54cb81e..a1232514e71a 100644 --- a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog +++ b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog @@ -7,19 +7,19 @@ set-cluster-settings telemetryLoggingEnabled=false samplingMode=transaction stmt shouldEmitTransactionLog unixSecs=1 force=false ---- -false +emit: false, skippedTxns: 0 shouldEmitTransactionLog unixSecs=2 force=true ---- -false +emit: false, skippedTxns: 0 shouldEmitTransactionLog unixSecs=3 force=false ---- -false +emit: false, skippedTxns: 0 shouldEmitTransactionLog unixSecs=4 force=true ---- -false +emit: false, skippedTxns: 0 subtest end @@ -30,19 +30,19 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction txnSa shouldEmitTransactionLog unixSecs=1 force=false ---- -false +emit: false, skippedTxns: 0 
shouldEmitTransactionLog unixSecs=2 force=true ---- -false +emit: false, skippedTxns: 0 shouldEmitTransactionLog unixSecs=3 force=false ---- -false +emit: false, skippedTxns: 0 shouldEmitTransactionLog unixSecs=4 force=true ---- -false +emit: false, skippedTxns: 0 subtest end @@ -53,19 +53,19 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=statement txnSamp shouldEmitTransactionLog unixSecs=1 force=false ---- -false +emit: false, skippedTxns: 0 shouldEmitTransactionLog unixSecs=2 force=true ---- -false +emit: false, skippedTxns: 0 shouldEmitTransactionLog unixSecs=3 force=false ---- -false +emit: false, skippedTxns: 0 shouldEmitTransactionLog unixSecs=4 force=true ---- -false +emit: false, skippedTxns: 0 subtest end @@ -76,32 +76,32 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction txnSa shouldEmitTransactionLog unixSecs=1 force=false ---- -true +emit: true, skippedTxns: 0 # Not enough time elapsed. shouldEmitTransactionLog unixSecs=1.5 force=false ---- -false +emit: false, skippedTxns: 1 # Enough time elapsed. shouldEmitTransactionLog unixSecs=2 force=false ---- -true +emit: true, skippedTxns: 1 # Force should override the sampling frequency. shouldEmitTransactionLog unixSecs=2.5 force=true ---- -true +emit: true, skippedTxns: 0 # Sampling internal queries is off. 
shouldEmitTransactionLog unixSecs=4 force=false isInternal=true ---- -false +emit: false, skippedTxns: 0 shouldEmitTransactionLog unixSecs=4 force=true isInternal=true ---- -false +emit: false, skippedTxns: 0 subtest end @@ -113,30 +113,30 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction txnSa shouldEmitTransactionLog unixSecs=1 force=false isInternal=true ---- -true +emit: true, skippedTxns: 0 shouldEmitTransactionLog unixSecs=1.5 force=false isInternal=true ---- -false +emit: false, skippedTxns: 1 shouldEmitTransactionLog unixSecs=2 force=false isInternal=true ---- -true +emit: true, skippedTxns: 1 shouldEmitTransactionLog unixSecs=2.5 force=true isInternal=true ---- -true +emit: true, skippedTxns: 0 shouldEmitTransactionLog unixSecs=2.5 force=true isInternal=true ---- -true +emit: true, skippedTxns: 0 shouldEmitTransactionLog unixSecs=4 force=false isInternal=false ---- -true +emit: true, skippedTxns: 0 shouldEmitTransactionLog unixSecs=5 force=true isInternal=false ---- -true +emit: true, skippedTxns: 0 subtest end diff --git a/pkg/util/log/logtestutils/telemetry_logging_test_utils.go b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go index 7a92e4ea10c9..14b2b8696ad8 100644 --- a/pkg/util/log/logtestutils/telemetry_logging_test_utils.go +++ b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go @@ -126,6 +126,14 @@ var includeByDefault = map[string]struct{}{ "KVRowsRead": {}, "IndexRecommendations": {}, "ScanCount": {}, + + // Transaction fields. + "Committed": {}, + "Implicit": {}, + "LastAutoRetryReason": {}, + "SkippedTransactions": {}, + "TransactionFingerprintID": {}, + "StatementFingerprintIDs": {}, } // printJSONMap prints a map as a JSON string. 
In the future we can @@ -150,10 +158,19 @@ type TelemetryLogSpy struct { syncutil.RWMutex logs []string filters []func(entry logpb.Entry) bool - format func(entry logpb.Entry) string + format func(entry logpb.Entry) (string, error) } } +func defaultFormatEntry(entry logpb.Entry) (string, error) { + var jsonMap map[string]interface{} + if err := json.Unmarshal([]byte(entry.Message[entry.StructuredStart:entry.StructuredEnd]), &jsonMap); err != nil { + return "", err + } + + return printJSONMap(jsonMap) +} + func NewSampledQueryLogScrubVolatileFields(testState *testing.T) *TelemetryLogSpy { s := &TelemetryLogSpy{ testState: testState, @@ -161,17 +178,19 @@ func NewSampledQueryLogScrubVolatileFields(testState *testing.T) *TelemetryLogSp s.mu.filters = append(s.mu.filters, func(entry logpb.Entry) bool { return strings.Contains(entry.Message, "sampled_query") }) - s.mu.format = func(entry logpb.Entry) string { - var jsonMap map[string]interface{} - if err := json.Unmarshal([]byte(entry.Message[entry.StructuredStart:entry.StructuredEnd]), &jsonMap); err != nil { - s.testState.Fatal(err) - } - out, err := printJSONMap(jsonMap) - if err != nil { - s.testState.Error(err) - } - return out + s.mu.format = defaultFormatEntry + + return s +} + +func NewSampledTransactionLogScrubVolatileFields(testState *testing.T) *TelemetryLogSpy { + s := &TelemetryLogSpy{ + testState: testState, } + s.mu.filters = append(s.mu.filters, func(entry logpb.Entry) bool { + return strings.Contains(entry.Message, "sampled_transaction") + }) + s.mu.format = defaultFormatEntry return s } @@ -216,7 +235,10 @@ func (s *TelemetryLogSpy) Intercept(entry []byte) { } } - formattedLogStr := s.mu.format(logEntry) + formattedLogStr, err := s.mu.format(logEntry) + if err != nil { + s.testState.Fatal(err) + } s.mu.Lock() defer s.mu.Unlock()