From 7b88f08c25e42784e2b45316c097f5d254f96f4c Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Thu, 28 Dec 2023 11:27:19 -0500 Subject: [PATCH 1/5] sql/telemetry: add SkippedTransactions to SampledTransaction proto This commit adds the field SkippedTransactions to the SampledTransaction protobuf to count the number of transactions that were not sampled while telemetry transaction logging is enabled. The corresponding field is added to the telemetryLogging struct and will be used in the following commit to track skipped transactions. Some whitespace in the SampledTransaction proto definition is adjusted. Epic: none Release note (sql change): New field `SkippedTransactions` in the SampledTransaction event, which is emitted to the TELEMETRY logging channel when telemetry logging is enabled and set to "transaction" mode. --- docs/generated/eventlog.md | 1 + pkg/sql/telemetry_logging.go | 11 +++++++ pkg/util/log/eventpb/json_encode_generated.go | 9 +++++ pkg/util/log/eventpb/telemetry.proto | 33 +++++++++++-------- 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md index c7c033d9ddb8..0ded8b5faae1 100644 --- a/docs/generated/eventlog.md +++ b/docs/generated/eventlog.md @@ -3081,6 +3081,7 @@ An event of type `sampled_transaction` is the event logged to telemetry at the e | `RowsRead` | RowsRead is the number of rows read from disk. | no | | `RowsWritten` | RowsWritten is the number of rows written to disk. | no | | `SampledExecStats` | SampledExecStats is a nested field containing execution statistics. This field will be omitted if the stats were not sampled. | yes | +| `SkippedTransactions` | SkippedTransactions is the number of transactions that were skipped as part of sampling prior to this one. We only count skipped transactions when telemetry logging is enabled and the sampling mode is set to "transaction". | no | #### Common fields diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index 4ce7f932c0dc..8ec6ef356531 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -125,6 +125,9 @@ type telemetryLoggingMetrics struct { // skippedQueryCount is used to produce the count of non-sampled queries. skippedQueryCount atomic.Uint64 + + // skippedTransactionCount is used to produce the count of non-sampled transactions. + skippedTransactionCount atomic.Uint64 } func newTelemetryLoggingMetrics( @@ -333,3 +336,11 @@ func (t *telemetryLoggingMetrics) resetLastSampledTime() { defer t.mu.Unlock() t.mu.lastSampledTime = time.Time{} } + +func (t *TelemetryLoggingMetrics) resetSkippedTransactionCount() (res uint64) { + return t.skippedTransactionCount.Swap(0) +} + +func (t *TelemetryLoggingMetrics) incSkippedTransactionCount() { + t.skippedTransactionCount.Add(1) +} diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index de5291200178..b5e403afabe4 100644 --- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -5227,6 +5227,15 @@ func (m *SampledTransaction) AppendJSONFields(printComma bool, b redact.Redactab b = append(b, '}') } + if m.SkippedTransactions != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"SkippedTransactions\":"...) 
+ b = strconv.AppendInt(b, int64(m.SkippedTransactions), 10) + } + return printComma, b } diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index 4dcb31224b9e..7a531302c893 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -457,27 +457,32 @@ message SampledTransaction { int64 num_rows = 18 [(gogoproto.jsontag) = ",includeempty"]; // RetryLatNanos is the amount of time spent retrying the transaction. - int64 retry_lat_nanos = 19 [(gogoproto.jsontag) = ",omitempty"]; + int64 retry_lat_nanos = 19 [(gogoproto.jsontag) = ",omitempty"]; - // CommitLatNanos is the amount of time spent committing the transaction after all statement operations. - int64 commit_lat_nanos = 20 [(gogoproto.jsontag) = ",includeempty"]; + // CommitLatNanos is the amount of time spent committing the transaction after all statement operations. + int64 commit_lat_nanos = 20 [(gogoproto.jsontag) = ",includeempty"]; - // IdleLatNanos is the amount of time spent waiting for the client to send statements + // IdleLatNanos is the amount of time spent waiting for the client to send statements // while the transaction is open. - int64 idle_lat_nanos = 21 [(gogoproto.jsontag) = ",includeempty"]; + int64 idle_lat_nanos = 21 [(gogoproto.jsontag) = ",includeempty"]; - // BytesRead is the number of bytes read from disk. - int64 bytes_read = 22 [(gogoproto.jsontag) = ",includeempty"]; + // BytesRead is the number of bytes read from disk. + int64 bytes_read = 22 [(gogoproto.jsontag) = ",includeempty"]; - // RowsRead is the number of rows read from disk. - int64 rows_read = 23 [(gogoproto.jsontag) = ",includeempty"]; + // RowsRead is the number of rows read from disk. + int64 rows_read = 23 [(gogoproto.jsontag) = ",includeempty"]; - // RowsWritten is the number of rows written to disk. - int64 rows_written = 24 [(gogoproto.jsontag) = ",includeempty"]; + // RowsWritten is the number of rows written to disk. + int64 rows_written = 24 [(gogoproto.jsontag) = ",includeempty"]; - // SampledExecStats is a nested field containing execution statistics. - // This field will be omitted if the stats were not sampled. - SampledExecStats sampled_exec_stats = 25 [(gogoproto.jsontag) = ",omitempty"]; + // SampledExecStats is a nested field containing execution statistics. + // This field will be omitted if the stats were not sampled. + SampledExecStats sampled_exec_stats = 25 [(gogoproto.jsontag) = ",omitempty"]; + + // SkippedTransactions is the number of transactions that were skipped as part of sampling prior to + // this one. We only count skipped transactions when telemetry logging is enabled and the sampling + // mode is set to "transaction". + int64 skipped_transactions = 26 [(gogoproto.jsontag) = ",omitempty"]; } // CapturedIndexUsageStats From d0d20afec038752e60aa16e80ec9093980e4a8fd Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Thu, 18 Jan 2024 14:16:25 -0500 Subject: [PATCH 2/5] eventpb: make MVCCIteratorStats in SampledExecStats non-nullable This field should always exist in SampledExecStats. Since SampledTransaction is the only user of this message right now and is yet to be used we can safely change the proto definition. 
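For reviewers unfamiliar with the annotation, here is a hand-written sketch (not generated code; the stand-in types are simplified) of what `(gogoproto.nullable) = false` changes in the generated Go struct, and why the nil check in the generated JSON encoder can be dropped:

    package main

    import "fmt"

    // MVCCIteratorStats stands in for the eventpb message of the same name.
    type MVCCIteratorStats struct{ StepCount int64 }

    // With gogoproto's default, the generated field is a pointer, so encoders
    // must nil-check it before emitting (as the old AppendJSONFields did).
    type execStatsNullable struct {
        MVCCIteratorStats *MVCCIteratorStats
    }

    // With (gogoproto.nullable) = false, the field is held by value and is
    // always present, so the encoder can emit it unconditionally.
    type execStatsNonNullable struct {
        MVCCIteratorStats MVCCIteratorStats
    }

    func main() {
        fmt.Println(execStatsNullable{}.MVCCIteratorStats == nil) // true: callers must guard
        fmt.Println(execStatsNonNullable{}.MVCCIteratorStats)     // {0}: zero value, always encodable
    }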
Epic: none Release note: None --- pkg/util/log/eventpb/eventpbgen/gen.go | 12 +++++++++++- pkg/util/log/eventpb/json_encode_generated.go | 16 +++++++--------- pkg/util/log/eventpb/telemetry.proto | 2 +- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pkg/util/log/eventpb/eventpbgen/gen.go b/pkg/util/log/eventpb/eventpbgen/gen.go index 912ec436535e..c1b70874165b 100644 --- a/pkg/util/log/eventpb/eventpbgen/gen.go +++ b/pkg/util/log/eventpb/eventpbgen/gen.go @@ -84,6 +84,7 @@ type fieldInfo struct { Inherited bool IsEnum bool AllowZeroValue bool + Nullable bool } var ( @@ -380,6 +381,8 @@ func readInput( return errors.Newf("unknown field definition syntax: %q", line) } + notNullable := notNullableRe.MatchString(line) + // Allow zero values if the field is annotated with 'includeempty'. allowZeroValue := strings.Contains(line, "includeempty") @@ -461,6 +464,7 @@ func readInput( MixedRedactable: mixed, IsEnum: isEnum, AllowZeroValue: allowZeroValue, + Nullable: !notNullable, } curMsg.Fields = append(curMsg.Fields, fi) curMsg.AllFields = append(curMsg.AllFields, fi) @@ -492,6 +496,8 @@ var fieldDefRe = regexp.MustCompile(`\s*(?P[a-z._A-Z0-9]+)` + var reservedDefRe = regexp.MustCompile(`\s*(reserved ([1-9][0-9]*);)`) +var notNullableRe = regexp.MustCompile(`\s*\(\s*gogoproto\.nullable\s*\)\s*=\s*false`) + func camelToSnake(typeName string) string { var res strings.Builder res.WriteByte(typeName[0] + 'a' - 'A') @@ -712,13 +718,17 @@ func (m *{{.GoType}}) AppendJSONFields(printComma bool, b redact.RedactableBytes } } {{- else if eq .FieldType "nestedMessage"}} -if m.{{.FieldName}} != nil { + {{ if .Nullable -}} + if m.{{.FieldName}} != nil { + {{- end }} if printComma { b = append(b, ',')}; printComma = true b = append(b, "\"{{.FieldName}}\":"...) b = append(b, '{') printComma, b = m.{{.FieldName}}.AppendJSONFields(false, b) b = append(b, '}') + {{ if .Nullable -}} } + {{- end }} {{- else}} {{ error .FieldType }} {{- end}} diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index b5e403afabe4..ea3e473b0b47 100644 --- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -4301,16 +4301,14 @@ func (m *SampledExecStats) AppendJSONFields(printComma bool, b redact.Redactable b = append(b, "\"CPUSQLNanos\":"...) b = strconv.AppendInt(b, int64(m.CPUSQLNanos), 10) - if m.MVCCIteratorStats != nil { - if printComma { - b = append(b, ',') - } - printComma = true - b = append(b, "\"MVCCIteratorStats\":"...) - b = append(b, '{') - printComma, b = m.MVCCIteratorStats.AppendJSONFields(false, b) - b = append(b, '}') + if printComma { + b = append(b, ',') } + printComma = true + b = append(b, "\"MVCCIteratorStats\":"...) + b = append(b, '{') + printComma, b = m.MVCCIteratorStats.AppendJSONFields(false, b) + b = append(b, '}') return printComma, b } diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index 7a531302c893..8850b2667597 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -327,7 +327,7 @@ message SampledExecStats { int64 cpu_sql_nanos = 6 [(gogoproto.customname) = "CPUSQLNanos", (gogoproto.jsontag) = "CPUSQLNanos,includeempty"]; // Internal storage iteration statistics. 
- MVCCIteratorStats mvcc_iterator_stats = 7 [(gogoproto.customname) = "MVCCIteratorStats", (gogoproto.jsontag) = ",omitempty"]; + MVCCIteratorStats mvcc_iterator_stats = 7 [(gogoproto.nullable) = false, (gogoproto.customname) = "MVCCIteratorStats"]; } // Internal storage iteration statistics for a single execution. From f741c2f54fadf2c82420e2e52ac1ba6c40e271c5 Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Wed, 17 Jan 2024 13:28:57 -0500 Subject: [PATCH 3/5] sql/telemetry: add sync.Pool for SampledQuery objects Add a pool for SampledQuery objects. At high logging frequencies we can avoid an allocation per statement. Epic: none Release note: None --- pkg/sql/exec_log.go | 7 +++++-- pkg/sql/telemetry_logging.go | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 1c39244d0cdb..4ccf32dd38a5 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -355,7 +355,10 @@ func (p *planner) maybeLogStatementInternal( }) } - sampledQuery := eventpb.SampledQuery{ + sampledQuery := getSampledQuery() + defer releaseSampledQuery(sampledQuery) + + *sampledQuery = eventpb.SampledQuery{ CommonSQLExecDetails: execDetails, SkippedQueries: skippedQueries, CostEstimate: p.curPlan.instrumentation.costEstimate, @@ -431,7 +434,7 @@ func (p *planner) maybeLogStatementInternal( SchemaChangerMode: p.curPlan.instrumentation.schemaChangerMode.String(), } - p.logOperationalEventsOnlyExternally(ctx, &sampledQuery) + p.logOperationalEventsOnlyExternally(ctx, sampledQuery) } } diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index 8ec6ef356531..60463ddee4e3 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -11,12 +11,14 @@ package sql import ( + "sync" "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -96,6 +98,23 @@ var telemetrySamplingMode = settings.RegisterEnumSetting( settings.WithPublic, ) +// SampledQuery objects are short-lived but can be +// allocated frequently if logging frequency is high. +var sampledQueryPool = sync.Pool{ + New: func() interface{} { + return new(eventpb.SampledQuery) + }, +} + +func getSampledQuery() *eventpb.SampledQuery { + return sampledQueryPool.Get().(*eventpb.SampledQuery) +} + +func releaseSampledQuery(sq *eventpb.SampledQuery) { + *sq = eventpb.SampledQuery{} + sampledQueryPool.Put(sq) +} + // TelemetryLoggingMetrics keeps track of the last time at which an event // was sampled to the telemetry channel, and the number of skipped events // since the last sampled event. From bbc19dae19b4986f158b759d9df5444ba088c34a Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Thu, 25 Jan 2024 14:16:22 -0600 Subject: [PATCH 4/5] datapathutils: update comment for `DebuggableTempDir` Improve some of the wording here. Also I accidentally wrote "temp" instead of "test" which is confusing. 
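For context, the call pattern the rewritten comment recommends looks roughly like this (the test name and artifact file below are illustrative only, not part of this change):

    package datapathutils_test

    import (
        "os"
        "path/filepath"
        "testing"

        "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
    )

    // TestExample shows the drop-in replacement for os.MkdirTemp("", ...).
    // Files left in the directory survive the test and, under remote Bazel
    // execution, are collected into outputs.zip for debugging.
    func TestExample(t *testing.T) {
        dir, err := os.MkdirTemp(datapathutils.DebuggableTempDir(), "example-")
        if err != nil {
            t.Fatal(err)
        }
        // Intentionally no os.RemoveAll(dir): the point is to keep leftover
        // artifacts around in case the test fails.
        if err := os.WriteFile(filepath.Join(dir, "artifact.txt"), []byte("debug me"), 0o644); err != nil {
            t.Fatal(err)
        }
    }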
Epic: none Release note: None --- pkg/testutils/datapathutils/data_path.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/testutils/datapathutils/data_path.go b/pkg/testutils/datapathutils/data_path.go index f76bc6ca3e6b..fd88ec7f40a3 100644 --- a/pkg/testutils/datapathutils/data_path.go +++ b/pkg/testutils/datapathutils/data_path.go @@ -24,16 +24,19 @@ import ( ) // DebuggableTempDir returns a temporary directory that is appropriate for -// retaining leftover artifacts in case the temp fails for debugging. -// If the test is not executing remotely, or the test is not built with Bazel, -// then os.TempDir() is returned (the person executing the test should set up -// their $TMPDIR appropriately so they know where to find these leftover -// artifacts). If the test is executing remotely, then we instead use -// TEST_UNDECLARED_OUTPUTS_DIR, and if any files are left-over in this directory -// after the test exits, then Bazel will package them up into outputs.zip. -// This function is specifically for when we want to capture left-over artifacts -// after the test completes; if you are always going to clean up the files you -// create either way, testutils.TempDir() may be more convenient. +// retaining leftover artifacts for debugging in case the test fails. That makes +// it an appropriate replacement for os.TempDir() in the case where the test is +// executing remotely. If the test is not executing remotely, or the test is not +// built with Bazel, then os.TempDir() is returned (the person executing the +// test should set up their $TMPDIR appropriately so they know where to find +// these leftover artifacts). If the test is executing remotely, then we instead +// use TEST_UNDECLARED_OUTPUTS_DIR, and if any files are left-over in this +// directory after the test exits, then Bazel will package them up into +// outputs.zip. If you are always going to clean up the files you create either +// way, testutils.TempDir() may be more convenient. +// +// This also means that os.MkdirTemp(datapathutils.DebuggableTempDir(), ...) +// is appropriate as a drop-in replacement for os.MkdirTemp("", ...) func DebuggableTempDir() string { if bazel.BuiltWithBazel() { isRemote := os.Getenv("REMOTE_EXEC") From 9199146fff64039beec208f1ea3ee0d841c368ae Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Thu, 28 Dec 2023 11:37:33 -0500 Subject: [PATCH 5/5] telemetry: log transaction exec events to TELEMETRY This commit sends SampledTransaction events to the telemetry channel. In "transaction" sampling mode, if a transaction is marked to be logged to telemetry, we will emit a SampledTranaction event on transaction end containing transaction level stats. It is expected that if a transaction event exists in telemetry, its statement events will also have been logged (with a maximum number according to the setting sql.telemetry.transaction_sampling.statement_events_per_transaction.max). Transaction recording for telemetry is decided at the start of transaction execution (including on restarts), and will not be refreshed for the remainder of transaction execution. Closes: #108284 Release note (ops change): Transactions sampled for the telemetry logging channel will now emit a SampledTransaction event. To sample transactions, set the cluster setting `sql.telemetry.query_sampling.mode = 'transaction'` and enable telemetry logging via `sql.telemetry.query_sampling.enabled = true`. 
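For reviewers, the per-transaction sampling decision implemented by shouldEmitTransactionLog below boils down to the following hand-written sketch (locking granularity, the console/tracing force-logging paths, and the internal-query checks are simplified away here):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )

    // txnSampler is a simplified stand-in for the telemetry logging metrics:
    // a transaction is emitted only if enough time has elapsed since the last
    // emitted one, and transactions skipped in between are counted so the next
    // SampledTransaction event can report them in SkippedTransactions.
    type txnSampler struct {
        mu              sync.Mutex
        lastSampled     time.Time
        skippedTxnCount atomic.Uint64
    }

    func (s *txnSampler) shouldEmit(now time.Time, maxEventsPerSecond int64) (emit bool, skipped uint64) {
        if maxEventsPerSecond <= 0 {
            return false, s.skippedTxnCount.Load()
        }
        required := time.Second / time.Duration(maxEventsPerSecond)
        s.mu.Lock()
        defer s.mu.Unlock()
        if now.Sub(s.lastSampled) < required {
            // Too soon since the last emitted transaction: record the skip.
            return false, s.skippedTxnCount.Add(1)
        }
        s.lastSampled = now
        return true, s.skippedTxnCount.Swap(0)
    }

    func main() {
        s := &txnSampler{}
        base := time.Unix(0, 0)
        for _, offset := range []time.Duration{0, 50 * time.Millisecond, 120 * time.Millisecond} {
            emit, skipped := s.shouldEmit(base.Add(offset), 10) // 10 events/sec => 100ms apart
            fmt.Println(emit, skipped)                          // true 0; false 1; true 1
        }
    }

In the real implementation, transactions forced by tracing or by a console app name bypass the elapsed-time check but still reset the skipped counter, which is why the datadriven tests below expect SkippedTransactions to reset after a forced emit.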
--- pkg/sql/conn_executor.go | 12 +- pkg/sql/conn_executor_exec.go | 18 +- pkg/sql/exec_log.go | 82 +++++- pkg/sql/telemetry_datadriven_test.go | 101 +++++-- pkg/sql/telemetry_logging.go | 66 +++-- .../telemetry_logging/logging/force_logging | 254 ++++++++++++++++++ .../logging/stmts_per_txn_limit | 61 +++++ .../logging/transaction_mode | 150 +++++++++++ .../logging_decisions/shouldEmitStatementLog | 55 ++-- .../shouldEmitTransactionLog | 127 +++++---- .../telemetry_logging_test_utils.go | 46 +++- 11 files changed, 833 insertions(+), 139 deletions(-) create mode 100644 pkg/sql/testdata/telemetry_logging/logging/force_logging diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index eda34c86ee9e..3223cbf2a2bd 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1562,6 +1562,10 @@ type connExecutor struct { // Note that the number of statement events emitted per transaction is // capped by the cluster setting telemetryStatementsPerTransactionMax. shouldLogToTelemetry bool + + // telemetrySkippedTxns contains the number of transactions skipped by + // telemetry logging prior to this one. + telemetrySkippedTxns uint64 } // sessionDataStack contains the user-configurable connection variables. @@ -2010,9 +2014,11 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay ex.state.mu.Lock() defer ex.state.mu.Unlock() ex.state.mu.stmtCount = 0 - tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled() - ex.extraTxnState.shouldLogToTelemetry = - ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal) + isTracing := ex.planner.ExtendedEvalContext().Tracing.Enabled() + ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns = + ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(isTracing, + ex.executorType == executorTypeInternal, + ex.applicationName.Load().(string)) } // NOTE: on txnRestart we don't need to muck with the savepoints stack. 
It's either a diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 4085db8a8fdb..532bcd9002a0 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -3182,9 +3182,11 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { TxnFingerprintID: appstatspb.InvalidTransactionFingerprintID, }) - tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled() - ex.extraTxnState.shouldLogToTelemetry = - ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal) + isTracing := ex.planner.ExtendedEvalContext().Tracing.Enabled() + ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns = + ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(isTracing, + ex.executorType == executorTypeInternal, + ex.applicationName.Load().(string)) ex.state.mu.RLock() txnStart := ex.state.mu.txnStart @@ -3305,6 +3307,16 @@ func (ex *connExecutor) recordTransactionFinish( ex.maybeRecordRetrySerializableContention(ev.txnID, transactionFingerprintID, txnErr) + if ex.extraTxnState.shouldLogToTelemetry { + + ex.planner.logTransaction(ctx, + int(ex.extraTxnState.txnCounter.Load()), + transactionFingerprintID, + &recordedTxnStats, + ex.extraTxnState.telemetrySkippedTxns, + ) + } + return ex.statsCollector.RecordTransaction( ctx, transactionFingerprintID, diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 4ccf32dd38a5..d57733a739cc 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -179,7 +179,7 @@ func (p *planner) maybeLogStatementInternal( slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV) auditEventsDetected := len(p.curPlan.auditEventBuilders) != 0 logConsoleQuery := telemetryInternalConsoleQueriesEnabled.Get(&p.execCfg.Settings.SV) && - strings.HasPrefix(p.SessionData().ApplicationName, "$ internal-console") + strings.HasPrefix(p.SessionData().ApplicationName, internalConsoleAppName) // We only consider non-internal SQL statements for telemetry logging unless // the telemetryInternalQueriesEnabled is true. @@ -438,6 +438,86 @@ func (p *planner) maybeLogStatementInternal( } } +// logTransaction records the current transaction to the TELEMETRY channel. +func (p *planner) logTransaction( + ctx context.Context, + txnCounter int, + txnFingerprintID appstatspb.TransactionFingerprintID, + txnStats *sqlstats.RecordedTxnStats, + skippedTransactions uint64, +) { + + // Redact error messages. 
+ var execErrStr, retryErr redact.RedactableString + sqlErrState := "" + if txnStats.TxnErr != nil { + execErrStr = redact.Sprint(txnStats.TxnErr) + sqlErrState = pgerror.GetPGCode(txnStats.TxnErr).String() + } + + if txnStats.AutoRetryReason != nil { + retryErr = redact.Sprint(txnStats.AutoRetryReason) + } + + sampledTxn := getSampledTransaction() + defer releaseSampledTransaction(sampledTxn) + + *sampledTxn = eventpb.SampledTransaction{ + SkippedTransactions: int64(skippedTransactions), + User: txnStats.SessionData.SessionUser().Normalized(), + ApplicationName: txnStats.SessionData.ApplicationName, + TxnCounter: uint32(txnCounter), + SessionID: txnStats.SessionID.String(), + TransactionID: txnStats.TransactionID.String(), + TransactionFingerprintID: txnFingerprintID, + Committed: txnStats.Committed, + ImplicitTxn: txnStats.ImplicitTxn, + StartTimeUnixNanos: txnStats.StartTime.UnixNano(), + EndTimeUnixNanos: txnStats.EndTime.UnixNano(), + ServiceLatNanos: txnStats.ServiceLatency.Nanoseconds(), + SQLSTATE: sqlErrState, + ErrorText: execErrStr, + NumRetries: txnStats.RetryCount, + LastAutoRetryReason: retryErr, + StatementFingerprintIDs: txnStats.StatementFingerprintIDs, + NumRows: int64(txnStats.RowsAffected), + RetryLatNanos: txnStats.RetryLatency.Nanoseconds(), + CommitLatNanos: txnStats.CommitLatency.Nanoseconds(), + IdleLatNanos: txnStats.IdleLatency.Nanoseconds(), + BytesRead: txnStats.BytesRead, + RowsRead: txnStats.RowsRead, + RowsWritten: txnStats.RowsWritten, + } + + if txnStats.CollectedExecStats { + sampledTxn.SampledExecStats = &eventpb.SampledExecStats{ + NetworkBytes: txnStats.ExecStats.NetworkBytesSent, + MaxMemUsage: txnStats.ExecStats.MaxMemUsage, + ContentionTime: int64(txnStats.ExecStats.ContentionTime.Seconds()), + NetworkMessages: txnStats.ExecStats.NetworkMessages, + MaxDiskUsage: txnStats.ExecStats.MaxDiskUsage, + CPUSQLNanos: txnStats.ExecStats.CPUTime.Nanoseconds(), + MVCCIteratorStats: eventpb.MVCCIteratorStats{ + StepCount: txnStats.ExecStats.MvccSteps, + StepCountInternal: txnStats.ExecStats.MvccStepsInternal, + SeekCount: txnStats.ExecStats.MvccSeeks, + SeekCountInternal: txnStats.ExecStats.MvccSeeksInternal, + BlockBytes: txnStats.ExecStats.MvccBlockBytes, + BlockBytesInCache: txnStats.ExecStats.MvccBlockBytesInCache, + KeyBytes: txnStats.ExecStats.MvccKeyBytes, + ValueBytes: txnStats.ExecStats.MvccValueBytes, + PointCount: txnStats.ExecStats.MvccPointCount, + PointsCoveredByRangeTombstones: txnStats.ExecStats.MvccPointsCoveredByRangeTombstones, + RangeKeyCount: txnStats.ExecStats.MvccRangeKeyCount, + RangeKeyContainedPoints: txnStats.ExecStats.MvccRangeKeyContainedPoints, + RangeKeySkippedPoints: txnStats.ExecStats.MvccRangeKeySkippedPoints, + }, + } + } + + log.StructuredEvent(ctx, sampledTxn) +} + func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) { // The API contract for logEventsWithOptions() is that it returns // no error when system.eventlog is not written to. diff --git a/pkg/sql/telemetry_datadriven_test.go b/pkg/sql/telemetry_datadriven_test.go index ebeaa04e51a2..3e243e102af4 100644 --- a/pkg/sql/telemetry_datadriven_test.go +++ b/pkg/sql/telemetry_datadriven_test.go @@ -12,6 +12,7 @@ package sql import ( "context" + "fmt" "strconv" "strings" "testing" @@ -48,8 +49,9 @@ import ( * - unixSecs: sets the current time used for telemetry log sampling to the given unix time in seconds. * If omitted, the current time is automatically changed by 0.1 seconds. * - restartUnixSecs: sets the stub time on txn restarts. 
- * - * tracing: sets the tracing status to the given value + * - tracing: sets the tracing status to the given value. If omitted, the tracing status is set to false. + * - useRealTracing: if set, the real tracing status is used instead of the stubbed one. + * - user: sets the user for the connection. If omitted, the root user is used. * * reset-last-sampled: resets the last sampled time. */ @@ -65,18 +67,30 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { ctx := context.Background() stmtSpy := logtestutils.NewSampledQueryLogScrubVolatileFields(t) stmtSpy.AddFilter(func(ev logpb.Entry) bool { - return strings.Contains(ev.Message, appName) + return strings.Contains(ev.Message, appName) || strings.Contains(ev.Message, internalConsoleAppName) }) cleanup := log.InterceptWith(ctx, stmtSpy) defer cleanup() + txnsSpy := logtestutils.NewSampledTransactionLogScrubVolatileFields(t) + txnsSpy.AddFilter(func(ev logpb.Entry) bool { + return strings.Contains(ev.Message, appName) || strings.Contains(ev.Message, internalConsoleAppName) + }) + cleanupTxnSpy := log.InterceptWith(ctx, txnsSpy) + defer cleanupTxnSpy() + datadriven.Walk(t, datapathutils.TestDataPath(t, "telemetry_logging/logging"), func(t *testing.T, path string) { stmtSpy.Reset() + txnsSpy.Reset() st := logtestutils.StubTime{} st.SetTime(timeutil.FromUnixMicros(0)) sts := logtestutils.StubTracingStatus{} stubTimeOnRestart := int64(0) + telemetryKnobs := &TelemetryLoggingTestingKnobs{ + getTimeNow: st.TimeNow, + getTracingStatus: sts.TracingStatus, + } tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -85,10 +99,7 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { st.SetTime(timeutil.FromUnixMicros(stubTimeOnRestart * 1e6)) }, }, - TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ - getTimeNow: st.TimeNow, - getTracingStatus: sts.TracingStatus, - }, + TelemetryLoggingKnobs: telemetryKnobs, }, }, }) @@ -97,12 +108,21 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { telemetryLogging := s.SQLServer().(*Server).TelemetryLoggingMetrics setupConn := s.SQLConn(t) + _, err := setupConn.Exec("CREATE USER testuser") + require.NoError(t, err) + + spiedConnRootUser := s.SQLConn(t) + spiedConnTestUser := s.SQLConn(t, serverutils.User("testuser")) + spiedConn := spiedConnRootUser - spiedConn := s.SQLConn(t) - _, err := spiedConn.Exec("SET application_name = $1", appName) + // Set spied connections to the app name observed by the log spy. + _, err = spiedConn.Exec("SET application_name = $1", appName) + require.NoError(t, err) + _, err = spiedConnTestUser.Exec("SET application_name = $1", appName) require.NoError(t, err) datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + spiedConn = spiedConnRootUser switch d.Cmd { case "exec-sql": sts.SetTracingStatus(false) @@ -112,13 +132,22 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { } return "" case "spy-sql": - logCount := stmtSpy.Count() + stmtLogCount := stmtSpy.Count() + txnLogCount := txnsSpy.Count() var stubTimeUnixSecs float64 - var tracing bool + var tracing, useRealTracing bool d.MaybeScanArgs(t, "tracing", &tracing) sts.SetTracingStatus(tracing) + d.MaybeScanArgs(t, "useRealTracing", &useRealTracing) + if useRealTracing { + telemetryKnobs.getTracingStatus = nil + defer func() { + telemetryKnobs.getTracingStatus = sts.TracingStatus + }() + } + // Set stubbed stubbed time if this txn is restarted. 
scanned := d.MaybeScanArgs(t, "restartUnixSecs", &stubTimeOnRestart) if !scanned { @@ -136,18 +165,38 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { } st.SetTime(timeutil.FromUnixMicros(stubTimeMicros)) + // Setup the sql user. + user := "root" + d.MaybeScanArgs(t, "user", &user) + switch user { + case "root": + case "testuser": + spiedConn = spiedConnTestUser + } + // Execute query input. _, err := spiedConn.Exec(d.Input) + var sb strings.Builder + if err != nil { - return err.Error() + sb.WriteString(err.Error()) + sb.WriteString("\n") + } + + newStmtLogCount := stmtSpy.Count() + sb.WriteString(stmtSpy.GetLastNLogs(newStmtLogCount - stmtLogCount)) + if newStmtLogCount > stmtLogCount { + sb.WriteString("\n") } - // Display any new statement logs have been generated since executing the query. - newLogCount := stmtSpy.Count() - return stmtSpy.GetLastNLogs(newLogCount - logCount) + newTxnLogCount := txnsSpy.Count() + sb.WriteString(txnsSpy.GetLastNLogs(newTxnLogCount - txnLogCount)) + return sb.String() case "reset-last-sampled": telemetryLogging.resetLastSampledTime() return "" + case "show-skipped-transactions": + return strconv.FormatUint(telemetryLogging.getSkippedTransactionCount(), 10) default: t.Fatal("unknown command") return "" @@ -185,8 +234,9 @@ func TestTelemetryLoggingDataDriven(t *testing.T) { * shouldEmitTransactionLog: calls shouldEmitTransactionLog with the provided arguments * Args: * - unixSecs: stubbed sampling time in unix seconds - * - force: value for the 'force' param + * - isTracing: value for the 'isTracing' param * - isInternal: value for the 'isInternal' param + * - appName: value for the 'appName' param */ func TestTelemetryLoggingDecision(t *testing.T) { defer leaktest.AfterTest(t)() @@ -201,11 +251,12 @@ func TestTelemetryLoggingDecision(t *testing.T) { datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { t.Cleanup(func() { telemetryLogging.resetLastSampledTime() + telemetryLogging.resetCounters() }) switch d.Cmd { case "set-cluster-settings": - var telemLoggingEnabled, internalStmtsEnabled bool + var telemLoggingEnabled, internalStmtsEnabled, consoleQueriesEnabled bool var samplingMode string var stmtSampleFreq, txnSampleFreq int stmtsPerTxnMax := 10 @@ -215,6 +266,7 @@ func TestTelemetryLoggingDecision(t *testing.T) { d.MaybeScanArgs(t, "txnSampleFreq", &txnSampleFreq) d.MaybeScanArgs(t, "stmtsPerTxnMax", &stmtsPerTxnMax) d.MaybeScanArgs(t, "internalStmtsOn", &internalStmtsEnabled) + d.MaybeScanArgs(t, "telemetryInternalConsoleQueriesEnabled", &consoleQueriesEnabled) mode := telemetryModeStatement if samplingMode == "transaction" { @@ -226,6 +278,7 @@ func TestTelemetryLoggingDecision(t *testing.T) { telemetryTransactionSamplingFrequency.Override(ctx, &cs.SV, int64(txnSampleFreq)) telemetryLoggingEnabled.Override(ctx, &cs.SV, telemLoggingEnabled) telemetryInternalQueriesEnabled.Override(ctx, &cs.SV, internalStmtsEnabled) + telemetryInternalConsoleQueriesEnabled.Override(ctx, &cs.SV, consoleQueriesEnabled) return "" case "shouldEmitStatementLog": @@ -238,18 +291,20 @@ func TestTelemetryLoggingDecision(t *testing.T) { d.ScanArgs(t, "force", &force) st.SetTime(timeutil.FromUnixMicros(int64(unixSecs * 1e6))) - shouldEmit, _ := telemetryLogging.shouldEmitStatementLog(isTrackedTxn, stmtNum, force) - return strconv.FormatBool(shouldEmit) + shouldEmit, skipped := telemetryLogging.shouldEmitStatementLog(isTrackedTxn, stmtNum, force) + return fmt.Sprintf(`emit: %t, skippedQueries: %d`, shouldEmit, skipped) case 
"shouldEmitTransactionLog": var unixSecs float64 - var force, isInternal bool + var isTracing, isInternal bool + var appName string d.ScanArgs(t, "unixSecs", &unixSecs) - d.ScanArgs(t, "force", &force) + d.ScanArgs(t, "isTracing", &isTracing) d.MaybeScanArgs(t, "isInternal", &isInternal) st.SetTime(timeutil.FromUnixMicros(int64(unixSecs * 1e6))) + d.ScanArgs(t, "appName", &appName) - shouldEmit := telemetryLogging.shouldEmitTransactionLog(force, isInternal) - return strconv.FormatBool(shouldEmit) + shouldEmit, skipped := telemetryLogging.shouldEmitTransactionLog(isTracing, isInternal, appName) + return fmt.Sprintf(`emit: %t, skippedTxns: %d`, shouldEmit, skipped) default: t.Fatal("unknown command") return "" diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index 60463ddee4e3..6554d940f0ef 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -11,6 +11,7 @@ package sql import ( + "strings" "sync" "sync/atomic" "time" @@ -26,7 +27,10 @@ import ( // Default value used to designate the maximum frequency at which events // are logged to the telemetry channel. -const defaultMaxEventFrequency = 8 +const ( + internalConsoleAppName = "$ internal-console" + defaultMaxEventFrequency = 8 +) var TelemetryMaxStatementEventFrequency = settings.RegisterIntSetting( settings.ApplicationLevel, @@ -115,6 +119,23 @@ func releaseSampledQuery(sq *eventpb.SampledQuery) { sampledQueryPool.Put(sq) } +// SampledTransaction objects are short-lived but can be +// allocated frequently if logging frequency is high. +var sampledTransactionPool = sync.Pool{ + New: func() interface{} { + return new(eventpb.SampledTransaction) + }, +} + +func getSampledTransaction() *eventpb.SampledTransaction { + return sampledTransactionPool.Get().(*eventpb.SampledTransaction) +} + +func releaseSampledTransaction(st *eventpb.SampledTransaction) { + *st = eventpb.SampledTransaction{} + sampledTransactionPool.Put(st) +} + // TelemetryLoggingMetrics keeps track of the last time at which an event // was sampled to the telemetry channel, and the number of skipped events // since the last sampled event. @@ -185,32 +206,39 @@ func NewTelemetryLoggingTestingKnobs( // and at least one of the following conditions is true: // - the transaction is not internal OR internal queries are enabled // - the required amount of time has elapsed since the last transaction began sampling -// - the force parameter is true and the transaction is not internal or sampling for internal statements -// is enabled +// - the transaction is from the console and telemetryInternalConsoleQueriesEnabled is true +// - the transaction has tracing enabled // // If the conditions are met, the last sampled time is updated. -func (t *telemetryLoggingMetrics) shouldEmitTransactionLog(forceSampling, isInternal bool) bool { +func (t *telemetryLoggingMetrics) shouldEmitTransactionLog( + isTracing, isInternal bool, applicationName string, +) (emit bool, skippedTxns uint64) { + // We should not increase the skipped transaction count if telemetry logging is disabled. 
if !telemetryLoggingEnabled.Get(&t.st.SV) { - return false + return false, t.skippedTransactionCount.Load() } if telemetrySamplingMode.Get(&t.st.SV) != telemetryModeTransaction { - return false + return false, t.skippedTransactionCount.Load() } - if isInternal && !telemetryInternalQueriesEnabled.Get(&t.st.SV) { - return false + logConsoleQuery := telemetryInternalConsoleQueriesEnabled.Get(&t.st.SV) && + strings.HasPrefix(applicationName, internalConsoleAppName) + if !logConsoleQuery && isInternal && !telemetryInternalQueriesEnabled.Get(&t.st.SV) { + return false, t.skippedTransactionCount.Load() } maxEventFrequency := telemetryTransactionSamplingFrequency.Get(&t.st.SV) if maxEventFrequency == 0 { - return false + return false, t.skippedTransactionCount.Load() } txnSampleTime := t.timeNow() + tracingEnabled := t.isTracing(nil, isTracing) - if forceSampling { + if logConsoleQuery || tracingEnabled { + // Force log. t.mu.Lock() defer t.mu.Unlock() t.mu.lastSampledTime = txnSampleTime - return true + return true, t.skippedTransactionCount.Swap(0) } requiredTimeElapsed := time.Second / time.Duration(maxEventFrequency) @@ -223,19 +251,19 @@ func (t *telemetryLoggingMetrics) shouldEmitTransactionLog(forceSampling, isInte }() if !enoughTimeElapsed { - return false + return false, t.skippedTransactionCount.Add(1) } t.mu.Lock() defer t.mu.Unlock() if txnSampleTime.Sub(t.mu.lastSampledTime) < requiredTimeElapsed { - return false + return false, t.skippedTransactionCount.Add(1) } t.mu.lastSampledTime = txnSampleTime - return true + return true, t.skippedTransactionCount.Swap(0) } // ModuleTestingKnobs implements base.ModuleTestingKnobs interface. @@ -356,10 +384,12 @@ func (t *telemetryLoggingMetrics) resetLastSampledTime() { t.mu.lastSampledTime = time.Time{} } -func (t *TelemetryLoggingMetrics) resetSkippedTransactionCount() (res uint64) { - return t.skippedTransactionCount.Swap(0) +// resetCounters resets the skipped query and transaction counters +func (t *telemetryLoggingMetrics) resetCounters() { + t.skippedQueryCount.Swap(0) + t.skippedTransactionCount.Swap(0) } -func (t *TelemetryLoggingMetrics) incSkippedTransactionCount() { - t.skippedTransactionCount.Add(1) +func (t *telemetryLoggingMetrics) getSkippedTransactionCount() uint64 { + return t.skippedTransactionCount.Load() } diff --git a/pkg/sql/testdata/telemetry_logging/logging/force_logging b/pkg/sql/testdata/telemetry_logging/logging/force_logging new file mode 100644 index 000000000000..1e46c966c952 --- /dev/null +++ b/pkg/sql/testdata/telemetry_logging/logging/force_logging @@ -0,0 +1,254 @@ +subtest txn_mode_tracing_on +# When tracing is on we should log all execution events. 
+ +exec-sql +SET CLUSTER SETTING sql.telemetry.transaction_sampling.max_event_frequency = 10; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.transaction_sampling.statement_events_per_transaction.max = 100; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.mode = "transaction"; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true; +---- + +exec-sql +CREATE TABLE t() +---- + +spy-sql unixSecs=0.1 +SET TRACING = ON; +---- + +spy-sql unixSecs=0.1 useRealTracing=true +SELECT * FROM t LIMIT 1; +---- +{ + "ApplicationName": "telemetry-logging-datadriven", + "Database": "defaultdb", + "Distribution": "full", + "EventType": "sampled_query", + "PlanGist": "AgHQAQIAAAAAAg==", + "ScanCount": 1, + "Statement": "SELECT * FROM \"\".\"\".t LIMIT ‹1›", + "StatementFingerprintID": 17888549871399373000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 17888549871399373000 + ], + "TransactionFingerprintID": 6278959165882708000, + "User": "root" +} + +spy-sql unixSecs=0.1 useRealTracing=true +SELECT 1, 2, 3; +---- +{ + "ApplicationName": "telemetry-logging-datadriven", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "NumRows": 1, + "OutputRowsEstimate": 1, + "PlanGist": "AgICBgYG", + "Statement": "SELECT ‹1›, ‹2›, ‹3›", + "StatementFingerprintID": 16698501115058330000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16698501115058330000 + ], + "TransactionFingerprintID": 5250989384299556000, + "User": "root" +} + +spy-sql unixSecs=0.1 useRealTracing=true +SELECT 'hello'; +---- +{ + "ApplicationName": "telemetry-logging-datadriven", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "NumRows": 1, + "OutputRowsEstimate": 1, + "PlanGist": "AgICAgYC", + "Statement": "SELECT ‹'hello'›", + "StatementFingerprintID": 15578946620736494000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 15578946620736494000 + ], + "TransactionFingerprintID": 8597429024882746000, + "User": "root" +} + +spy-sql unixSecs=0.1 +SET TRACING = off; +---- + +subtest end + +subtest txn_mode_console_query +# When tracing is on we should log all execution events. 
+ +exec-sql +SET CLUSTER SETTING sql.telemetry.transaction_sampling.max_event_frequency = 10; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.transaction_sampling.statement_events_per_transaction.max = 100; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.mode = "transaction"; +---- + +exec-sql +SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true; +---- + +spy-sql unixSecs=0.1 +SET application_name = '$ internal-console-app'; +---- +{ + "ApplicationName": "$ internal-console-app", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "PlanGist": "Ais=", + "Statement": "SET application_name = ‹'$ internal-console-app'›", + "StatementFingerprintID": 13549091137440920000, + "StmtPosInTxn": 1, + "Tag": "SET", + "User": "root" +} + +spy-sql unixSecs=0.1 +SELECT * FROM t LIMIT 1; +---- +{ + "ApplicationName": "$ internal-console-app", + "Database": "defaultdb", + "Distribution": "full", + "EventType": "sampled_query", + "PlanGist": "AgHQAQIAAAAAAg==", + "ScanCount": 1, + "Statement": "SELECT * FROM \"\".\"\".t LIMIT ‹1›", + "StatementFingerprintID": 17888549871399373000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "$ internal-console-app", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 5, + "StatementFingerprintIDs": [ + 17888549871399373000 + ], + "TransactionFingerprintID": 6278959165882708000, + "User": "root" +} + +spy-sql unixSecs=0.1 +SELECT 1, 2, 3; +---- +{ + "ApplicationName": "$ internal-console-app", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "NumRows": 1, + "OutputRowsEstimate": 1, + "PlanGist": "AgICBgYG", + "Statement": "SELECT ‹1›, ‹2›, ‹3›", + "StatementFingerprintID": 16698501115058330000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "$ internal-console-app", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16698501115058330000 + ], + "TransactionFingerprintID": 5250989384299556000, + "User": "root" +} + +spy-sql unixSecs=0.1 +SELECT 'hello'; +---- +{ + "ApplicationName": "$ internal-console-app", + "Database": "defaultdb", + "Distribution": "local", + "EventType": "sampled_query", + "NumRows": 1, + "OutputRowsEstimate": 1, + "PlanGist": "AgICAgYC", + "Statement": "SELECT ‹'hello'›", + "StatementFingerprintID": 15578946620736494000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "root" +} +{ + "ApplicationName": "$ internal-console-app", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 15578946620736494000 + ], + "TransactionFingerprintID": 8597429024882746000, + "User": "root" +} + +subtest end diff --git a/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit b/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit index b659c487db0a..5cf2d9392b84 100644 --- a/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit +++ b/pkg/sql/testdata/telemetry_logging/logging/stmts_per_txn_limit @@ -32,6 +32,20 @@ BEGIN; SELECT 1; SELECT 2; COMMIT; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 2, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 
16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 14263644654231036000, + "User": "root" +} spy-sql SELECT 5; @@ -51,6 +65,19 @@ SELECT 5; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16085855936700856000 + ], + "TransactionFingerprintID": 8097417102642120000, + "User": "root" +} exec-sql SET CLUSTER SETTING sql.telemetry.transaction_sampling.statement_events_per_transaction.max = 3; @@ -102,6 +129,23 @@ BEGIN; SELECT 1; SELECT 2; SELECT 3; SELECT 4; COMMIT; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 4, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000, + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 63991751604173224, + "User": "root" +} exec-sql SET CLUSTER SETTING sql.telemetry.transaction_sampling.statement_events_per_transaction.max = 4; @@ -167,3 +211,20 @@ BEGIN; SELECT 1; SELECT 2; SELECT 3; SELECT 4; COMMIT; "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 4, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000, + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 63991751604173224, + "User": "root" +} diff --git a/pkg/sql/testdata/telemetry_logging/logging/transaction_mode b/pkg/sql/testdata/telemetry_logging/logging/transaction_mode index cf6b9783c397..48f49ebe8c18 100644 --- a/pkg/sql/testdata/telemetry_logging/logging/transaction_mode +++ b/pkg/sql/testdata/telemetry_logging/logging/transaction_mode @@ -1,5 +1,7 @@ # Test that transaction mode for telemetry logging logs all stmts for a tracked txn. +subtest transaction_mode_events + exec-sql SET CLUSTER SETTING sql.telemetry.transaction_sampling.max_event_frequency = 10; ---- @@ -47,6 +49,19 @@ BEGIN; TRUNCATE t; COMMIT "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 802230861479752300 + ], + "TransactionFingerprintID": 11835923794509859000, + "User": "root" +} # Note that because we skip BEGIN statements in transaction mode, the first # statement logged from this transaction has SkippedQueries=1 even though @@ -108,6 +123,21 @@ BEGIN; SELECT 1; SELECT 2; SELECT 3; COMMIT "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 3, + "RowsRead": 0, + "RowsWritten": 0, + "StatementFingerprintIDs": [ + 16085855936700856000, + 16085855936700856000, + 16085855936700856000 + ], + "TransactionFingerprintID": 17099066530943440000, + "User": "root" +} # Skipped due to not enough time elapsed. 
spy-sql unixSecs=1 @@ -133,6 +163,20 @@ SELECT 1, 2 "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 1569305422589581000 + ], + "TransactionFingerprintID": 13449146770429469000, + "User": "root" +} # Skipped due to not enough time elapsed. spy-sql unixSecs=1.05 @@ -162,6 +206,20 @@ SELECT * FROM t LIMIT 2 "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 2, + "StatementFingerprintIDs": [ + 17888549871399373000 + ], + "TransactionFingerprintID": 6278959165882708000, + "User": "root" +} # Skipped, not enough time elapsed. spy-sql unixSecs=1.15 @@ -210,6 +268,23 @@ BEGIN; SELECT * FROM t LIMIT 4; SELECT * FROM t LIMIT 5; COMMIT "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 17888549871399373000, + 17888549871399373000 + ], + "TransactionFingerprintID": 5223910247858315000, + "User": "root" +} + +subtest end subtest txn_retry_is_sampled @@ -241,6 +316,20 @@ SELECT 1, 2, 3 "Tag": "SELECT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "NumRows": 1, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 16698501115058330000 + ], + "TransactionFingerprintID": 5250989384299556000, + "User": "root" +} # The stub time will only be advanced in the retry. 
# Note that the crdb_internal.force_retry will always appear in the logs due to @@ -294,5 +383,66 @@ COMMIT; "Tag": "COMMIT", "User": "root" } +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": true, + "EventType": "sampled_transaction", + "LastAutoRetryReason": "crdb_internal.force_retry(): TransactionRetryWithProtoRefreshError: forced by crdb_internal.force_retry()", + "NumRetries": 1, + "NumRows": 2, + "RowsRead": 0, + "RowsWritten": 0, + "SkippedTransactions": 1, + "StatementFingerprintIDs": [ + 15578946620736494000, + 9891387630896048000 + ], + "TransactionFingerprintID": 823601646009140400, + "User": "root" +} + +subtest end + +subtest redacts_sensitive_info + +reset-last-sampled +---- + + +# testuser name should be redacted +spy-sql user=testuser unixSecs=1 +SELECT 1/0 +---- +pq: division by zero +{ + "ApplicationName": "telemetry-logging-datadriven", + "Database": "defaultdb", + "Distribution": "local", + "ErrorText": "division by zero", + "EventType": "sampled_query", + "OutputRowsEstimate": 1, + "PlanGist": "AgICAgYC", + "SQLSTATE": "22012", + "Statement": "SELECT ‹1› / ‹0›", + "StatementFingerprintID": 16154612194363576000, + "StmtPosInTxn": 1, + "Tag": "SELECT", + "User": "‹testuser›" +} +{ + "ApplicationName": "telemetry-logging-datadriven", + "Committed": false, + "ErrorText": "division by zero", + "EventType": "sampled_transaction", + "NumRows": 0, + "RowsRead": 0, + "RowsWritten": 0, + "SQLSTATE": "22012", + "StatementFingerprintIDs": [ + 16154612194363576000 + ], + "TransactionFingerprintID": 5715924995226314000, + "User": "‹testuser›" +} subtest end diff --git a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog index a85c06980f2b..aceadf52d0b6 100644 --- a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog +++ b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitStatementLog @@ -6,19 +6,19 @@ set-cluster-settings telemetryLoggingEnabled=false samplingMode=statement stmtSa shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=true ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=2 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 subtest end @@ -30,19 +30,19 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=statement stmtSam shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=true ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 shouldEmitStatementLog unixSecs=2 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 0 subtest end @@ -54,31 +54,31 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=statement stmtSam shouldEmitStatementLog unixSecs=1 isTrackedTxn=false stmtNum=1 force=false ---- -true +emit: true, skippedQueries: 0 shouldEmitStatementLog unixSecs=1.05 isTrackedTxn=false stmtNum=1 force=false ---- -false +emit: false, skippedQueries: 1 # Force should override the sampling frequency. 
 shouldEmitStatementLog unixSecs=1.05 isTrackedTxn=false stmtNum=1 force=true
 ----
-true
+emit: true, skippedQueries: 1
 
 # Tracked txn is true but we are not in txn mode.
 shouldEmitStatementLog unixSecs=1.1 isTrackedTxn=true stmtNum=1 force=false
 ----
-false
+emit: false, skippedQueries: 1
 
 # Enough time has elapsed. stmtNum is greater than stmtsPerTxnMax but we are not in txn mode.
 shouldEmitStatementLog unixSecs=1.15 isTrackedTxn=false stmtNum=1000 force=false
 ----
-true
+emit: true, skippedQueries: 1
 
 # Enough time has elapsed.
 shouldEmitStatementLog unixSecs=2 isTrackedTxn=false stmtNum=1 force=false
 ----
-true
+emit: true, skippedQueries: 0
 
 subtest end
 
@@ -91,35 +91,35 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction stmtS
 
 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false
 ----
-true
+emit: true, skippedQueries: 0
 
 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=2 force=false
 ----
-true
+emit: true, skippedQueries: 0
 
 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=3 force=false
 ----
-true
+emit: true, skippedQueries: 0
 
 # Enough time has elapsed but stmtNum is greater than stmtsPerTxnMax.
 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=4 force=false
 ----
-false
+emit: false, skippedQueries: 1
 
 # stmtNum is greater than stmtsPerTxnMax but force is true.
 shouldEmitStatementLog unixSecs=4 isTrackedTxn=true stmtNum=4 force=true
 ----
-true
+emit: true, skippedQueries: 1
 
 # Enough time has elapsed but txn is not tracked.
 shouldEmitStatementLog unixSecs=5 isTrackedTxn=false stmtNum=1 force=false
 ----
-false
+emit: false, skippedQueries: 1
 
 # Txn is not tracked but force is true.
 shouldEmitStatementLog unixSecs=6 isTrackedTxn=false stmtNum=1 force=true
 ----
-true
+emit: true, skippedQueries: 1
 
 subtest end
 
@@ -131,27 +131,26 @@ set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction stmtS
 
 shouldEmitStatementLog unixSecs=1 isTrackedTxn=true stmtNum=1 force=false
 ----
-false
+emit: false, skippedQueries: 0
 
 shouldEmitStatementLog unixSecs=2 isTrackedTxn=true stmtNum=2 force=false
 ----
-false
+emit: false, skippedQueries: 0
 
 shouldEmitStatementLog unixSecs=3 isTrackedTxn=true stmtNum=3 force=false
 ----
-false
+emit: false, skippedQueries: 0
 
 shouldEmitStatementLog unixSecs=4 isTrackedTxn=true stmtNum=4 force=false
 ----
-false
+emit: false, skippedQueries: 0
 
 shouldEmitStatementLog unixSecs=5 isTrackedTxn=false stmtNum=1 force=false
 ----
-false
+emit: false, skippedQueries: 0
 
 shouldEmitStatementLog unixSecs=6 isTrackedTxn=false stmtNum=1 force=true
 ----
-false
+emit: false, skippedQueries: 0
 
 subtest end
-
diff --git a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog
index 9f4ee54cb81e..859e52f97b4f 100644
--- a/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog
+++ b/pkg/sql/testdata/telemetry_logging/logging_decisions/shouldEmitTransactionLog
@@ -5,21 +5,21 @@ subtest telemetry_disabled
 set-cluster-settings telemetryLoggingEnabled=false samplingMode=transaction stmtSampleFreq=10 txnSampleFreq=10
 ----
 
-shouldEmitTransactionLog unixSecs=1 force=false
+shouldEmitTransactionLog unixSecs=1 isTracing=false appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=2 force=true
+shouldEmitTransactionLog unixSecs=2 isTracing=true appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=3 force=false
+shouldEmitTransactionLog unixSecs=3 isTracing=false appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=4 force=true
+shouldEmitTransactionLog unixSecs=4 isTracing=true appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
 subtest end
 
@@ -28,21 +28,21 @@ subtest telemetry_enabled_txn_freq_0
 set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction txnSampleFreq=0
 ----
 
-shouldEmitTransactionLog unixSecs=1 force=false
+shouldEmitTransactionLog unixSecs=1 isTracing=false appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=2 force=true
+shouldEmitTransactionLog unixSecs=2 isTracing=true appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=3 force=false
+shouldEmitTransactionLog unixSecs=3 isTracing=false appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=4 force=true
+shouldEmitTransactionLog unixSecs=4 isTracing=true appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
 subtest end
 
@@ -51,21 +51,21 @@ subtest telemetry_enabled_txn_mode_off
 set-cluster-settings telemetryLoggingEnabled=true samplingMode=statement txnSampleFreq=10
 ----
 
-shouldEmitTransactionLog unixSecs=1 force=false
+shouldEmitTransactionLog unixSecs=1 isTracing=false appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=2 force=true
+shouldEmitTransactionLog unixSecs=2 isTracing=true appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=3 force=false
+shouldEmitTransactionLog unixSecs=3 isTracing=false appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=4 force=true
+shouldEmitTransactionLog unixSecs=4 isTracing=true appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
 subtest end
 
@@ -74,34 +74,59 @@ subtest telemetry_enabled_txn_mode_on
 set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction txnSampleFreq=1
 ----
 
-shouldEmitTransactionLog unixSecs=1 force=false
+shouldEmitTransactionLog unixSecs=1 isTracing=false appName="telemetry-logging"
 ----
-true
+emit: true, skippedTxns: 0
 
 # Not enough time elapsed.
-shouldEmitTransactionLog unixSecs=1.5 force=false
+shouldEmitTransactionLog unixSecs=1.5 isTracing=false appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 1
 
 # Enough time elapsed.
-shouldEmitTransactionLog unixSecs=2 force=false
+shouldEmitTransactionLog unixSecs=2 isTracing=false appName="telemetry-logging"
 ----
-true
+emit: true, skippedTxns: 1
 
-# Force should override the sampling frequency.
-shouldEmitTransactionLog unixSecs=2.5 force=true
+# isTracing should override the sampling frequency.
+shouldEmitTransactionLog unixSecs=2.5 isTracing=true appName="telemetry-logging"
 ----
-true
+emit: true, skippedTxns: 0
 
 # Sampling internal queries is off.
-shouldEmitTransactionLog unixSecs=4 force=false isInternal=true
+shouldEmitTransactionLog unixSecs=4 isTracing=false isInternal=true appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=4 force=true isInternal=true
+shouldEmitTransactionLog unixSecs=4 isTracing=true isInternal=true appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 0
+
+subtest end
+
+subtest log_internal_console_queries
+# Transactions with app name prefix '$ internal-console' should be force logged when
+# sql.telemetry.query_sampling.internal_console.enabled is on.
+
+set-cluster-settings telemetryLoggingEnabled=true telemetryInternalConsoleQueriesEnabled=false samplingMode=transaction txnSampleFreq=1
+----
+
+# Should return false because telemetryInternalConsoleQueriesEnabled is false and internal queries logging is off.
+shouldEmitTransactionLog unixSecs=1 isTracing=false isInternal=true appName="$ internal-console"
+----
+emit: false, skippedTxns: 0
+
+set-cluster-settings telemetryLoggingEnabled=true telemetryInternalConsoleQueriesEnabled=true samplingMode=transaction txnSampleFreq=1
+----
+
+shouldEmitTransactionLog unixSecs=1 isTracing=false isInternal=true appName=($ internal-console)
+----
+emit: true, skippedTxns: 0
+
+shouldEmitTransactionLog unixSecs=1 isTracing=false isInternal=true appName=($ internal-console)
+----
+emit: true, skippedTxns: 0
 
 subtest end
 
@@ -111,32 +136,32 @@ subtest telemetry_enabled_txn_mode_on_internal_queries_on
 set-cluster-settings telemetryLoggingEnabled=true samplingMode=transaction txnSampleFreq=1 internalStmtsOn=true
 ----
 
-shouldEmitTransactionLog unixSecs=1 force=false isInternal=true
+shouldEmitTransactionLog unixSecs=1 isTracing=false isInternal=true appName="telemetry-logging"
 ----
-true
+emit: true, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=1.5 force=false isInternal=true
+shouldEmitTransactionLog unixSecs=1.5 isTracing=false isInternal=true appName="telemetry-logging"
 ----
-false
+emit: false, skippedTxns: 1
 
-shouldEmitTransactionLog unixSecs=2 force=false isInternal=true
+shouldEmitTransactionLog unixSecs=2 isTracing=false isInternal=true appName="telemetry-logging"
 ----
-true
+emit: true, skippedTxns: 1
 
-shouldEmitTransactionLog unixSecs=2.5 force=true isInternal=true
+shouldEmitTransactionLog unixSecs=2.5 isTracing=true isInternal=true appName="telemetry-logging"
 ----
-true
+emit: true, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=2.5 force=true isInternal=true
+shouldEmitTransactionLog unixSecs=2.5 isTracing=true isInternal=true appName="telemetry-logging"
 ----
-true
+emit: true, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=4 force=false isInternal=false
+shouldEmitTransactionLog unixSecs=4 isTracing=false isInternal=false appName="telemetry-logging"
 ----
-true
+emit: true, skippedTxns: 0
 
-shouldEmitTransactionLog unixSecs=5 force=true isInternal=false
+shouldEmitTransactionLog unixSecs=5 isTracing=true isInternal=false appName="telemetry-logging"
 ----
-true
+emit: true, skippedTxns: 0
 
 subtest end
diff --git a/pkg/util/log/logtestutils/telemetry_logging_test_utils.go b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go
index 7a92e4ea10c9..14b2b8696ad8 100644
--- a/pkg/util/log/logtestutils/telemetry_logging_test_utils.go
+++ b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go
@@ -126,6 +126,14 @@ var includeByDefault = map[string]struct{}{
     "KVRowsRead":           {},
     "IndexRecommendations": {},
     "ScanCount":            {},
+
+    // Transaction fields.
+    "Committed":                {},
+    "Implicit":                 {},
+    "LastAutoRetryReason":      {},
+    "SkippedTransactions":      {},
+    "TransactionFingerprintID": {},
+    "StatementFingerprintIDs":  {},
 }
 
 // printJSONMap prints a map as a JSON string. In the future we can
@@ -150,10 +158,19 @@ type TelemetryLogSpy struct {
         syncutil.RWMutex
         logs    []string
         filters []func(entry logpb.Entry) bool
-        format  func(entry logpb.Entry) string
+        format  func(entry logpb.Entry) (string, error)
     }
 }
 
+func defaultFormatEntry(entry logpb.Entry) (string, error) {
+    var jsonMap map[string]interface{}
+    if err := json.Unmarshal([]byte(entry.Message[entry.StructuredStart:entry.StructuredEnd]), &jsonMap); err != nil {
+        return "", err
+    }
+
+    return printJSONMap(jsonMap)
+}
+
 func NewSampledQueryLogScrubVolatileFields(testState *testing.T) *TelemetryLogSpy {
     s := &TelemetryLogSpy{
         testState: testState,
@@ -161,17 +178,19 @@ func NewSampledQueryLogScrubVolatileFields(testState *testing.T) *TelemetryLogSp
     s.mu.filters = append(s.mu.filters, func(entry logpb.Entry) bool {
         return strings.Contains(entry.Message, "sampled_query")
     })
-    s.mu.format = func(entry logpb.Entry) string {
-        var jsonMap map[string]interface{}
-        if err := json.Unmarshal([]byte(entry.Message[entry.StructuredStart:entry.StructuredEnd]), &jsonMap); err != nil {
-            s.testState.Fatal(err)
-        }
-        out, err := printJSONMap(jsonMap)
-        if err != nil {
-            s.testState.Error(err)
-        }
-        return out
+    s.mu.format = defaultFormatEntry
+
+    return s
+}
+
+func NewSampledTransactionLogScrubVolatileFields(testState *testing.T) *TelemetryLogSpy {
+    s := &TelemetryLogSpy{
+        testState: testState,
     }
+    s.mu.filters = append(s.mu.filters, func(entry logpb.Entry) bool {
+        return strings.Contains(entry.Message, "sampled_transaction")
+    })
+    s.mu.format = defaultFormatEntry
 
     return s
 }
@@ -216,7 +235,10 @@ func (s *TelemetryLogSpy) Intercept(entry []byte) {
         }
     }
 
-    formattedLogStr := s.mu.format(logEntry)
+    formattedLogStr, err := s.mu.format(logEntry)
+    if err != nil {
+        s.testState.Fatal(err)
+    }
 
     s.mu.Lock()
     defer s.mu.Unlock()